Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29#[cfg(feature = "vector-index")]
30use crate::index::vector::HnswIndex;
31
32/// Compares two values for ordering (used for range checks).
33fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
34    match (a, b) {
35        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
36        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
37        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
38        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
39        _ => None,
40    }
41}
42
43/// Checks if a value is within a range.
44fn value_in_range(
45    value: &Value,
46    min: Option<&Value>,
47    max: Option<&Value>,
48    min_inclusive: bool,
49    max_inclusive: bool,
50) -> bool {
51    // Check lower bound
52    if let Some(min_val) = min {
53        match compare_values_for_range(value, min_val) {
54            Some(CmpOrdering::Less) => return false,
55            Some(CmpOrdering::Equal) if !min_inclusive => return false,
56            None => return false, // Can't compare
57            _ => {}
58        }
59    }
60
61    // Check upper bound
62    if let Some(max_val) = max {
63        match compare_values_for_range(value, max_val) {
64            Some(CmpOrdering::Greater) => return false,
65            Some(CmpOrdering::Equal) if !max_inclusive => return false,
66            None => return false,
67            _ => {}
68        }
69    }
70
71    true
72}
73
74// Tiered storage imports
75#[cfg(feature = "tiered-storage")]
76use crate::storage::EpochStore;
77#[cfg(feature = "tiered-storage")]
78use grafeo_common::memory::arena::ArenaAllocator;
79#[cfg(feature = "tiered-storage")]
80use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
81
82/// Configuration for the LPG store.
83///
84/// The defaults work well for most cases. Tune `backward_edges` if you only
85/// traverse in one direction (saves memory), or adjust capacities if you know
86/// your graph size upfront (avoids reallocations).
87#[derive(Debug, Clone)]
88pub struct LpgStoreConfig {
89    /// Maintain backward adjacency for incoming edge queries. Turn off if
90    /// you only traverse outgoing edges - saves ~50% adjacency memory.
91    pub backward_edges: bool,
92    /// Initial capacity for nodes (avoids early reallocations).
93    pub initial_node_capacity: usize,
94    /// Initial capacity for edges (avoids early reallocations).
95    pub initial_edge_capacity: usize,
96}
97
98impl Default for LpgStoreConfig {
99    fn default() -> Self {
100        Self {
101            backward_edges: true,
102            initial_node_capacity: 1024,
103            initial_edge_capacity: 4096,
104        }
105    }
106}
107
108/// The core in-memory graph storage.
109///
110/// Everything lives here: nodes, edges, properties, adjacency indexes, and
111/// version chains for MVCC. Concurrent reads never block each other.
112///
113/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
114/// adds transaction management and query execution. Use `LpgStore` directly
115/// when you need raw performance for algorithm implementations.
116///
117/// # Example
118///
119/// ```
120/// use grafeo_core::graph::lpg::LpgStore;
121/// use grafeo_core::graph::Direction;
122///
123/// let store = LpgStore::new();
124///
125/// // Create a small social network
126/// let alice = store.create_node(&["Person"]);
127/// let bob = store.create_node(&["Person"]);
128/// store.create_edge(alice, bob, "KNOWS");
129///
130/// // Traverse outgoing edges
131/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
132///     println!("Alice knows node {:?}", neighbor);
133/// }
134/// ```
135///
136/// # Lock Ordering
137///
138/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
139/// consistent order to prevent deadlocks. Always acquire locks in this order:
140///
141/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
142/// 1. `nodes` / `node_versions`
143/// 2. `edges` / `edge_versions`
144///
145/// ## Level 2 - Catalogs (acquire as pairs when writing)
146/// 3. `label_to_id` + `id_to_label`
147/// 4. `edge_type_to_id` + `id_to_edge_type`
148///
149/// ## Level 3 - Indexes
150/// 5. `label_index`
151/// 6. `node_labels`
152/// 7. `property_indexes`
153///
154/// ## Level 4 - Statistics
155/// 8. `statistics`
156///
157/// ## Level 5 - Nested Locks (internal to other structs)
158/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
159/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
160///
161/// ## Rules
162/// - Catalog pairs must be acquired together when writing.
163/// - Never hold entity locks while acquiring catalog locks in a different scope.
164/// - Statistics lock is always last.
165/// - Read locks are generally safe, but avoid read-to-write upgrades.
166pub struct LpgStore {
167    /// Configuration.
168    #[allow(dead_code)]
169    config: LpgStoreConfig,
170
171    /// Node records indexed by NodeId, with version chains for MVCC.
172    /// Used when `tiered-storage` feature is disabled.
173    /// Lock order: 1
174    #[cfg(not(feature = "tiered-storage"))]
175    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
176
177    /// Edge records indexed by EdgeId, with version chains for MVCC.
178    /// Used when `tiered-storage` feature is disabled.
179    /// Lock order: 2
180    #[cfg(not(feature = "tiered-storage"))]
181    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
182
183    // === Tiered Storage Fields (feature-gated) ===
184    //
185    // Lock ordering for arena access:
186    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
187    //
188    // Rules:
189    // - Acquire arena read lock *after* version locks, never before.
190    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
191    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
192    // - freeze_epoch order: node_versions.read() → arena.read_at(),
193    //   then edge_versions.read() → arena.read_at().
194    /// Arena allocator for hot data storage.
195    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
196    #[cfg(feature = "tiered-storage")]
197    arena_allocator: Arc<ArenaAllocator>,
198
199    /// Node version indexes - store metadata and arena offsets.
200    /// The actual NodeRecord data is stored in the arena.
201    /// Lock order: 1
202    #[cfg(feature = "tiered-storage")]
203    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
204
205    /// Edge version indexes - store metadata and arena offsets.
206    /// The actual EdgeRecord data is stored in the arena.
207    /// Lock order: 2
208    #[cfg(feature = "tiered-storage")]
209    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
210
211    /// Cold storage for frozen epochs.
212    /// Contains compressed epoch blocks for historical data.
213    #[cfg(feature = "tiered-storage")]
214    epoch_store: Arc<EpochStore>,
215
216    /// Property storage for nodes.
217    node_properties: PropertyStorage<NodeId>,
218
219    /// Property storage for edges.
220    edge_properties: PropertyStorage<EdgeId>,
221
222    /// Label name to ID mapping.
223    /// Lock order: 3 (acquire with id_to_label)
224    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
225
226    /// Label ID to name mapping.
227    /// Lock order: 3 (acquire with label_to_id)
228    id_to_label: RwLock<Vec<ArcStr>>,
229
230    /// Edge type name to ID mapping.
231    /// Lock order: 4 (acquire with id_to_edge_type)
232    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
233
234    /// Edge type ID to name mapping.
235    /// Lock order: 4 (acquire with edge_type_to_id)
236    id_to_edge_type: RwLock<Vec<ArcStr>>,
237
238    /// Forward adjacency lists (outgoing edges).
239    forward_adj: ChunkedAdjacency,
240
241    /// Backward adjacency lists (incoming edges).
242    /// Only populated if config.backward_edges is true.
243    backward_adj: Option<ChunkedAdjacency>,
244
245    /// Label index: label_id -> set of node IDs.
246    /// Lock order: 5
247    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
248
249    /// Node labels: node_id -> set of label IDs.
250    /// Reverse mapping to efficiently get labels for a node.
251    /// Lock order: 6
252    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
253
254    /// Property indexes: property_key -> (value -> set of node IDs).
255    ///
256    /// When a property is indexed, lookups by value are O(1) instead of O(n).
257    /// Use [`create_property_index`] to enable indexing for a property.
258    /// Lock order: 7
259    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
260
261    /// Vector indexes: "label:property" -> HNSW index.
262    ///
263    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
264    /// Lock order: 7 (same level as property_indexes, disjoint keys)
265    #[cfg(feature = "vector-index")]
266    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
267
268    /// Text indexes: "label:property" -> inverted index with BM25 scoring.
269    ///
270    /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
271    /// Lock order: 7 (same level as property_indexes, disjoint keys)
272    #[cfg(feature = "text-index")]
273    text_indexes: RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,
274
275    /// Next node ID.
276    next_node_id: AtomicU64,
277
278    /// Next edge ID.
279    next_edge_id: AtomicU64,
280
281    /// Current epoch.
282    current_epoch: AtomicU64,
283
284    /// Statistics for cost-based optimization.
285    /// Lock order: 8 (always last)
286    statistics: RwLock<Arc<Statistics>>,
287
288    /// Whether statistics need recomputation after mutations.
289    needs_stats_recompute: AtomicBool,
290}
291
292impl LpgStore {
293    /// Creates a new LPG store with default configuration.
294    #[must_use]
295    pub fn new() -> Self {
296        Self::with_config(LpgStoreConfig::default())
297    }
298
299    /// Creates a new LPG store with custom configuration.
300    #[must_use]
301    pub fn with_config(config: LpgStoreConfig) -> Self {
302        let backward_adj = if config.backward_edges {
303            Some(ChunkedAdjacency::new())
304        } else {
305            None
306        };
307
308        Self {
309            #[cfg(not(feature = "tiered-storage"))]
310            nodes: RwLock::new(FxHashMap::default()),
311            #[cfg(not(feature = "tiered-storage"))]
312            edges: RwLock::new(FxHashMap::default()),
313            #[cfg(feature = "tiered-storage")]
314            arena_allocator: Arc::new(ArenaAllocator::new()),
315            #[cfg(feature = "tiered-storage")]
316            node_versions: RwLock::new(FxHashMap::default()),
317            #[cfg(feature = "tiered-storage")]
318            edge_versions: RwLock::new(FxHashMap::default()),
319            #[cfg(feature = "tiered-storage")]
320            epoch_store: Arc::new(EpochStore::new()),
321            node_properties: PropertyStorage::new(),
322            edge_properties: PropertyStorage::new(),
323            label_to_id: RwLock::new(FxHashMap::default()),
324            id_to_label: RwLock::new(Vec::new()),
325            edge_type_to_id: RwLock::new(FxHashMap::default()),
326            id_to_edge_type: RwLock::new(Vec::new()),
327            forward_adj: ChunkedAdjacency::new(),
328            backward_adj,
329            label_index: RwLock::new(Vec::new()),
330            node_labels: RwLock::new(FxHashMap::default()),
331            property_indexes: RwLock::new(FxHashMap::default()),
332            #[cfg(feature = "vector-index")]
333            vector_indexes: RwLock::new(FxHashMap::default()),
334            #[cfg(feature = "text-index")]
335            text_indexes: RwLock::new(FxHashMap::default()),
336            next_node_id: AtomicU64::new(0),
337            next_edge_id: AtomicU64::new(0),
338            current_epoch: AtomicU64::new(0),
339            statistics: RwLock::new(Arc::new(Statistics::new())),
340            needs_stats_recompute: AtomicBool::new(true),
341            config,
342        }
343    }
344
345    /// Returns the current epoch.
346    #[must_use]
347    pub fn current_epoch(&self) -> EpochId {
348        EpochId::new(self.current_epoch.load(Ordering::Acquire))
349    }
350
351    /// Creates a new epoch.
352    pub fn new_epoch(&self) -> EpochId {
353        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
354        EpochId::new(id)
355    }
356
357    /// Syncs the store epoch to match an external epoch counter.
358    ///
359    /// Used by the transaction manager to keep the store's epoch in step
360    /// after a transaction commit advances the global epoch.
361    pub fn sync_epoch(&self, epoch: EpochId) {
362        self.current_epoch
363            .fetch_max(epoch.as_u64(), Ordering::AcqRel);
364    }
365
366    // === Node Operations ===
367
368    /// Creates a new node with the given labels.
369    ///
370    /// Uses the system transaction for non-transactional operations.
371    pub fn create_node(&self, labels: &[&str]) -> NodeId {
372        self.needs_stats_recompute.store(true, Ordering::Relaxed);
373        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
374    }
375
376    /// Creates a new node with the given labels within a transaction context.
377    #[cfg(not(feature = "tiered-storage"))]
378    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
379        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
380
381        let mut record = NodeRecord::new(id, epoch);
382        record.set_label_count(labels.len() as u16);
383
384        // Store labels in node_labels map and label_index
385        let mut node_label_set = FxHashSet::default();
386        for label in labels {
387            let label_id = self.get_or_create_label_id(*label);
388            node_label_set.insert(label_id);
389
390            // Update label index
391            let mut index = self.label_index.write();
392            while index.len() <= label_id as usize {
393                index.push(FxHashMap::default());
394            }
395            index[label_id as usize].insert(id, ());
396        }
397
398        // Store node's labels
399        self.node_labels.write().insert(id, node_label_set);
400
401        // Create version chain with initial version
402        let chain = VersionChain::with_initial(record, epoch, tx_id);
403        self.nodes.write().insert(id, chain);
404        id
405    }
406
407    /// Creates a new node with the given labels within a transaction context.
408    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
409    #[cfg(feature = "tiered-storage")]
410    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
411        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
412
413        let mut record = NodeRecord::new(id, epoch);
414        record.set_label_count(labels.len() as u16);
415
416        // Store labels in node_labels map and label_index
417        let mut node_label_set = FxHashSet::default();
418        for label in labels {
419            let label_id = self.get_or_create_label_id(*label);
420            node_label_set.insert(label_id);
421
422            // Update label index
423            let mut index = self.label_index.write();
424            while index.len() <= label_id as usize {
425                index.push(FxHashMap::default());
426            }
427            index[label_id as usize].insert(id, ());
428        }
429
430        // Store node's labels
431        self.node_labels.write().insert(id, node_label_set);
432
433        // Allocate record in arena and get offset (create epoch if needed)
434        let arena = self.arena_allocator.arena_or_create(epoch);
435        let (offset, _stored) = arena.alloc_value_with_offset(record);
436
437        // Create HotVersionRef pointing to arena data
438        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
439
440        // Create or update version index
441        let mut versions = self.node_versions.write();
442        if let Some(index) = versions.get_mut(&id) {
443            index.add_hot(hot_ref);
444        } else {
445            versions.insert(id, VersionIndex::with_initial(hot_ref));
446        }
447
448        id
449    }
450
451    /// Creates a new node with labels and properties.
452    pub fn create_node_with_props(
453        &self,
454        labels: &[&str],
455        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
456    ) -> NodeId {
457        self.create_node_with_props_versioned(
458            labels,
459            properties,
460            self.current_epoch(),
461            TxId::SYSTEM,
462        )
463    }
464
465    /// Creates a new node with labels and properties within a transaction context.
466    #[cfg(not(feature = "tiered-storage"))]
467    pub fn create_node_with_props_versioned(
468        &self,
469        labels: &[&str],
470        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
471        epoch: EpochId,
472        tx_id: TxId,
473    ) -> NodeId {
474        let id = self.create_node_versioned(labels, epoch, tx_id);
475
476        for (key, value) in properties {
477            let prop_key: PropertyKey = key.into();
478            let prop_value: Value = value.into();
479            // Update property index before setting the property
480            self.update_property_index_on_set(id, &prop_key, &prop_value);
481            self.node_properties.set(id, prop_key, prop_value);
482        }
483
484        // Update props_count in record
485        let count = self.node_properties.get_all(id).len() as u16;
486        if let Some(chain) = self.nodes.write().get_mut(&id)
487            && let Some(record) = chain.latest_mut()
488        {
489            record.props_count = count;
490        }
491
492        id
493    }
494
495    /// Creates a new node with labels and properties within a transaction context.
496    /// (Tiered storage version)
497    #[cfg(feature = "tiered-storage")]
498    pub fn create_node_with_props_versioned(
499        &self,
500        labels: &[&str],
501        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
502        epoch: EpochId,
503        tx_id: TxId,
504    ) -> NodeId {
505        let id = self.create_node_versioned(labels, epoch, tx_id);
506
507        for (key, value) in properties {
508            let prop_key: PropertyKey = key.into();
509            let prop_value: Value = value.into();
510            // Update property index before setting the property
511            self.update_property_index_on_set(id, &prop_key, &prop_value);
512            self.node_properties.set(id, prop_key, prop_value);
513        }
514
515        // Note: props_count in record is not updated for tiered storage.
516        // The record is immutable once allocated in the arena.
517
518        id
519    }
520
521    /// Gets a node by ID (latest visible version).
522    #[must_use]
523    pub fn get_node(&self, id: NodeId) -> Option<Node> {
524        self.get_node_at_epoch(id, self.current_epoch())
525    }
526
527    /// Gets a node by ID at a specific epoch.
528    #[must_use]
529    #[cfg(not(feature = "tiered-storage"))]
530    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
531        let nodes = self.nodes.read();
532        let chain = nodes.get(&id)?;
533        let record = chain.visible_at(epoch)?;
534
535        if record.is_deleted() {
536            return None;
537        }
538
539        let mut node = Node::new(id);
540
541        // Get labels from node_labels map
542        let id_to_label = self.id_to_label.read();
543        let node_labels = self.node_labels.read();
544        if let Some(label_ids) = node_labels.get(&id) {
545            for &label_id in label_ids {
546                if let Some(label) = id_to_label.get(label_id as usize) {
547                    node.labels.push(label.clone());
548                }
549            }
550        }
551
552        // Get properties
553        node.properties = self.node_properties.get_all(id).into_iter().collect();
554
555        Some(node)
556    }
557
558    /// Gets a node by ID at a specific epoch.
559    /// (Tiered storage version: reads from arena via VersionIndex)
560    #[must_use]
561    #[cfg(feature = "tiered-storage")]
562    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
563        let versions = self.node_versions.read();
564        let index = versions.get(&id)?;
565        let version_ref = index.visible_at(epoch)?;
566
567        // Read the record from arena
568        let record = self.read_node_record(&version_ref)?;
569
570        if record.is_deleted() {
571            return None;
572        }
573
574        let mut node = Node::new(id);
575
576        // Get labels from node_labels map
577        let id_to_label = self.id_to_label.read();
578        let node_labels = self.node_labels.read();
579        if let Some(label_ids) = node_labels.get(&id) {
580            for &label_id in label_ids {
581                if let Some(label) = id_to_label.get(label_id as usize) {
582                    node.labels.push(label.clone());
583                }
584            }
585        }
586
587        // Get properties
588        node.properties = self.node_properties.get_all(id).into_iter().collect();
589
590        Some(node)
591    }
592
593    /// Gets a node visible to a specific transaction.
594    #[must_use]
595    #[cfg(not(feature = "tiered-storage"))]
596    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
597        let nodes = self.nodes.read();
598        let chain = nodes.get(&id)?;
599        let record = chain.visible_to(epoch, tx_id)?;
600
601        if record.is_deleted() {
602            return None;
603        }
604
605        let mut node = Node::new(id);
606
607        // Get labels from node_labels map
608        let id_to_label = self.id_to_label.read();
609        let node_labels = self.node_labels.read();
610        if let Some(label_ids) = node_labels.get(&id) {
611            for &label_id in label_ids {
612                if let Some(label) = id_to_label.get(label_id as usize) {
613                    node.labels.push(label.clone());
614                }
615            }
616        }
617
618        // Get properties
619        node.properties = self.node_properties.get_all(id).into_iter().collect();
620
621        Some(node)
622    }
623
624    /// Gets a node visible to a specific transaction.
625    /// (Tiered storage version: reads from arena via VersionIndex)
626    #[must_use]
627    #[cfg(feature = "tiered-storage")]
628    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
629        let versions = self.node_versions.read();
630        let index = versions.get(&id)?;
631        let version_ref = index.visible_to(epoch, tx_id)?;
632
633        // Read the record from arena
634        let record = self.read_node_record(&version_ref)?;
635
636        if record.is_deleted() {
637            return None;
638        }
639
640        let mut node = Node::new(id);
641
642        // Get labels from node_labels map
643        let id_to_label = self.id_to_label.read();
644        let node_labels = self.node_labels.read();
645        if let Some(label_ids) = node_labels.get(&id) {
646            for &label_id in label_ids {
647                if let Some(label) = id_to_label.get(label_id as usize) {
648                    node.labels.push(label.clone());
649                }
650            }
651        }
652
653        // Get properties
654        node.properties = self.node_properties.get_all(id).into_iter().collect();
655
656        Some(node)
657    }
658
659    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
660    #[cfg(feature = "tiered-storage")]
661    #[allow(unsafe_code)]
662    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
663        match version_ref {
664            VersionRef::Hot(hot_ref) => {
665                let arena = self.arena_allocator.arena(hot_ref.epoch);
666                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
667                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
668                Some(*record)
669            }
670            VersionRef::Cold(cold_ref) => {
671                // Read from compressed epoch store
672                self.epoch_store
673                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
674            }
675        }
676    }
677
678    /// Deletes a node and all its edges (using latest epoch).
679    pub fn delete_node(&self, id: NodeId) -> bool {
680        self.needs_stats_recompute.store(true, Ordering::Relaxed);
681        self.delete_node_at_epoch(id, self.current_epoch())
682    }
683
684    /// Deletes a node at a specific epoch.
685    #[cfg(not(feature = "tiered-storage"))]
686    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
687        let mut nodes = self.nodes.write();
688        if let Some(chain) = nodes.get_mut(&id) {
689            // Check if visible at this epoch (not already deleted)
690            if let Some(record) = chain.visible_at(epoch) {
691                if record.is_deleted() {
692                    return false;
693                }
694            } else {
695                // Not visible at this epoch (already deleted or doesn't exist)
696                return false;
697            }
698
699            // Mark the version chain as deleted at this epoch
700            chain.mark_deleted(epoch);
701
702            // Remove from label index using node_labels map
703            let mut index = self.label_index.write();
704            let mut node_labels = self.node_labels.write();
705            if let Some(label_ids) = node_labels.remove(&id) {
706                for label_id in label_ids {
707                    if let Some(set) = index.get_mut(label_id as usize) {
708                        set.remove(&id);
709                    }
710                }
711            }
712
713            // Remove from text indexes before removing properties
714            #[cfg(feature = "text-index")]
715            self.remove_from_all_text_indexes(id);
716
717            // Remove properties
718            drop(nodes); // Release lock before removing properties
719            drop(index);
720            drop(node_labels);
721            self.node_properties.remove_all(id);
722
723            // Note: Caller should use delete_node_edges() first if detach is needed
724
725            true
726        } else {
727            false
728        }
729    }
730
731    /// Deletes a node at a specific epoch.
732    /// (Tiered storage version)
733    #[cfg(feature = "tiered-storage")]
734    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
735        let mut versions = self.node_versions.write();
736        if let Some(index) = versions.get_mut(&id) {
737            // Check if visible at this epoch
738            if let Some(version_ref) = index.visible_at(epoch) {
739                if let Some(record) = self.read_node_record(&version_ref) {
740                    if record.is_deleted() {
741                        return false;
742                    }
743                } else {
744                    return false;
745                }
746            } else {
747                return false;
748            }
749
750            // Mark as deleted in version index
751            index.mark_deleted(epoch);
752
753            // Remove from label index using node_labels map
754            let mut label_index = self.label_index.write();
755            let mut node_labels = self.node_labels.write();
756            if let Some(label_ids) = node_labels.remove(&id) {
757                for label_id in label_ids {
758                    if let Some(set) = label_index.get_mut(label_id as usize) {
759                        set.remove(&id);
760                    }
761                }
762            }
763
764            // Remove from text indexes before removing properties
765            #[cfg(feature = "text-index")]
766            self.remove_from_all_text_indexes(id);
767
768            // Remove properties
769            drop(versions);
770            drop(label_index);
771            drop(node_labels);
772            self.node_properties.remove_all(id);
773
774            true
775        } else {
776            false
777        }
778    }
779
780    /// Deletes all edges connected to a node (implements DETACH DELETE).
781    ///
782    /// Call this before `delete_node()` if you want to remove a node that
783    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
784    #[cfg(not(feature = "tiered-storage"))]
785    pub fn delete_node_edges(&self, node_id: NodeId) {
786        // Get outgoing edges
787        let outgoing: Vec<EdgeId> = self
788            .forward_adj
789            .edges_from(node_id)
790            .into_iter()
791            .map(|(_, edge_id)| edge_id)
792            .collect();
793
794        // Get incoming edges
795        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
796            backward
797                .edges_from(node_id)
798                .into_iter()
799                .map(|(_, edge_id)| edge_id)
800                .collect()
801        } else {
802            // No backward adjacency - scan all edges
803            let epoch = self.current_epoch();
804            self.edges
805                .read()
806                .iter()
807                .filter_map(|(id, chain)| {
808                    chain.visible_at(epoch).and_then(|r| {
809                        if !r.is_deleted() && r.dst == node_id {
810                            Some(*id)
811                        } else {
812                            None
813                        }
814                    })
815                })
816                .collect()
817        };
818
819        // Delete all edges
820        for edge_id in outgoing.into_iter().chain(incoming) {
821            self.delete_edge(edge_id);
822        }
823    }
824
825    /// Deletes all edges connected to a node (implements DETACH DELETE).
826    /// (Tiered storage version)
827    #[cfg(feature = "tiered-storage")]
828    pub fn delete_node_edges(&self, node_id: NodeId) {
829        // Get outgoing edges
830        let outgoing: Vec<EdgeId> = self
831            .forward_adj
832            .edges_from(node_id)
833            .into_iter()
834            .map(|(_, edge_id)| edge_id)
835            .collect();
836
837        // Get incoming edges
838        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
839            backward
840                .edges_from(node_id)
841                .into_iter()
842                .map(|(_, edge_id)| edge_id)
843                .collect()
844        } else {
845            // No backward adjacency - scan all edges
846            let epoch = self.current_epoch();
847            let versions = self.edge_versions.read();
848            versions
849                .iter()
850                .filter_map(|(id, index)| {
851                    index.visible_at(epoch).and_then(|vref| {
852                        self.read_edge_record(&vref).and_then(|r| {
853                            if !r.is_deleted() && r.dst == node_id {
854                                Some(*id)
855                            } else {
856                                None
857                            }
858                        })
859                    })
860                })
861                .collect()
862        };
863
864        // Delete all edges
865        for edge_id in outgoing.into_iter().chain(incoming) {
866            self.delete_edge(edge_id);
867        }
868    }
869
870    /// Sets a property on a node.
871    #[cfg(not(feature = "tiered-storage"))]
872    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
873        let prop_key: PropertyKey = key.into();
874
875        // Update property index before setting the property (needs to read old value)
876        self.update_property_index_on_set(id, &prop_key, &value);
877
878        // Sync text index if applicable
879        #[cfg(feature = "text-index")]
880        self.update_text_index_on_set(id, key, &value);
881
882        self.node_properties.set(id, prop_key, value);
883
884        // Update props_count in record
885        let count = self.node_properties.get_all(id).len() as u16;
886        if let Some(chain) = self.nodes.write().get_mut(&id)
887            && let Some(record) = chain.latest_mut()
888        {
889            record.props_count = count;
890        }
891    }
892
893    /// Sets a property on a node.
894    /// (Tiered storage version: properties stored separately, record is immutable)
895    #[cfg(feature = "tiered-storage")]
896    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
897        let prop_key: PropertyKey = key.into();
898
899        // Update property index before setting the property (needs to read old value)
900        self.update_property_index_on_set(id, &prop_key, &value);
901
902        // Sync text index if applicable
903        #[cfg(feature = "text-index")]
904        self.update_text_index_on_set(id, key, &value);
905
906        self.node_properties.set(id, prop_key, value);
907        // Note: props_count in record is not updated for tiered storage.
908        // The record is immutable once allocated in the arena.
909        // Property count can be derived from PropertyStorage if needed.
910    }
911
912    /// Sets a property on an edge.
913    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
914        self.edge_properties.set(id, key.into(), value);
915    }
916
917    /// Removes a property from a node.
918    ///
919    /// Returns the previous value if it existed, or None if the property didn't exist.
920    #[cfg(not(feature = "tiered-storage"))]
921    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
922        let prop_key: PropertyKey = key.into();
923
924        // Update property index before removing (needs to read old value)
925        self.update_property_index_on_remove(id, &prop_key);
926
927        // Sync text index if applicable
928        #[cfg(feature = "text-index")]
929        self.update_text_index_on_remove(id, key);
930
931        let result = self.node_properties.remove(id, &prop_key);
932
933        // Update props_count in record
934        let count = self.node_properties.get_all(id).len() as u16;
935        if let Some(chain) = self.nodes.write().get_mut(&id)
936            && let Some(record) = chain.latest_mut()
937        {
938            record.props_count = count;
939        }
940
941        result
942    }
943
944    /// Removes a property from a node.
945    /// (Tiered storage version)
946    #[cfg(feature = "tiered-storage")]
947    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
948        let prop_key: PropertyKey = key.into();
949
950        // Update property index before removing (needs to read old value)
951        self.update_property_index_on_remove(id, &prop_key);
952
953        // Sync text index if applicable
954        #[cfg(feature = "text-index")]
955        self.update_text_index_on_remove(id, key);
956
957        self.node_properties.remove(id, &prop_key)
958        // Note: props_count in record is not updated for tiered storage.
959    }
960
961    /// Removes a property from an edge.
962    ///
963    /// Returns the previous value if it existed, or None if the property didn't exist.
964    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
965        self.edge_properties.remove(id, &key.into())
966    }
967
968    /// Gets a single property from a node without loading all properties.
969    ///
970    /// This is O(1) vs O(properties) for `get_node().get_property()`.
971    /// Use this for filter predicates where you only need one property value.
972    ///
973    /// # Example
974    ///
975    /// ```ignore
976    /// // Fast: Direct single-property lookup
977    /// let age = store.get_node_property(node_id, "age");
978    ///
979    /// // Slow: Loads all properties, then extracts one
980    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
981    /// ```
982    #[must_use]
983    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
984        self.node_properties.get(id, key)
985    }
986
987    /// Gets a single property from an edge without loading all properties.
988    ///
989    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
990    #[must_use]
991    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
992        self.edge_properties.get(id, key)
993    }
994
995    // === Batch Property Operations ===
996
997    /// Gets a property for multiple nodes in a single batch operation.
998    ///
999    /// More efficient than calling [`Self::get_node_property`] in a loop because it
1000    /// reduces lock overhead and enables better cache utilization.
1001    ///
1002    /// # Example
1003    ///
1004    /// ```
1005    /// use grafeo_core::graph::lpg::LpgStore;
1006    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
1007    ///
1008    /// let store = LpgStore::new();
1009    /// let n1 = store.create_node(&["Person"]);
1010    /// let n2 = store.create_node(&["Person"]);
1011    /// store.set_node_property(n1, "age", Value::from(25i64));
1012    /// store.set_node_property(n2, "age", Value::from(30i64));
1013    ///
1014    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
1015    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
1016    /// ```
1017    #[must_use]
1018    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
1019        self.node_properties.get_batch(ids, key)
1020    }
1021
1022    /// Gets all properties for multiple nodes in a single batch operation.
1023    ///
1024    /// Returns a vector of property maps, one per node ID (empty map if no properties).
1025    /// More efficient than calling [`Self::get_node`] in a loop.
1026    #[must_use]
1027    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
1028        self.node_properties.get_all_batch(ids)
1029    }
1030
1031    /// Gets selected properties for multiple nodes (projection pushdown).
1032    ///
1033    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
1034    /// need a subset of properties. It only iterates the requested columns instead of
1035    /// all columns.
1036    ///
1037    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
1038    /// instead of `RETURN n` (which requires all properties).
1039    ///
1040    /// # Example
1041    ///
1042    /// ```
1043    /// use grafeo_core::graph::lpg::LpgStore;
1044    /// use grafeo_common::types::{PropertyKey, Value};
1045    ///
1046    /// let store = LpgStore::new();
1047    /// let n1 = store.create_node(&["Person"]);
1048    /// store.set_node_property(n1, "name", Value::from("Alice"));
1049    /// store.set_node_property(n1, "age", Value::from(30i64));
1050    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1051    ///
1052    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1053    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1054    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1055    ///
1056    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1057    /// ```
1058    #[must_use]
1059    pub fn get_nodes_properties_selective_batch(
1060        &self,
1061        ids: &[NodeId],
1062        keys: &[PropertyKey],
1063    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1064        self.node_properties.get_selective_batch(ids, keys)
1065    }
1066
1067    /// Gets selected properties for multiple edges (projection pushdown).
1068    ///
1069    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1070    #[must_use]
1071    pub fn get_edges_properties_selective_batch(
1072        &self,
1073        ids: &[EdgeId],
1074        keys: &[PropertyKey],
1075    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1076        self.edge_properties.get_selective_batch(ids, keys)
1077    }
1078
1079    /// Finds nodes where a property value is in a range.
1080    ///
1081    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1082    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1083    ///
1084    /// # Arguments
1085    ///
1086    /// * `property` - The property to check
1087    /// * `min` - Optional lower bound (None for unbounded)
1088    /// * `max` - Optional upper bound (None for unbounded)
1089    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1090    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1091    ///
1092    /// # Example
1093    ///
1094    /// ```
1095    /// use grafeo_core::graph::lpg::LpgStore;
1096    /// use grafeo_common::types::Value;
1097    ///
1098    /// let store = LpgStore::new();
1099    /// let n1 = store.create_node(&["Person"]);
1100    /// let n2 = store.create_node(&["Person"]);
1101    /// store.set_node_property(n1, "age", Value::from(25i64));
1102    /// store.set_node_property(n2, "age", Value::from(35i64));
1103    ///
1104    /// // Find nodes where age > 30
1105    /// let result = store.find_nodes_in_range(
1106    ///     "age",
1107    ///     Some(&Value::from(30i64)),
1108    ///     None,
1109    ///     false, // exclusive lower bound
1110    ///     true,  // inclusive upper bound (doesn't matter since None)
1111    /// );
1112    /// assert_eq!(result.len(), 1); // Only n2 matches
1113    /// ```
1114    #[must_use]
1115    pub fn find_nodes_in_range(
1116        &self,
1117        property: &str,
1118        min: Option<&Value>,
1119        max: Option<&Value>,
1120        min_inclusive: bool,
1121        max_inclusive: bool,
1122    ) -> Vec<NodeId> {
1123        let key = PropertyKey::new(property);
1124
1125        // Check zone map first - if no values could match, return empty
1126        if !self
1127            .node_properties
1128            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1129        {
1130            return Vec::new();
1131        }
1132
1133        // Scan all nodes and filter by range
1134        self.node_ids()
1135            .into_iter()
1136            .filter(|&node_id| {
1137                self.node_properties
1138                    .get(node_id, &key)
1139                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1140            })
1141            .collect()
1142    }
1143
1144    /// Finds nodes matching multiple property equality conditions.
1145    ///
1146    /// This is more efficient than intersecting multiple single-property lookups
1147    /// because it can use indexes when available and short-circuits on the first
1148    /// miss.
1149    ///
1150    /// # Example
1151    ///
1152    /// ```
1153    /// use grafeo_core::graph::lpg::LpgStore;
1154    /// use grafeo_common::types::Value;
1155    ///
1156    /// let store = LpgStore::new();
1157    /// let alice = store.create_node(&["Person"]);
1158    /// store.set_node_property(alice, "name", Value::from("Alice"));
1159    /// store.set_node_property(alice, "city", Value::from("NYC"));
1160    ///
1161    /// // Find nodes where name = "Alice" AND city = "NYC"
1162    /// let matches = store.find_nodes_by_properties(&[
1163    ///     ("name", Value::from("Alice")),
1164    ///     ("city", Value::from("NYC")),
1165    /// ]);
1166    /// assert!(matches.contains(&alice));
1167    /// ```
1168    #[must_use]
1169    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1170        if conditions.is_empty() {
1171            return self.node_ids();
1172        }
1173
1174        // Find the most selective condition (smallest result set) to start
1175        // If any condition has an index, use that first
1176        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1177        let indexes = self.property_indexes.read();
1178
1179        for (i, (prop, value)) in conditions.iter().enumerate() {
1180            let key = PropertyKey::new(*prop);
1181            let hv = HashableValue::new(value.clone());
1182
1183            if let Some(index) = indexes.get(&key) {
1184                let matches: Vec<NodeId> = index
1185                    .get(&hv)
1186                    .map(|nodes| nodes.iter().copied().collect())
1187                    .unwrap_or_default();
1188
1189                // Short-circuit if any indexed condition has no matches
1190                if matches.is_empty() {
1191                    return Vec::new();
1192                }
1193
1194                // Use smallest indexed result as starting point
1195                if best_start
1196                    .as_ref()
1197                    .is_none_or(|(_, best)| matches.len() < best.len())
1198                {
1199                    best_start = Some((i, matches));
1200                }
1201            }
1202        }
1203        drop(indexes);
1204
1205        // Start from best indexed result or fall back to full node scan
1206        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1207            // No indexes available, start with first condition via full scan
1208            let (prop, value) = &conditions[0];
1209            (0, self.find_nodes_by_property(prop, value))
1210        });
1211
1212        // Filter candidates through remaining conditions
1213        for (i, (prop, value)) in conditions.iter().enumerate() {
1214            if i == start_idx {
1215                continue;
1216            }
1217
1218            let key = PropertyKey::new(*prop);
1219            candidates.retain(|&node_id| {
1220                self.node_properties
1221                    .get(node_id, &key)
1222                    .is_some_and(|v| v == *value)
1223            });
1224
1225            // Short-circuit if no candidates remain
1226            if candidates.is_empty() {
1227                return Vec::new();
1228            }
1229        }
1230
1231        candidates
1232    }
1233
1234    // === Property Index Operations ===
1235
1236    /// Creates an index on a node property for O(1) lookups by value.
1237    ///
1238    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1239    /// O(1) instead of O(n) for this property. The index is automatically
1240    /// maintained when properties are set or removed.
1241    ///
1242    /// # Example
1243    ///
1244    /// ```
1245    /// use grafeo_core::graph::lpg::LpgStore;
1246    /// use grafeo_common::types::Value;
1247    ///
1248    /// let store = LpgStore::new();
1249    ///
1250    /// // Create nodes with an 'id' property
1251    /// let alice = store.create_node(&["Person"]);
1252    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1253    ///
1254    /// // Create an index on the 'id' property
1255    /// store.create_property_index("id");
1256    ///
1257    /// // Now lookups by 'id' are O(1)
1258    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1259    /// assert!(found.contains(&alice));
1260    /// ```
1261    pub fn create_property_index(&self, property: &str) {
1262        let key = PropertyKey::new(property);
1263
1264        let mut indexes = self.property_indexes.write();
1265        if indexes.contains_key(&key) {
1266            return; // Already indexed
1267        }
1268
1269        // Create the index and populate it with existing data
1270        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1271
1272        // Scan all nodes to build the index
1273        for node_id in self.node_ids() {
1274            if let Some(value) = self.node_properties.get(node_id, &key) {
1275                let hv = HashableValue::new(value);
1276                index.entry(hv).or_default().insert(node_id);
1277            }
1278        }
1279
1280        indexes.insert(key, index);
1281    }
1282
1283    /// Drops an index on a node property.
1284    ///
1285    /// Returns `true` if the index existed and was removed.
1286    pub fn drop_property_index(&self, property: &str) -> bool {
1287        let key = PropertyKey::new(property);
1288        self.property_indexes.write().remove(&key).is_some()
1289    }
1290
1291    /// Returns `true` if the property has an index.
1292    #[must_use]
1293    pub fn has_property_index(&self, property: &str) -> bool {
1294        let key = PropertyKey::new(property);
1295        self.property_indexes.read().contains_key(&key)
1296    }
1297
1298    /// Stores a vector index for a label+property pair.
1299    #[cfg(feature = "vector-index")]
1300    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1301        let key = format!("{label}:{property}");
1302        self.vector_indexes.write().insert(key, index);
1303    }
1304
1305    /// Retrieves the vector index for a label+property pair.
1306    #[cfg(feature = "vector-index")]
1307    #[must_use]
1308    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1309        let key = format!("{label}:{property}");
1310        self.vector_indexes.read().get(&key).cloned()
1311    }
1312
1313    /// Removes a vector index for a label+property pair.
1314    ///
1315    /// Returns `true` if the index existed and was removed.
1316    #[cfg(feature = "vector-index")]
1317    pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1318        let key = format!("{label}:{property}");
1319        self.vector_indexes.write().remove(&key).is_some()
1320    }
1321
1322    /// Returns all vector index entries as `(key, index)` pairs.
1323    ///
1324    /// Keys are in `"label:property"` format.
1325    #[cfg(feature = "vector-index")]
1326    #[must_use]
1327    pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1328        self.vector_indexes
1329            .read()
1330            .iter()
1331            .map(|(k, v)| (k.clone(), v.clone()))
1332            .collect()
1333    }
1334
1335    /// Stores a text index for a label+property pair.
1336    #[cfg(feature = "text-index")]
1337    pub fn add_text_index(
1338        &self,
1339        label: &str,
1340        property: &str,
1341        index: Arc<RwLock<crate::index::text::InvertedIndex>>,
1342    ) {
1343        let key = format!("{label}:{property}");
1344        self.text_indexes.write().insert(key, index);
1345    }
1346
1347    /// Retrieves the text index for a label+property pair.
1348    #[cfg(feature = "text-index")]
1349    #[must_use]
1350    pub fn get_text_index(
1351        &self,
1352        label: &str,
1353        property: &str,
1354    ) -> Option<Arc<RwLock<crate::index::text::InvertedIndex>>> {
1355        let key = format!("{label}:{property}");
1356        self.text_indexes.read().get(&key).cloned()
1357    }
1358
1359    /// Removes a text index for a label+property pair.
1360    ///
1361    /// Returns `true` if the index existed and was removed.
1362    #[cfg(feature = "text-index")]
1363    pub fn remove_text_index(&self, label: &str, property: &str) -> bool {
1364        let key = format!("{label}:{property}");
1365        self.text_indexes.write().remove(&key).is_some()
1366    }
1367
1368    /// Returns all text index entries as `(key, index)` pairs.
1369    ///
1370    /// The key format is `"label:property"`.
1371    #[cfg(feature = "text-index")]
1372    pub fn text_index_entries(
1373        &self,
1374    ) -> Vec<(String, Arc<RwLock<crate::index::text::InvertedIndex>>)> {
1375        self.text_indexes
1376            .read()
1377            .iter()
1378            .map(|(k, v)| (k.clone(), v.clone()))
1379            .collect()
1380    }
1381
1382    /// Updates text indexes when a node property is set.
1383    ///
1384    /// If the node has a label with a text index on this property key,
1385    /// the index is updated with the new value (if it's a string).
1386    #[cfg(feature = "text-index")]
1387    fn update_text_index_on_set(&self, id: NodeId, key: &str, value: &Value) {
1388        let text_indexes = self.text_indexes.read();
1389        if text_indexes.is_empty() {
1390            return;
1391        }
1392        let id_to_label = self.id_to_label.read();
1393        let node_labels = self.node_labels.read();
1394        if let Some(label_ids) = node_labels.get(&id) {
1395            for &label_id in label_ids {
1396                if let Some(label_name) = id_to_label.get(label_id as usize) {
1397                    let index_key = format!("{label_name}:{key}");
1398                    if let Some(index) = text_indexes.get(&index_key) {
1399                        let mut idx = index.write();
1400                        // Remove old entry first, then insert new if it's a string
1401                        idx.remove(id);
1402                        if let Value::String(text) = value {
1403                            idx.insert(id, text);
1404                        }
1405                    }
1406                }
1407            }
1408        }
1409    }
1410
1411    /// Updates text indexes when a node property is removed.
1412    #[cfg(feature = "text-index")]
1413    fn update_text_index_on_remove(&self, id: NodeId, key: &str) {
1414        let text_indexes = self.text_indexes.read();
1415        if text_indexes.is_empty() {
1416            return;
1417        }
1418        let id_to_label = self.id_to_label.read();
1419        let node_labels = self.node_labels.read();
1420        if let Some(label_ids) = node_labels.get(&id) {
1421            for &label_id in label_ids {
1422                if let Some(label_name) = id_to_label.get(label_id as usize) {
1423                    let index_key = format!("{label_name}:{key}");
1424                    if let Some(index) = text_indexes.get(&index_key) {
1425                        index.write().remove(id);
1426                    }
1427                }
1428            }
1429        }
1430    }
1431
1432    /// Removes a node from all text indexes.
1433    #[cfg(feature = "text-index")]
1434    fn remove_from_all_text_indexes(&self, id: NodeId) {
1435        let text_indexes = self.text_indexes.read();
1436        if text_indexes.is_empty() {
1437            return;
1438        }
1439        for (_, index) in text_indexes.iter() {
1440            index.write().remove(id);
1441        }
1442    }
1443
1444    /// Finds all nodes that have a specific property value.
1445    ///
1446    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1447    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1448    ///
1449    /// # Example
1450    ///
1451    /// ```
1452    /// use grafeo_core::graph::lpg::LpgStore;
1453    /// use grafeo_common::types::Value;
1454    ///
1455    /// let store = LpgStore::new();
1456    /// store.create_property_index("city"); // Optional but makes lookups fast
1457    ///
1458    /// let alice = store.create_node(&["Person"]);
1459    /// let bob = store.create_node(&["Person"]);
1460    /// store.set_node_property(alice, "city", Value::from("NYC"));
1461    /// store.set_node_property(bob, "city", Value::from("NYC"));
1462    ///
1463    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1464    /// assert_eq!(nyc_people.len(), 2);
1465    /// ```
1466    #[must_use]
1467    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1468        let key = PropertyKey::new(property);
1469        let hv = HashableValue::new(value.clone());
1470
1471        // Try indexed lookup first
1472        let indexes = self.property_indexes.read();
1473        if let Some(index) = indexes.get(&key) {
1474            if let Some(nodes) = index.get(&hv) {
1475                return nodes.iter().copied().collect();
1476            }
1477            return Vec::new();
1478        }
1479        drop(indexes);
1480
1481        // Fall back to full scan
1482        self.node_ids()
1483            .into_iter()
1484            .filter(|&node_id| {
1485                self.node_properties
1486                    .get(node_id, &key)
1487                    .is_some_and(|v| v == *value)
1488            })
1489            .collect()
1490    }
1491
1492    /// Finds nodes whose property matches an operator filter.
1493    ///
1494    /// The `filter_value` is either a scalar (equality) or a `Value::Map` with
1495    /// `$`-prefixed operator keys like `$gt`, `$lt`, `$gte`, `$lte`, `$in`,
1496    /// `$nin`, `$ne`, `$contains`.
1497    pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
1498        let key = PropertyKey::new(property);
1499        self.node_ids()
1500            .into_iter()
1501            .filter(|&node_id| {
1502                self.node_properties
1503                    .get(node_id, &key)
1504                    .is_some_and(|v| Self::matches_filter(&v, filter_value))
1505            })
1506            .collect()
1507    }
1508
1509    /// Checks if a node property value matches a filter value.
1510    ///
1511    /// - Scalar filter: equality check
1512    /// - Map filter with `$`-prefixed keys: operator evaluation
1513    fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
1514        match filter_value {
1515            Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
1516                ops.iter().all(|(op_key, op_val)| {
1517                    match op_key.as_str() {
1518                        "$gt" => {
1519                            Self::compare_values(node_value, op_val)
1520                                == Some(std::cmp::Ordering::Greater)
1521                        }
1522                        "$gte" => matches!(
1523                            Self::compare_values(node_value, op_val),
1524                            Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
1525                        ),
1526                        "$lt" => {
1527                            Self::compare_values(node_value, op_val)
1528                                == Some(std::cmp::Ordering::Less)
1529                        }
1530                        "$lte" => matches!(
1531                            Self::compare_values(node_value, op_val),
1532                            Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
1533                        ),
1534                        "$ne" => node_value != op_val,
1535                        "$in" => match op_val {
1536                            Value::List(items) => items.iter().any(|v| v == node_value),
1537                            _ => false,
1538                        },
1539                        "$nin" => match op_val {
1540                            Value::List(items) => !items.iter().any(|v| v == node_value),
1541                            _ => true,
1542                        },
1543                        "$contains" => match (node_value, op_val) {
1544                            (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
1545                            _ => false,
1546                        },
1547                        _ => false, // Unknown operator — no match
1548                    }
1549                })
1550            }
1551            _ => node_value == filter_value, // Equality (backward compatible)
1552        }
1553    }
1554
1555    /// Compares two values for ordering (cross-type numeric comparison supported).
1556    fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1557        match (a, b) {
1558            (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1559            (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1560            (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1561            (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1562            (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1563            (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1564            _ => None,
1565        }
1566    }
1567
1568    /// Updates property indexes when a property is set.
1569    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1570        let indexes = self.property_indexes.read();
1571        if let Some(index) = indexes.get(key) {
1572            // Get old value to remove from index
1573            if let Some(old_value) = self.node_properties.get(node_id, key) {
1574                let old_hv = HashableValue::new(old_value);
1575                if let Some(mut nodes) = index.get_mut(&old_hv) {
1576                    nodes.remove(&node_id);
1577                    if nodes.is_empty() {
1578                        drop(nodes);
1579                        index.remove(&old_hv);
1580                    }
1581                }
1582            }
1583
1584            // Add new value to index
1585            let new_hv = HashableValue::new(new_value.clone());
1586            index
1587                .entry(new_hv)
1588                .or_insert_with(FxHashSet::default)
1589                .insert(node_id);
1590        }
1591    }
1592
1593    /// Updates property indexes when a property is removed.
1594    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1595        let indexes = self.property_indexes.read();
1596        if let Some(index) = indexes.get(key) {
1597            // Get old value to remove from index
1598            if let Some(old_value) = self.node_properties.get(node_id, key) {
1599                let old_hv = HashableValue::new(old_value);
1600                if let Some(mut nodes) = index.get_mut(&old_hv) {
1601                    nodes.remove(&node_id);
1602                    if nodes.is_empty() {
1603                        drop(nodes);
1604                        index.remove(&old_hv);
1605                    }
1606                }
1607            }
1608        }
1609    }
1610
1611    /// Adds a label to a node.
1612    ///
1613    /// Returns true if the label was added, false if the node doesn't exist
1614    /// or already has the label.
1615    #[cfg(not(feature = "tiered-storage"))]
1616    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1617        let epoch = self.current_epoch();
1618
1619        // Check if node exists
1620        let nodes = self.nodes.read();
1621        if let Some(chain) = nodes.get(&node_id) {
1622            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1623                return false;
1624            }
1625        } else {
1626            return false;
1627        }
1628        drop(nodes);
1629
1630        // Get or create label ID
1631        let label_id = self.get_or_create_label_id(label);
1632
1633        // Add to node_labels map
1634        let mut node_labels = self.node_labels.write();
1635        let label_set = node_labels.entry(node_id).or_default();
1636
1637        if label_set.contains(&label_id) {
1638            return false; // Already has this label
1639        }
1640
1641        label_set.insert(label_id);
1642        drop(node_labels);
1643
1644        // Add to label_index
1645        let mut index = self.label_index.write();
1646        if (label_id as usize) >= index.len() {
1647            index.resize(label_id as usize + 1, FxHashMap::default());
1648        }
1649        index[label_id as usize].insert(node_id, ());
1650
1651        // Update label count in node record
1652        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1653            && let Some(record) = chain.latest_mut()
1654        {
1655            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1656            record.set_label_count(count as u16);
1657        }
1658
1659        true
1660    }
1661
1662    /// Adds a label to a node.
1663    /// (Tiered storage version)
1664    #[cfg(feature = "tiered-storage")]
1665    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1666        let epoch = self.current_epoch();
1667
1668        // Check if node exists
1669        let versions = self.node_versions.read();
1670        if let Some(index) = versions.get(&node_id) {
1671            if let Some(vref) = index.visible_at(epoch) {
1672                if let Some(record) = self.read_node_record(&vref) {
1673                    if record.is_deleted() {
1674                        return false;
1675                    }
1676                } else {
1677                    return false;
1678                }
1679            } else {
1680                return false;
1681            }
1682        } else {
1683            return false;
1684        }
1685        drop(versions);
1686
1687        // Get or create label ID
1688        let label_id = self.get_or_create_label_id(label);
1689
1690        // Add to node_labels map
1691        let mut node_labels = self.node_labels.write();
1692        let label_set = node_labels.entry(node_id).or_default();
1693
1694        if label_set.contains(&label_id) {
1695            return false; // Already has this label
1696        }
1697
1698        label_set.insert(label_id);
1699        drop(node_labels);
1700
1701        // Add to label_index
1702        let mut index = self.label_index.write();
1703        if (label_id as usize) >= index.len() {
1704            index.resize(label_id as usize + 1, FxHashMap::default());
1705        }
1706        index[label_id as usize].insert(node_id, ());
1707
1708        // Note: label_count in record is not updated for tiered storage.
1709        // The record is immutable once allocated in the arena.
1710
1711        true
1712    }
1713
1714    /// Removes a label from a node.
1715    ///
1716    /// Returns true if the label was removed, false if the node doesn't exist
1717    /// or doesn't have the label.
1718    #[cfg(not(feature = "tiered-storage"))]
1719    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1720        let epoch = self.current_epoch();
1721
1722        // Check if node exists
1723        let nodes = self.nodes.read();
1724        if let Some(chain) = nodes.get(&node_id) {
1725            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1726                return false;
1727            }
1728        } else {
1729            return false;
1730        }
1731        drop(nodes);
1732
1733        // Get label ID
1734        let label_id = {
1735            let label_ids = self.label_to_id.read();
1736            match label_ids.get(label) {
1737                Some(&id) => id,
1738                None => return false, // Label doesn't exist
1739            }
1740        };
1741
1742        // Remove from node_labels map
1743        let mut node_labels = self.node_labels.write();
1744        if let Some(label_set) = node_labels.get_mut(&node_id) {
1745            if !label_set.remove(&label_id) {
1746                return false; // Node doesn't have this label
1747            }
1748        } else {
1749            return false;
1750        }
1751        drop(node_labels);
1752
1753        // Remove from label_index
1754        let mut index = self.label_index.write();
1755        if (label_id as usize) < index.len() {
1756            index[label_id as usize].remove(&node_id);
1757        }
1758
1759        // Update label count in node record
1760        if let Some(chain) = self.nodes.write().get_mut(&node_id)
1761            && let Some(record) = chain.latest_mut()
1762        {
1763            let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1764            record.set_label_count(count as u16);
1765        }
1766
1767        true
1768    }
1769
1770    /// Removes a label from a node.
1771    /// (Tiered storage version)
1772    #[cfg(feature = "tiered-storage")]
1773    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1774        let epoch = self.current_epoch();
1775
1776        // Check if node exists
1777        let versions = self.node_versions.read();
1778        if let Some(index) = versions.get(&node_id) {
1779            if let Some(vref) = index.visible_at(epoch) {
1780                if let Some(record) = self.read_node_record(&vref) {
1781                    if record.is_deleted() {
1782                        return false;
1783                    }
1784                } else {
1785                    return false;
1786                }
1787            } else {
1788                return false;
1789            }
1790        } else {
1791            return false;
1792        }
1793        drop(versions);
1794
1795        // Get label ID
1796        let label_id = {
1797            let label_ids = self.label_to_id.read();
1798            match label_ids.get(label) {
1799                Some(&id) => id,
1800                None => return false, // Label doesn't exist
1801            }
1802        };
1803
1804        // Remove from node_labels map
1805        let mut node_labels = self.node_labels.write();
1806        if let Some(label_set) = node_labels.get_mut(&node_id) {
1807            if !label_set.remove(&label_id) {
1808                return false; // Node doesn't have this label
1809            }
1810        } else {
1811            return false;
1812        }
1813        drop(node_labels);
1814
1815        // Remove from label_index
1816        let mut index = self.label_index.write();
1817        if (label_id as usize) < index.len() {
1818            index[label_id as usize].remove(&node_id);
1819        }
1820
1821        // Note: label_count in record is not updated for tiered storage.
1822
1823        true
1824    }
1825
1826    /// Returns the number of nodes (non-deleted at current epoch).
1827    #[must_use]
1828    #[cfg(not(feature = "tiered-storage"))]
1829    pub fn node_count(&self) -> usize {
1830        let epoch = self.current_epoch();
1831        self.nodes
1832            .read()
1833            .values()
1834            .filter_map(|chain| chain.visible_at(epoch))
1835            .filter(|r| !r.is_deleted())
1836            .count()
1837    }
1838
1839    /// Returns the number of nodes (non-deleted at current epoch).
1840    /// (Tiered storage version)
1841    #[must_use]
1842    #[cfg(feature = "tiered-storage")]
1843    pub fn node_count(&self) -> usize {
1844        let epoch = self.current_epoch();
1845        let versions = self.node_versions.read();
1846        versions
1847            .iter()
1848            .filter(|(_, index)| {
1849                index.visible_at(epoch).map_or(false, |vref| {
1850                    self.read_node_record(&vref)
1851                        .map_or(false, |r| !r.is_deleted())
1852                })
1853            })
1854            .count()
1855    }
1856
1857    /// Returns all node IDs in the store.
1858    ///
1859    /// This returns a snapshot of current node IDs. The returned vector
1860    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1861    /// iteration order.
1862    #[must_use]
1863    #[cfg(not(feature = "tiered-storage"))]
1864    pub fn node_ids(&self) -> Vec<NodeId> {
1865        let epoch = self.current_epoch();
1866        let mut ids: Vec<NodeId> = self
1867            .nodes
1868            .read()
1869            .iter()
1870            .filter_map(|(id, chain)| {
1871                chain
1872                    .visible_at(epoch)
1873                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1874            })
1875            .collect();
1876        ids.sort_unstable();
1877        ids
1878    }
1879
1880    /// Returns all node IDs in the store.
1881    /// (Tiered storage version)
1882    #[must_use]
1883    #[cfg(feature = "tiered-storage")]
1884    pub fn node_ids(&self) -> Vec<NodeId> {
1885        let epoch = self.current_epoch();
1886        let versions = self.node_versions.read();
1887        let mut ids: Vec<NodeId> = versions
1888            .iter()
1889            .filter_map(|(id, index)| {
1890                index.visible_at(epoch).and_then(|vref| {
1891                    self.read_node_record(&vref)
1892                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1893                })
1894            })
1895            .collect();
1896        ids.sort_unstable();
1897        ids
1898    }
1899
1900    // === Edge Operations ===
1901
1902    /// Creates a new edge.
1903    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1904        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1905        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1906    }
1907
1908    /// Creates a new edge within a transaction context.
1909    #[cfg(not(feature = "tiered-storage"))]
1910    pub fn create_edge_versioned(
1911        &self,
1912        src: NodeId,
1913        dst: NodeId,
1914        edge_type: &str,
1915        epoch: EpochId,
1916        tx_id: TxId,
1917    ) -> EdgeId {
1918        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1919        let type_id = self.get_or_create_edge_type_id(edge_type);
1920
1921        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1922        let chain = VersionChain::with_initial(record, epoch, tx_id);
1923        self.edges.write().insert(id, chain);
1924
1925        // Update adjacency
1926        self.forward_adj.add_edge(src, dst, id);
1927        if let Some(ref backward) = self.backward_adj {
1928            backward.add_edge(dst, src, id);
1929        }
1930
1931        id
1932    }
1933
1934    /// Creates a new edge within a transaction context.
1935    /// (Tiered storage version)
1936    #[cfg(feature = "tiered-storage")]
1937    pub fn create_edge_versioned(
1938        &self,
1939        src: NodeId,
1940        dst: NodeId,
1941        edge_type: &str,
1942        epoch: EpochId,
1943        tx_id: TxId,
1944    ) -> EdgeId {
1945        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1946        let type_id = self.get_or_create_edge_type_id(edge_type);
1947
1948        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1949
1950        // Allocate record in arena and get offset (create epoch if needed)
1951        let arena = self.arena_allocator.arena_or_create(epoch);
1952        let (offset, _stored) = arena.alloc_value_with_offset(record);
1953
1954        // Create HotVersionRef pointing to arena data
1955        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1956
1957        // Create or update version index
1958        let mut versions = self.edge_versions.write();
1959        if let Some(index) = versions.get_mut(&id) {
1960            index.add_hot(hot_ref);
1961        } else {
1962            versions.insert(id, VersionIndex::with_initial(hot_ref));
1963        }
1964
1965        // Update adjacency
1966        self.forward_adj.add_edge(src, dst, id);
1967        if let Some(ref backward) = self.backward_adj {
1968            backward.add_edge(dst, src, id);
1969        }
1970
1971        id
1972    }
1973
1974    /// Creates a new edge with properties.
1975    pub fn create_edge_with_props(
1976        &self,
1977        src: NodeId,
1978        dst: NodeId,
1979        edge_type: &str,
1980        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1981    ) -> EdgeId {
1982        let id = self.create_edge(src, dst, edge_type);
1983
1984        for (key, value) in properties {
1985            self.edge_properties.set(id, key.into(), value.into());
1986        }
1987
1988        id
1989    }
1990
1991    /// Gets an edge by ID (latest visible version).
1992    #[must_use]
1993    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1994        self.get_edge_at_epoch(id, self.current_epoch())
1995    }
1996
1997    /// Gets an edge by ID at a specific epoch.
1998    #[must_use]
1999    #[cfg(not(feature = "tiered-storage"))]
2000    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
2001        let edges = self.edges.read();
2002        let chain = edges.get(&id)?;
2003        let record = chain.visible_at(epoch)?;
2004
2005        if record.is_deleted() {
2006            return None;
2007        }
2008
2009        let edge_type = {
2010            let id_to_type = self.id_to_edge_type.read();
2011            id_to_type.get(record.type_id as usize)?.clone()
2012        };
2013
2014        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2015
2016        // Get properties
2017        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2018
2019        Some(edge)
2020    }
2021
2022    /// Gets an edge by ID at a specific epoch.
2023    /// (Tiered storage version)
2024    #[must_use]
2025    #[cfg(feature = "tiered-storage")]
2026    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
2027        let versions = self.edge_versions.read();
2028        let index = versions.get(&id)?;
2029        let version_ref = index.visible_at(epoch)?;
2030
2031        let record = self.read_edge_record(&version_ref)?;
2032
2033        if record.is_deleted() {
2034            return None;
2035        }
2036
2037        let edge_type = {
2038            let id_to_type = self.id_to_edge_type.read();
2039            id_to_type.get(record.type_id as usize)?.clone()
2040        };
2041
2042        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2043
2044        // Get properties
2045        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2046
2047        Some(edge)
2048    }
2049
2050    /// Gets an edge visible to a specific transaction.
2051    #[must_use]
2052    #[cfg(not(feature = "tiered-storage"))]
2053    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
2054        let edges = self.edges.read();
2055        let chain = edges.get(&id)?;
2056        let record = chain.visible_to(epoch, tx_id)?;
2057
2058        if record.is_deleted() {
2059            return None;
2060        }
2061
2062        let edge_type = {
2063            let id_to_type = self.id_to_edge_type.read();
2064            id_to_type.get(record.type_id as usize)?.clone()
2065        };
2066
2067        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2068
2069        // Get properties
2070        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2071
2072        Some(edge)
2073    }
2074
2075    /// Gets an edge visible to a specific transaction.
2076    /// (Tiered storage version)
2077    #[must_use]
2078    #[cfg(feature = "tiered-storage")]
2079    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
2080        let versions = self.edge_versions.read();
2081        let index = versions.get(&id)?;
2082        let version_ref = index.visible_to(epoch, tx_id)?;
2083
2084        let record = self.read_edge_record(&version_ref)?;
2085
2086        if record.is_deleted() {
2087            return None;
2088        }
2089
2090        let edge_type = {
2091            let id_to_type = self.id_to_edge_type.read();
2092            id_to_type.get(record.type_id as usize)?.clone()
2093        };
2094
2095        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2096
2097        // Get properties
2098        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2099
2100        Some(edge)
2101    }
2102
2103    /// Reads an EdgeRecord from arena using a VersionRef.
2104    #[cfg(feature = "tiered-storage")]
2105    #[allow(unsafe_code)]
2106    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
2107        match version_ref {
2108            VersionRef::Hot(hot_ref) => {
2109                let arena = self.arena_allocator.arena(hot_ref.epoch);
2110                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2111                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2112                Some(*record)
2113            }
2114            VersionRef::Cold(cold_ref) => {
2115                // Read from compressed epoch store
2116                self.epoch_store
2117                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
2118            }
2119        }
2120    }
2121
2122    /// Deletes an edge (using latest epoch).
2123    pub fn delete_edge(&self, id: EdgeId) -> bool {
2124        self.needs_stats_recompute.store(true, Ordering::Relaxed);
2125        self.delete_edge_at_epoch(id, self.current_epoch())
2126    }
2127
2128    /// Deletes an edge at a specific epoch.
2129    #[cfg(not(feature = "tiered-storage"))]
2130    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2131        let mut edges = self.edges.write();
2132        if let Some(chain) = edges.get_mut(&id) {
2133            // Get the visible record to check if deleted and get src/dst
2134            let (src, dst) = {
2135                match chain.visible_at(epoch) {
2136                    Some(record) => {
2137                        if record.is_deleted() {
2138                            return false;
2139                        }
2140                        (record.src, record.dst)
2141                    }
2142                    None => return false, // Not visible at this epoch (already deleted)
2143                }
2144            };
2145
2146            // Mark the version chain as deleted
2147            chain.mark_deleted(epoch);
2148
2149            drop(edges); // Release lock
2150
2151            // Mark as deleted in adjacency (soft delete)
2152            self.forward_adj.mark_deleted(src, id);
2153            if let Some(ref backward) = self.backward_adj {
2154                backward.mark_deleted(dst, id);
2155            }
2156
2157            // Remove properties
2158            self.edge_properties.remove_all(id);
2159
2160            true
2161        } else {
2162            false
2163        }
2164    }
2165
2166    /// Deletes an edge at a specific epoch.
2167    /// (Tiered storage version)
2168    #[cfg(feature = "tiered-storage")]
2169    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2170        let mut versions = self.edge_versions.write();
2171        if let Some(index) = versions.get_mut(&id) {
2172            // Get the visible record to check if deleted and get src/dst
2173            let (src, dst) = {
2174                match index.visible_at(epoch) {
2175                    Some(version_ref) => {
2176                        if let Some(record) = self.read_edge_record(&version_ref) {
2177                            if record.is_deleted() {
2178                                return false;
2179                            }
2180                            (record.src, record.dst)
2181                        } else {
2182                            return false;
2183                        }
2184                    }
2185                    None => return false,
2186                }
2187            };
2188
2189            // Mark as deleted in version index
2190            index.mark_deleted(epoch);
2191
2192            drop(versions); // Release lock
2193
2194            // Mark as deleted in adjacency (soft delete)
2195            self.forward_adj.mark_deleted(src, id);
2196            if let Some(ref backward) = self.backward_adj {
2197                backward.mark_deleted(dst, id);
2198            }
2199
2200            // Remove properties
2201            self.edge_properties.remove_all(id);
2202
2203            true
2204        } else {
2205            false
2206        }
2207    }
2208
2209    /// Returns the number of edges (non-deleted at current epoch).
2210    #[must_use]
2211    #[cfg(not(feature = "tiered-storage"))]
2212    pub fn edge_count(&self) -> usize {
2213        let epoch = self.current_epoch();
2214        self.edges
2215            .read()
2216            .values()
2217            .filter_map(|chain| chain.visible_at(epoch))
2218            .filter(|r| !r.is_deleted())
2219            .count()
2220    }
2221
2222    /// Returns the number of edges (non-deleted at current epoch).
2223    /// (Tiered storage version)
2224    #[must_use]
2225    #[cfg(feature = "tiered-storage")]
2226    pub fn edge_count(&self) -> usize {
2227        let epoch = self.current_epoch();
2228        let versions = self.edge_versions.read();
2229        versions
2230            .iter()
2231            .filter(|(_, index)| {
2232                index.visible_at(epoch).map_or(false, |vref| {
2233                    self.read_edge_record(&vref)
2234                        .map_or(false, |r| !r.is_deleted())
2235                })
2236            })
2237            .count()
2238    }
2239
2240    /// Discards all uncommitted versions created by a transaction.
2241    ///
2242    /// This is called during transaction rollback to clean up uncommitted changes.
2243    /// The method removes version chain entries created by the specified transaction.
2244    #[cfg(not(feature = "tiered-storage"))]
2245    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2246        // Remove uncommitted node versions
2247        {
2248            let mut nodes = self.nodes.write();
2249            for chain in nodes.values_mut() {
2250                chain.remove_versions_by(tx_id);
2251            }
2252            // Remove completely empty chains (no versions left)
2253            nodes.retain(|_, chain| !chain.is_empty());
2254        }
2255
2256        // Remove uncommitted edge versions
2257        {
2258            let mut edges = self.edges.write();
2259            for chain in edges.values_mut() {
2260                chain.remove_versions_by(tx_id);
2261            }
2262            // Remove completely empty chains (no versions left)
2263            edges.retain(|_, chain| !chain.is_empty());
2264        }
2265    }
2266
2267    /// Discards all uncommitted versions created by a transaction.
2268    /// (Tiered storage version)
2269    #[cfg(feature = "tiered-storage")]
2270    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2271        // Remove uncommitted node versions
2272        {
2273            let mut versions = self.node_versions.write();
2274            for index in versions.values_mut() {
2275                index.remove_versions_by(tx_id);
2276            }
2277            // Remove completely empty indexes (no versions left)
2278            versions.retain(|_, index| !index.is_empty());
2279        }
2280
2281        // Remove uncommitted edge versions
2282        {
2283            let mut versions = self.edge_versions.write();
2284            for index in versions.values_mut() {
2285                index.remove_versions_by(tx_id);
2286            }
2287            // Remove completely empty indexes (no versions left)
2288            versions.retain(|_, index| !index.is_empty());
2289        }
2290    }
2291
2292    /// Garbage collects old versions that are no longer visible to any transaction.
2293    ///
2294    /// Versions older than `min_epoch` are pruned from version chains, keeping
2295    /// at most one old version per entity as a baseline. Empty chains are removed.
2296    #[cfg(not(feature = "tiered-storage"))]
2297    pub fn gc_versions(&self, min_epoch: EpochId) {
2298        {
2299            let mut nodes = self.nodes.write();
2300            for chain in nodes.values_mut() {
2301                chain.gc(min_epoch);
2302            }
2303            nodes.retain(|_, chain| !chain.is_empty());
2304        }
2305        {
2306            let mut edges = self.edges.write();
2307            for chain in edges.values_mut() {
2308                chain.gc(min_epoch);
2309            }
2310            edges.retain(|_, chain| !chain.is_empty());
2311        }
2312    }
2313
2314    /// Garbage collects old versions (tiered storage variant).
2315    #[cfg(feature = "tiered-storage")]
2316    pub fn gc_versions(&self, min_epoch: EpochId) {
2317        {
2318            let mut versions = self.node_versions.write();
2319            for index in versions.values_mut() {
2320                index.gc(min_epoch);
2321            }
2322            versions.retain(|_, index| !index.is_empty());
2323        }
2324        {
2325            let mut versions = self.edge_versions.write();
2326            for index in versions.values_mut() {
2327                index.gc(min_epoch);
2328            }
2329            versions.retain(|_, index| !index.is_empty());
2330        }
2331    }
2332
2333    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
2334    ///
2335    /// This is called by the transaction manager when an epoch becomes eligible
2336    /// for freezing (no active transactions can see it). The freeze process:
2337    ///
2338    /// 1. Collects all hot version refs for the epoch
2339    /// 2. Reads the corresponding records from arena
2340    /// 3. Compresses them into a `CompressedEpochBlock`
2341    /// 4. Updates `VersionIndex` entries to point to cold storage
2342    /// 5. The arena can be deallocated after all epochs in it are frozen
2343    ///
2344    /// # Arguments
2345    ///
2346    /// * `epoch` - The epoch to freeze
2347    ///
2348    /// # Returns
2349    ///
2350    /// The number of records frozen (nodes + edges).
2351    #[cfg(feature = "tiered-storage")]
2352    #[allow(unsafe_code)]
2353    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2354        // Collect node records to freeze
2355        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2356        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2357
2358        {
2359            let versions = self.node_versions.read();
2360            for (node_id, index) in versions.iter() {
2361                for hot_ref in index.hot_refs_for_epoch(epoch) {
2362                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2363                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
2364                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2365                    node_records.push((node_id.as_u64(), *record));
2366                    node_hot_refs.push((*node_id, *hot_ref));
2367                }
2368            }
2369        }
2370
2371        // Collect edge records to freeze
2372        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2373        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2374
2375        {
2376            let versions = self.edge_versions.read();
2377            for (edge_id, index) in versions.iter() {
2378                for hot_ref in index.hot_refs_for_epoch(epoch) {
2379                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2380                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2381                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2382                    edge_records.push((edge_id.as_u64(), *record));
2383                    edge_hot_refs.push((*edge_id, *hot_ref));
2384                }
2385            }
2386        }
2387
2388        let total_frozen = node_records.len() + edge_records.len();
2389
2390        if total_frozen == 0 {
2391            return 0;
2392        }
2393
2394        // Freeze to compressed storage
2395        let (node_entries, edge_entries) =
2396            self.epoch_store
2397                .freeze_epoch(epoch, node_records, edge_records);
2398
2399        // Build lookup maps for index entries
2400        let node_entry_map: FxHashMap<u64, _> = node_entries
2401            .iter()
2402            .map(|e| (e.entity_id, (e.offset, e.length)))
2403            .collect();
2404        let edge_entry_map: FxHashMap<u64, _> = edge_entries
2405            .iter()
2406            .map(|e| (e.entity_id, (e.offset, e.length)))
2407            .collect();
2408
2409        // Update version indexes to use cold refs
2410        {
2411            let mut versions = self.node_versions.write();
2412            for (node_id, hot_ref) in &node_hot_refs {
2413                if let Some(index) = versions.get_mut(node_id)
2414                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2415                {
2416                    let cold_ref = ColdVersionRef {
2417                        epoch,
2418                        block_offset: offset,
2419                        length,
2420                        created_by: hot_ref.created_by,
2421                        deleted_epoch: hot_ref.deleted_epoch,
2422                    };
2423                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2424                }
2425            }
2426        }
2427
2428        {
2429            let mut versions = self.edge_versions.write();
2430            for (edge_id, hot_ref) in &edge_hot_refs {
2431                if let Some(index) = versions.get_mut(edge_id)
2432                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2433                {
2434                    let cold_ref = ColdVersionRef {
2435                        epoch,
2436                        block_offset: offset,
2437                        length,
2438                        created_by: hot_ref.created_by,
2439                        deleted_epoch: hot_ref.deleted_epoch,
2440                    };
2441                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
2442                }
2443            }
2444        }
2445
2446        total_frozen
2447    }
2448
2449    /// Returns the epoch store for cold storage statistics.
2450    #[cfg(feature = "tiered-storage")]
2451    #[must_use]
2452    pub fn epoch_store(&self) -> &EpochStore {
2453        &self.epoch_store
2454    }
2455
2456    /// Returns the number of distinct labels in the store.
2457    #[must_use]
2458    pub fn label_count(&self) -> usize {
2459        self.id_to_label.read().len()
2460    }
2461
2462    /// Returns the number of distinct property keys in the store.
2463    ///
2464    /// This counts unique property keys across both nodes and edges.
2465    #[must_use]
2466    pub fn property_key_count(&self) -> usize {
2467        let node_keys = self.node_properties.column_count();
2468        let edge_keys = self.edge_properties.column_count();
2469        // Note: This may count some keys twice if the same key is used
2470        // for both nodes and edges. A more precise count would require
2471        // tracking unique keys across both storages.
2472        node_keys + edge_keys
2473    }
2474
2475    /// Returns the number of distinct edge types in the store.
2476    #[must_use]
2477    pub fn edge_type_count(&self) -> usize {
2478        self.id_to_edge_type.read().len()
2479    }
2480
2481    // === Traversal ===
2482
2483    /// Iterates over neighbors of a node in the specified direction.
2484    ///
2485    /// This is the fast path for graph traversal - goes straight to the
2486    /// adjacency index without loading full node data.
2487    pub fn neighbors(
2488        &self,
2489        node: NodeId,
2490        direction: Direction,
2491    ) -> impl Iterator<Item = NodeId> + '_ {
2492        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2493            Direction::Outgoing | Direction::Both => {
2494                Box::new(self.forward_adj.neighbors(node).into_iter())
2495            }
2496            Direction::Incoming => Box::new(std::iter::empty()),
2497        };
2498
2499        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2500            Direction::Incoming | Direction::Both => {
2501                if let Some(ref adj) = self.backward_adj {
2502                    Box::new(adj.neighbors(node).into_iter())
2503                } else {
2504                    Box::new(std::iter::empty())
2505                }
2506            }
2507            Direction::Outgoing => Box::new(std::iter::empty()),
2508        };
2509
2510        forward.chain(backward)
2511    }
2512
2513    /// Returns edges from a node with their targets.
2514    ///
2515    /// Returns an iterator of (target_node, edge_id) pairs.
2516    pub fn edges_from(
2517        &self,
2518        node: NodeId,
2519        direction: Direction,
2520    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2521        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2522            Direction::Outgoing | Direction::Both => {
2523                Box::new(self.forward_adj.edges_from(node).into_iter())
2524            }
2525            Direction::Incoming => Box::new(std::iter::empty()),
2526        };
2527
2528        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2529            Direction::Incoming | Direction::Both => {
2530                if let Some(ref adj) = self.backward_adj {
2531                    Box::new(adj.edges_from(node).into_iter())
2532                } else {
2533                    Box::new(std::iter::empty())
2534                }
2535            }
2536            Direction::Outgoing => Box::new(std::iter::empty()),
2537        };
2538
2539        forward.chain(backward)
2540    }
2541
2542    /// Returns edges to a node (where the node is the destination).
2543    ///
2544    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2545    /// Uses the backward adjacency index for O(degree) lookup.
2546    ///
2547    /// # Example
2548    ///
2549    /// ```ignore
2550    /// // For edges: A->B, C->B
2551    /// let incoming = store.edges_to(B);
2552    /// // Returns: [(A, edge1), (C, edge2)]
2553    /// ```
2554    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2555        if let Some(ref backward) = self.backward_adj {
2556            backward.edges_from(node)
2557        } else {
2558            // Fallback: scan all edges (slow but correct)
2559            self.all_edges()
2560                .filter_map(|edge| {
2561                    if edge.dst == node {
2562                        Some((edge.src, edge.id))
2563                    } else {
2564                        None
2565                    }
2566                })
2567                .collect()
2568        }
2569    }
2570
2571    /// Returns the out-degree of a node (number of outgoing edges).
2572    ///
2573    /// Uses the forward adjacency index for O(1) lookup.
2574    #[must_use]
2575    pub fn out_degree(&self, node: NodeId) -> usize {
2576        self.forward_adj.out_degree(node)
2577    }
2578
2579    /// Returns the in-degree of a node (number of incoming edges).
2580    ///
2581    /// Uses the backward adjacency index for O(1) lookup if available,
2582    /// otherwise falls back to scanning edges.
2583    #[must_use]
2584    pub fn in_degree(&self, node: NodeId) -> usize {
2585        if let Some(ref backward) = self.backward_adj {
2586            backward.in_degree(node)
2587        } else {
2588            // Fallback: count edges (slow)
2589            self.all_edges().filter(|edge| edge.dst == node).count()
2590        }
2591    }
2592
2593    /// Gets the type of an edge by ID.
2594    #[must_use]
2595    #[cfg(not(feature = "tiered-storage"))]
2596    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2597        let edges = self.edges.read();
2598        let chain = edges.get(&id)?;
2599        let epoch = self.current_epoch();
2600        let record = chain.visible_at(epoch)?;
2601        let id_to_type = self.id_to_edge_type.read();
2602        id_to_type.get(record.type_id as usize).cloned()
2603    }
2604
2605    /// Gets the type of an edge by ID.
2606    /// (Tiered storage version)
2607    #[must_use]
2608    #[cfg(feature = "tiered-storage")]
2609    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2610        let versions = self.edge_versions.read();
2611        let index = versions.get(&id)?;
2612        let epoch = self.current_epoch();
2613        let vref = index.visible_at(epoch)?;
2614        let record = self.read_edge_record(&vref)?;
2615        let id_to_type = self.id_to_edge_type.read();
2616        id_to_type.get(record.type_id as usize).cloned()
2617    }
2618
2619    /// Returns all nodes with a specific label.
2620    ///
2621    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2622    /// concurrent modifications won't affect the returned vector. Results are
2623    /// sorted by NodeId for deterministic iteration order.
2624    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2625        let label_to_id = self.label_to_id.read();
2626        if let Some(&label_id) = label_to_id.get(label) {
2627            let index = self.label_index.read();
2628            if let Some(set) = index.get(label_id as usize) {
2629                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2630                ids.sort_unstable();
2631                return ids;
2632            }
2633        }
2634        Vec::new()
2635    }
2636
2637    // === Admin API: Iteration ===
2638
2639    /// Returns an iterator over all nodes in the database.
2640    ///
2641    /// This creates a snapshot of all visible nodes at the current epoch.
2642    /// Useful for dump/export operations.
2643    #[cfg(not(feature = "tiered-storage"))]
2644    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2645        let epoch = self.current_epoch();
2646        let node_ids: Vec<NodeId> = self
2647            .nodes
2648            .read()
2649            .iter()
2650            .filter_map(|(id, chain)| {
2651                chain
2652                    .visible_at(epoch)
2653                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2654            })
2655            .collect();
2656
2657        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2658    }
2659
2660    /// Returns an iterator over all nodes in the database.
2661    /// (Tiered storage version)
2662    #[cfg(feature = "tiered-storage")]
2663    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2664        let node_ids = self.node_ids();
2665        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2666    }
2667
2668    /// Returns an iterator over all edges in the database.
2669    ///
2670    /// This creates a snapshot of all visible edges at the current epoch.
2671    /// Useful for dump/export operations.
2672    #[cfg(not(feature = "tiered-storage"))]
2673    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2674        let epoch = self.current_epoch();
2675        let edge_ids: Vec<EdgeId> = self
2676            .edges
2677            .read()
2678            .iter()
2679            .filter_map(|(id, chain)| {
2680                chain
2681                    .visible_at(epoch)
2682                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2683            })
2684            .collect();
2685
2686        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2687    }
2688
2689    /// Returns an iterator over all edges in the database.
2690    /// (Tiered storage version)
2691    #[cfg(feature = "tiered-storage")]
2692    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2693        let epoch = self.current_epoch();
2694        let versions = self.edge_versions.read();
2695        let edge_ids: Vec<EdgeId> = versions
2696            .iter()
2697            .filter_map(|(id, index)| {
2698                index.visible_at(epoch).and_then(|vref| {
2699                    self.read_edge_record(&vref)
2700                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2701                })
2702            })
2703            .collect();
2704
2705        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2706    }
2707
2708    /// Returns all label names in the database.
2709    pub fn all_labels(&self) -> Vec<String> {
2710        self.id_to_label
2711            .read()
2712            .iter()
2713            .map(|s| s.to_string())
2714            .collect()
2715    }
2716
2717    /// Returns all edge type names in the database.
2718    pub fn all_edge_types(&self) -> Vec<String> {
2719        self.id_to_edge_type
2720            .read()
2721            .iter()
2722            .map(|s| s.to_string())
2723            .collect()
2724    }
2725
2726    /// Returns all property keys used in the database.
2727    pub fn all_property_keys(&self) -> Vec<String> {
2728        let mut keys = std::collections::HashSet::new();
2729        for key in self.node_properties.keys() {
2730            keys.insert(key.to_string());
2731        }
2732        for key in self.edge_properties.keys() {
2733            keys.insert(key.to_string());
2734        }
2735        keys.into_iter().collect()
2736    }
2737
2738    /// Returns an iterator over nodes with a specific label.
2739    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2740        let node_ids = self.nodes_by_label(label);
2741        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2742    }
2743
2744    /// Returns an iterator over edges with a specific type.
2745    #[cfg(not(feature = "tiered-storage"))]
2746    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2747        let epoch = self.current_epoch();
2748        let type_to_id = self.edge_type_to_id.read();
2749
2750        if let Some(&type_id) = type_to_id.get(edge_type) {
2751            let edge_ids: Vec<EdgeId> = self
2752                .edges
2753                .read()
2754                .iter()
2755                .filter_map(|(id, chain)| {
2756                    chain.visible_at(epoch).and_then(|r| {
2757                        if !r.is_deleted() && r.type_id == type_id {
2758                            Some(*id)
2759                        } else {
2760                            None
2761                        }
2762                    })
2763                })
2764                .collect();
2765
2766            // Return a boxed iterator for the found edges
2767            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2768                as Box<dyn Iterator<Item = Edge> + 'a>
2769        } else {
2770            // Return empty iterator
2771            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2772        }
2773    }
2774
2775    /// Returns an iterator over edges with a specific type.
2776    /// (Tiered storage version)
2777    #[cfg(feature = "tiered-storage")]
2778    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2779        let epoch = self.current_epoch();
2780        let type_to_id = self.edge_type_to_id.read();
2781
2782        if let Some(&type_id) = type_to_id.get(edge_type) {
2783            let versions = self.edge_versions.read();
2784            let edge_ids: Vec<EdgeId> = versions
2785                .iter()
2786                .filter_map(|(id, index)| {
2787                    index.visible_at(epoch).and_then(|vref| {
2788                        self.read_edge_record(&vref).and_then(|r| {
2789                            if !r.is_deleted() && r.type_id == type_id {
2790                                Some(*id)
2791                            } else {
2792                                None
2793                            }
2794                        })
2795                    })
2796                })
2797                .collect();
2798
2799            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2800                as Box<dyn Iterator<Item = Edge> + 'a>
2801        } else {
2802            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2803        }
2804    }
2805
2806    // === Zone Map Support ===
2807
2808    /// Checks if a node property predicate might match any nodes.
2809    ///
2810    /// Uses zone maps for early filtering. Returns `true` if there might be
2811    /// matching nodes, `false` if there definitely aren't.
2812    #[must_use]
2813    pub fn node_property_might_match(
2814        &self,
2815        property: &PropertyKey,
2816        op: CompareOp,
2817        value: &Value,
2818    ) -> bool {
2819        self.node_properties.might_match(property, op, value)
2820    }
2821
2822    /// Checks if an edge property predicate might match any edges.
2823    #[must_use]
2824    pub fn edge_property_might_match(
2825        &self,
2826        property: &PropertyKey,
2827        op: CompareOp,
2828        value: &Value,
2829    ) -> bool {
2830        self.edge_properties.might_match(property, op, value)
2831    }
2832
2833    /// Gets the zone map for a node property.
2834    #[must_use]
2835    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2836        self.node_properties.zone_map(property)
2837    }
2838
2839    /// Gets the zone map for an edge property.
2840    #[must_use]
2841    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2842        self.edge_properties.zone_map(property)
2843    }
2844
2845    /// Rebuilds zone maps for all properties.
2846    pub fn rebuild_zone_maps(&self) {
2847        self.node_properties.rebuild_zone_maps();
2848        self.edge_properties.rebuild_zone_maps();
2849    }
2850
2851    // === Statistics ===
2852
2853    /// Returns the current statistics (cheap `Arc` clone, no deep copy).
2854    #[must_use]
2855    pub fn statistics(&self) -> Arc<Statistics> {
2856        Arc::clone(&self.statistics.read())
2857    }
2858
2859    /// Recomputes statistics if they are stale (i.e., after mutations).
2860    ///
2861    /// Call this before reading statistics for query optimization.
2862    /// Avoids redundant recomputation if no mutations occurred.
2863    pub fn ensure_statistics_fresh(&self) {
2864        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2865            self.compute_statistics();
2866        }
2867    }
2868
2869    /// Recomputes statistics from current data.
2870    ///
2871    /// Scans all labels and edge types to build cardinality estimates for the
2872    /// query optimizer. Call this periodically or after bulk data loads.
2873    #[cfg(not(feature = "tiered-storage"))]
2874    pub fn compute_statistics(&self) {
2875        let mut stats = Statistics::new();
2876
2877        // Compute total counts
2878        stats.total_nodes = self.node_count() as u64;
2879        stats.total_edges = self.edge_count() as u64;
2880
2881        // Compute per-label statistics
2882        let id_to_label = self.id_to_label.read();
2883        let label_index = self.label_index.read();
2884
2885        for (label_id, label_name) in id_to_label.iter().enumerate() {
2886            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2887
2888            if node_count > 0 {
2889                // Estimate average degree
2890                let avg_out_degree = if stats.total_nodes > 0 {
2891                    stats.total_edges as f64 / stats.total_nodes as f64
2892                } else {
2893                    0.0
2894                };
2895
2896                let label_stats =
2897                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2898
2899                stats.update_label(label_name.as_ref(), label_stats);
2900            }
2901        }
2902
2903        // Compute per-edge-type statistics
2904        let id_to_edge_type = self.id_to_edge_type.read();
2905        let edges = self.edges.read();
2906        let epoch = self.current_epoch();
2907
2908        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2909        for chain in edges.values() {
2910            if let Some(record) = chain.visible_at(epoch)
2911                && !record.is_deleted()
2912            {
2913                *edge_type_counts.entry(record.type_id).or_default() += 1;
2914            }
2915        }
2916
2917        for (type_id, count) in edge_type_counts {
2918            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2919                let avg_degree = if stats.total_nodes > 0 {
2920                    count as f64 / stats.total_nodes as f64
2921                } else {
2922                    0.0
2923                };
2924
2925                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2926                stats.update_edge_type(type_name.as_ref(), edge_stats);
2927            }
2928        }
2929
2930        *self.statistics.write() = Arc::new(stats);
2931    }
2932
2933    /// Recomputes statistics from current data.
2934    /// (Tiered storage version)
2935    #[cfg(feature = "tiered-storage")]
2936    pub fn compute_statistics(&self) {
2937        let mut stats = Statistics::new();
2938
2939        // Compute total counts
2940        stats.total_nodes = self.node_count() as u64;
2941        stats.total_edges = self.edge_count() as u64;
2942
2943        // Compute per-label statistics
2944        let id_to_label = self.id_to_label.read();
2945        let label_index = self.label_index.read();
2946
2947        for (label_id, label_name) in id_to_label.iter().enumerate() {
2948            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2949
2950            if node_count > 0 {
2951                let avg_out_degree = if stats.total_nodes > 0 {
2952                    stats.total_edges as f64 / stats.total_nodes as f64
2953                } else {
2954                    0.0
2955                };
2956
2957                let label_stats =
2958                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2959
2960                stats.update_label(label_name.as_ref(), label_stats);
2961            }
2962        }
2963
2964        // Compute per-edge-type statistics
2965        let id_to_edge_type = self.id_to_edge_type.read();
2966        let versions = self.edge_versions.read();
2967        let epoch = self.current_epoch();
2968
2969        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2970        for index in versions.values() {
2971            if let Some(vref) = index.visible_at(epoch)
2972                && let Some(record) = self.read_edge_record(&vref)
2973                && !record.is_deleted()
2974            {
2975                *edge_type_counts.entry(record.type_id).or_default() += 1;
2976            }
2977        }
2978
2979        for (type_id, count) in edge_type_counts {
2980            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2981                let avg_degree = if stats.total_nodes > 0 {
2982                    count as f64 / stats.total_nodes as f64
2983                } else {
2984                    0.0
2985                };
2986
2987                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2988                stats.update_edge_type(type_name.as_ref(), edge_stats);
2989            }
2990        }
2991
2992        *self.statistics.write() = Arc::new(stats);
2993    }
2994
2995    /// Estimates cardinality for a label scan.
2996    #[must_use]
2997    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2998        self.statistics.read().estimate_label_cardinality(label)
2999    }
3000
3001    /// Estimates average degree for an edge type.
3002    #[must_use]
3003    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
3004        self.statistics
3005            .read()
3006            .estimate_avg_degree(edge_type, outgoing)
3007    }
3008
3009    // === Internal Helpers ===
3010
3011    fn get_or_create_label_id(&self, label: &str) -> u32 {
3012        {
3013            let label_to_id = self.label_to_id.read();
3014            if let Some(&id) = label_to_id.get(label) {
3015                return id;
3016            }
3017        }
3018
3019        let mut label_to_id = self.label_to_id.write();
3020        let mut id_to_label = self.id_to_label.write();
3021
3022        // Double-check after acquiring write lock
3023        if let Some(&id) = label_to_id.get(label) {
3024            return id;
3025        }
3026
3027        let id = id_to_label.len() as u32;
3028
3029        let label: ArcStr = label.into();
3030        label_to_id.insert(label.clone(), id);
3031        id_to_label.push(label);
3032
3033        id
3034    }
3035
3036    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
3037        {
3038            let type_to_id = self.edge_type_to_id.read();
3039            if let Some(&id) = type_to_id.get(edge_type) {
3040                return id;
3041            }
3042        }
3043
3044        let mut type_to_id = self.edge_type_to_id.write();
3045        let mut id_to_type = self.id_to_edge_type.write();
3046
3047        // Double-check
3048        if let Some(&id) = type_to_id.get(edge_type) {
3049            return id;
3050        }
3051
3052        let id = id_to_type.len() as u32;
3053        let edge_type: ArcStr = edge_type.into();
3054        type_to_id.insert(edge_type.clone(), id);
3055        id_to_type.push(edge_type);
3056
3057        id
3058    }
3059
3060    // === Recovery Support ===
3061
3062    /// Creates a node with a specific ID during recovery.
3063    ///
3064    /// This is used for WAL recovery to restore nodes with their original IDs.
3065    /// The caller must ensure IDs don't conflict with existing nodes.
3066    #[cfg(not(feature = "tiered-storage"))]
3067    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3068        let epoch = self.current_epoch();
3069        let mut record = NodeRecord::new(id, epoch);
3070        record.set_label_count(labels.len() as u16);
3071
3072        // Store labels in node_labels map and label_index
3073        let mut node_label_set = FxHashSet::default();
3074        for label in labels {
3075            let label_id = self.get_or_create_label_id(*label);
3076            node_label_set.insert(label_id);
3077
3078            // Update label index
3079            let mut index = self.label_index.write();
3080            while index.len() <= label_id as usize {
3081                index.push(FxHashMap::default());
3082            }
3083            index[label_id as usize].insert(id, ());
3084        }
3085
3086        // Store node's labels
3087        self.node_labels.write().insert(id, node_label_set);
3088
3089        // Create version chain with initial version (using SYSTEM tx for recovery)
3090        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3091        self.nodes.write().insert(id, chain);
3092
3093        // Update next_node_id if necessary to avoid future collisions
3094        let id_val = id.as_u64();
3095        let _ = self
3096            .next_node_id
3097            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3098                if id_val >= current {
3099                    Some(id_val + 1)
3100                } else {
3101                    None
3102                }
3103            });
3104    }
3105
3106    /// Creates a node with a specific ID during recovery.
3107    /// (Tiered storage version)
3108    #[cfg(feature = "tiered-storage")]
3109    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3110        let epoch = self.current_epoch();
3111        let mut record = NodeRecord::new(id, epoch);
3112        record.set_label_count(labels.len() as u16);
3113
3114        // Store labels in node_labels map and label_index
3115        let mut node_label_set = FxHashSet::default();
3116        for label in labels {
3117            let label_id = self.get_or_create_label_id(*label);
3118            node_label_set.insert(label_id);
3119
3120            // Update label index
3121            let mut index = self.label_index.write();
3122            while index.len() <= label_id as usize {
3123                index.push(FxHashMap::default());
3124            }
3125            index[label_id as usize].insert(id, ());
3126        }
3127
3128        // Store node's labels
3129        self.node_labels.write().insert(id, node_label_set);
3130
3131        // Allocate record in arena and get offset (create epoch if needed)
3132        let arena = self.arena_allocator.arena_or_create(epoch);
3133        let (offset, _stored) = arena.alloc_value_with_offset(record);
3134
3135        // Create HotVersionRef (using SYSTEM tx for recovery)
3136        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3137        let mut versions = self.node_versions.write();
3138        versions.insert(id, VersionIndex::with_initial(hot_ref));
3139
3140        // Update next_node_id if necessary to avoid future collisions
3141        let id_val = id.as_u64();
3142        let _ = self
3143            .next_node_id
3144            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3145                if id_val >= current {
3146                    Some(id_val + 1)
3147                } else {
3148                    None
3149                }
3150            });
3151    }
3152
3153    /// Creates an edge with a specific ID during recovery.
3154    ///
3155    /// This is used for WAL recovery to restore edges with their original IDs.
3156    #[cfg(not(feature = "tiered-storage"))]
3157    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3158        let epoch = self.current_epoch();
3159        let type_id = self.get_or_create_edge_type_id(edge_type);
3160
3161        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3162        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3163        self.edges.write().insert(id, chain);
3164
3165        // Update adjacency
3166        self.forward_adj.add_edge(src, dst, id);
3167        if let Some(ref backward) = self.backward_adj {
3168            backward.add_edge(dst, src, id);
3169        }
3170
3171        // Update next_edge_id if necessary
3172        let id_val = id.as_u64();
3173        let _ = self
3174            .next_edge_id
3175            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3176                if id_val >= current {
3177                    Some(id_val + 1)
3178                } else {
3179                    None
3180                }
3181            });
3182    }
3183
3184    /// Creates an edge with a specific ID during recovery.
3185    /// (Tiered storage version)
3186    #[cfg(feature = "tiered-storage")]
3187    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3188        let epoch = self.current_epoch();
3189        let type_id = self.get_or_create_edge_type_id(edge_type);
3190
3191        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3192
3193        // Allocate record in arena and get offset (create epoch if needed)
3194        let arena = self.arena_allocator.arena_or_create(epoch);
3195        let (offset, _stored) = arena.alloc_value_with_offset(record);
3196
3197        // Create HotVersionRef (using SYSTEM tx for recovery)
3198        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3199        let mut versions = self.edge_versions.write();
3200        versions.insert(id, VersionIndex::with_initial(hot_ref));
3201
3202        // Update adjacency
3203        self.forward_adj.add_edge(src, dst, id);
3204        if let Some(ref backward) = self.backward_adj {
3205            backward.add_edge(dst, src, id);
3206        }
3207
3208        // Update next_edge_id if necessary
3209        let id_val = id.as_u64();
3210        let _ = self
3211            .next_edge_id
3212            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3213                if id_val >= current {
3214                    Some(id_val + 1)
3215                } else {
3216                    None
3217                }
3218            });
3219    }
3220
3221    /// Sets the current epoch during recovery.
3222    pub fn set_epoch(&self, epoch: EpochId) {
3223        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
3224    }
3225}
3226
3227impl Default for LpgStore {
3228    fn default() -> Self {
3229        Self::new()
3230    }
3231}
3232
3233#[cfg(test)]
3234mod tests {
3235    use super::*;
3236
3237    #[test]
3238    fn test_create_node() {
3239        let store = LpgStore::new();
3240
3241        let id = store.create_node(&["Person"]);
3242        assert!(id.is_valid());
3243
3244        let node = store.get_node(id).unwrap();
3245        assert!(node.has_label("Person"));
3246        assert!(!node.has_label("Animal"));
3247    }
3248
3249    #[test]
3250    fn test_create_node_with_props() {
3251        let store = LpgStore::new();
3252
3253        let id = store.create_node_with_props(
3254            &["Person"],
3255            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3256        );
3257
3258        let node = store.get_node(id).unwrap();
3259        assert_eq!(
3260            node.get_property("name").and_then(|v| v.as_str()),
3261            Some("Alice")
3262        );
3263        assert_eq!(
3264            node.get_property("age").and_then(|v| v.as_int64()),
3265            Some(30)
3266        );
3267    }
3268
3269    #[test]
3270    fn test_delete_node() {
3271        let store = LpgStore::new();
3272
3273        let id = store.create_node(&["Person"]);
3274        assert_eq!(store.node_count(), 1);
3275
3276        assert!(store.delete_node(id));
3277        assert_eq!(store.node_count(), 0);
3278        assert!(store.get_node(id).is_none());
3279
3280        // Double delete should return false
3281        assert!(!store.delete_node(id));
3282    }
3283
3284    #[test]
3285    fn test_create_edge() {
3286        let store = LpgStore::new();
3287
3288        let alice = store.create_node(&["Person"]);
3289        let bob = store.create_node(&["Person"]);
3290
3291        let edge_id = store.create_edge(alice, bob, "KNOWS");
3292        assert!(edge_id.is_valid());
3293
3294        let edge = store.get_edge(edge_id).unwrap();
3295        assert_eq!(edge.src, alice);
3296        assert_eq!(edge.dst, bob);
3297        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3298    }
3299
3300    #[test]
3301    fn test_neighbors() {
3302        let store = LpgStore::new();
3303
3304        let a = store.create_node(&["Person"]);
3305        let b = store.create_node(&["Person"]);
3306        let c = store.create_node(&["Person"]);
3307
3308        store.create_edge(a, b, "KNOWS");
3309        store.create_edge(a, c, "KNOWS");
3310
3311        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3312        assert_eq!(outgoing.len(), 2);
3313        assert!(outgoing.contains(&b));
3314        assert!(outgoing.contains(&c));
3315
3316        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3317        assert_eq!(incoming.len(), 1);
3318        assert!(incoming.contains(&a));
3319    }
3320
3321    #[test]
3322    fn test_nodes_by_label() {
3323        let store = LpgStore::new();
3324
3325        let p1 = store.create_node(&["Person"]);
3326        let p2 = store.create_node(&["Person"]);
3327        let _a = store.create_node(&["Animal"]);
3328
3329        let persons = store.nodes_by_label("Person");
3330        assert_eq!(persons.len(), 2);
3331        assert!(persons.contains(&p1));
3332        assert!(persons.contains(&p2));
3333
3334        let animals = store.nodes_by_label("Animal");
3335        assert_eq!(animals.len(), 1);
3336    }
3337
3338    #[test]
3339    fn test_delete_edge() {
3340        let store = LpgStore::new();
3341
3342        let a = store.create_node(&["Person"]);
3343        let b = store.create_node(&["Person"]);
3344        let edge_id = store.create_edge(a, b, "KNOWS");
3345
3346        assert_eq!(store.edge_count(), 1);
3347
3348        assert!(store.delete_edge(edge_id));
3349        assert_eq!(store.edge_count(), 0);
3350        assert!(store.get_edge(edge_id).is_none());
3351    }
3352
3353    // === New tests for improved coverage ===
3354
3355    #[test]
3356    fn test_lpg_store_config() {
3357        // Test with_config
3358        let config = LpgStoreConfig {
3359            backward_edges: false,
3360            initial_node_capacity: 100,
3361            initial_edge_capacity: 200,
3362        };
3363        let store = LpgStore::with_config(config);
3364
3365        // Store should work but without backward adjacency
3366        let a = store.create_node(&["Person"]);
3367        let b = store.create_node(&["Person"]);
3368        store.create_edge(a, b, "KNOWS");
3369
3370        // Outgoing should work
3371        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3372        assert_eq!(outgoing.len(), 1);
3373
3374        // Incoming should be empty (no backward adjacency)
3375        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3376        assert_eq!(incoming.len(), 0);
3377    }
3378
3379    #[test]
3380    fn test_epoch_management() {
3381        let store = LpgStore::new();
3382
3383        let epoch0 = store.current_epoch();
3384        assert_eq!(epoch0.as_u64(), 0);
3385
3386        let epoch1 = store.new_epoch();
3387        assert_eq!(epoch1.as_u64(), 1);
3388
3389        let current = store.current_epoch();
3390        assert_eq!(current.as_u64(), 1);
3391    }
3392
3393    #[test]
3394    fn test_node_properties() {
3395        let store = LpgStore::new();
3396        let id = store.create_node(&["Person"]);
3397
3398        // Set and get property
3399        store.set_node_property(id, "name", Value::from("Alice"));
3400        let name = store.get_node_property(id, &"name".into());
3401        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3402
3403        // Update property
3404        store.set_node_property(id, "name", Value::from("Bob"));
3405        let name = store.get_node_property(id, &"name".into());
3406        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3407
3408        // Remove property
3409        let old = store.remove_node_property(id, "name");
3410        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3411
3412        // Property should be gone
3413        let name = store.get_node_property(id, &"name".into());
3414        assert!(name.is_none());
3415
3416        // Remove non-existent property
3417        let none = store.remove_node_property(id, "nonexistent");
3418        assert!(none.is_none());
3419    }
3420
3421    #[test]
3422    fn test_edge_properties() {
3423        let store = LpgStore::new();
3424        let a = store.create_node(&["Person"]);
3425        let b = store.create_node(&["Person"]);
3426        let edge_id = store.create_edge(a, b, "KNOWS");
3427
3428        // Set and get property
3429        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3430        let since = store.get_edge_property(edge_id, &"since".into());
3431        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3432
3433        // Remove property
3434        let old = store.remove_edge_property(edge_id, "since");
3435        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3436
3437        let since = store.get_edge_property(edge_id, &"since".into());
3438        assert!(since.is_none());
3439    }
3440
3441    #[test]
3442    fn test_add_remove_label() {
3443        let store = LpgStore::new();
3444        let id = store.create_node(&["Person"]);
3445
3446        // Add new label
3447        assert!(store.add_label(id, "Employee"));
3448
3449        let node = store.get_node(id).unwrap();
3450        assert!(node.has_label("Person"));
3451        assert!(node.has_label("Employee"));
3452
3453        // Adding same label again should fail
3454        assert!(!store.add_label(id, "Employee"));
3455
3456        // Remove label
3457        assert!(store.remove_label(id, "Employee"));
3458
3459        let node = store.get_node(id).unwrap();
3460        assert!(node.has_label("Person"));
3461        assert!(!node.has_label("Employee"));
3462
3463        // Removing non-existent label should fail
3464        assert!(!store.remove_label(id, "Employee"));
3465        assert!(!store.remove_label(id, "NonExistent"));
3466    }
3467
3468    #[test]
3469    fn test_add_label_to_nonexistent_node() {
3470        let store = LpgStore::new();
3471        let fake_id = NodeId::new(999);
3472        assert!(!store.add_label(fake_id, "Label"));
3473    }
3474
3475    #[test]
3476    fn test_remove_label_from_nonexistent_node() {
3477        let store = LpgStore::new();
3478        let fake_id = NodeId::new(999);
3479        assert!(!store.remove_label(fake_id, "Label"));
3480    }
3481
3482    #[test]
3483    fn test_node_ids() {
3484        let store = LpgStore::new();
3485
3486        let n1 = store.create_node(&["Person"]);
3487        let n2 = store.create_node(&["Person"]);
3488        let n3 = store.create_node(&["Person"]);
3489
3490        let ids = store.node_ids();
3491        assert_eq!(ids.len(), 3);
3492        assert!(ids.contains(&n1));
3493        assert!(ids.contains(&n2));
3494        assert!(ids.contains(&n3));
3495
3496        // Delete one
3497        store.delete_node(n2);
3498        let ids = store.node_ids();
3499        assert_eq!(ids.len(), 2);
3500        assert!(!ids.contains(&n2));
3501    }
3502
3503    #[test]
3504    fn test_delete_node_nonexistent() {
3505        let store = LpgStore::new();
3506        let fake_id = NodeId::new(999);
3507        assert!(!store.delete_node(fake_id));
3508    }
3509
3510    #[test]
3511    fn test_delete_edge_nonexistent() {
3512        let store = LpgStore::new();
3513        let fake_id = EdgeId::new(999);
3514        assert!(!store.delete_edge(fake_id));
3515    }
3516
3517    #[test]
3518    fn test_delete_edge_double() {
3519        let store = LpgStore::new();
3520        let a = store.create_node(&["Person"]);
3521        let b = store.create_node(&["Person"]);
3522        let edge_id = store.create_edge(a, b, "KNOWS");
3523
3524        assert!(store.delete_edge(edge_id));
3525        assert!(!store.delete_edge(edge_id)); // Double delete
3526    }
3527
3528    #[test]
3529    fn test_create_edge_with_props() {
3530        let store = LpgStore::new();
3531        let a = store.create_node(&["Person"]);
3532        let b = store.create_node(&["Person"]);
3533
3534        let edge_id = store.create_edge_with_props(
3535            a,
3536            b,
3537            "KNOWS",
3538            [
3539                ("since", Value::from(2020i64)),
3540                ("weight", Value::from(1.0)),
3541            ],
3542        );
3543
3544        let edge = store.get_edge(edge_id).unwrap();
3545        assert_eq!(
3546            edge.get_property("since").and_then(|v| v.as_int64()),
3547            Some(2020)
3548        );
3549        assert_eq!(
3550            edge.get_property("weight").and_then(|v| v.as_float64()),
3551            Some(1.0)
3552        );
3553    }
3554
3555    #[test]
3556    fn test_delete_node_edges() {
3557        let store = LpgStore::new();
3558
3559        let a = store.create_node(&["Person"]);
3560        let b = store.create_node(&["Person"]);
3561        let c = store.create_node(&["Person"]);
3562
3563        store.create_edge(a, b, "KNOWS"); // a -> b
3564        store.create_edge(c, a, "KNOWS"); // c -> a
3565
3566        assert_eq!(store.edge_count(), 2);
3567
3568        // Delete all edges connected to a
3569        store.delete_node_edges(a);
3570
3571        assert_eq!(store.edge_count(), 0);
3572    }
3573
3574    #[test]
3575    fn test_neighbors_both_directions() {
3576        let store = LpgStore::new();
3577
3578        let a = store.create_node(&["Person"]);
3579        let b = store.create_node(&["Person"]);
3580        let c = store.create_node(&["Person"]);
3581
3582        store.create_edge(a, b, "KNOWS"); // a -> b
3583        store.create_edge(c, a, "KNOWS"); // c -> a
3584
3585        // Direction::Both for node a
3586        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3587        assert_eq!(neighbors.len(), 2);
3588        assert!(neighbors.contains(&b)); // outgoing
3589        assert!(neighbors.contains(&c)); // incoming
3590    }
3591
3592    #[test]
3593    fn test_edges_from() {
3594        let store = LpgStore::new();
3595
3596        let a = store.create_node(&["Person"]);
3597        let b = store.create_node(&["Person"]);
3598        let c = store.create_node(&["Person"]);
3599
3600        let e1 = store.create_edge(a, b, "KNOWS");
3601        let e2 = store.create_edge(a, c, "KNOWS");
3602
3603        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3604        assert_eq!(edges.len(), 2);
3605        assert!(edges.iter().any(|(_, e)| *e == e1));
3606        assert!(edges.iter().any(|(_, e)| *e == e2));
3607
3608        // Incoming edges to b
3609        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3610        assert_eq!(incoming.len(), 1);
3611        assert_eq!(incoming[0].1, e1);
3612    }
3613
3614    #[test]
3615    fn test_edges_to() {
3616        let store = LpgStore::new();
3617
3618        let a = store.create_node(&["Person"]);
3619        let b = store.create_node(&["Person"]);
3620        let c = store.create_node(&["Person"]);
3621
3622        let e1 = store.create_edge(a, b, "KNOWS");
3623        let e2 = store.create_edge(c, b, "KNOWS");
3624
3625        // Edges pointing TO b
3626        let to_b = store.edges_to(b);
3627        assert_eq!(to_b.len(), 2);
3628        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3629        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3630    }
3631
3632    #[test]
3633    fn test_out_degree_in_degree() {
3634        let store = LpgStore::new();
3635
3636        let a = store.create_node(&["Person"]);
3637        let b = store.create_node(&["Person"]);
3638        let c = store.create_node(&["Person"]);
3639
3640        store.create_edge(a, b, "KNOWS");
3641        store.create_edge(a, c, "KNOWS");
3642        store.create_edge(c, b, "KNOWS");
3643
3644        assert_eq!(store.out_degree(a), 2);
3645        assert_eq!(store.out_degree(b), 0);
3646        assert_eq!(store.out_degree(c), 1);
3647
3648        assert_eq!(store.in_degree(a), 0);
3649        assert_eq!(store.in_degree(b), 2);
3650        assert_eq!(store.in_degree(c), 1);
3651    }
3652
3653    #[test]
3654    fn test_edge_type() {
3655        let store = LpgStore::new();
3656
3657        let a = store.create_node(&["Person"]);
3658        let b = store.create_node(&["Person"]);
3659        let edge_id = store.create_edge(a, b, "KNOWS");
3660
3661        let edge_type = store.edge_type(edge_id);
3662        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3663
3664        // Non-existent edge
3665        let fake_id = EdgeId::new(999);
3666        assert!(store.edge_type(fake_id).is_none());
3667    }
3668
3669    #[test]
3670    fn test_count_methods() {
3671        let store = LpgStore::new();
3672
3673        assert_eq!(store.label_count(), 0);
3674        assert_eq!(store.edge_type_count(), 0);
3675        assert_eq!(store.property_key_count(), 0);
3676
3677        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3678        let b = store.create_node(&["Company"]);
3679        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3680
3681        assert_eq!(store.label_count(), 2); // Person, Company
3682        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3683        assert_eq!(store.property_key_count(), 2); // age, since
3684    }
3685
3686    #[test]
3687    fn test_all_nodes_and_edges() {
3688        let store = LpgStore::new();
3689
3690        let a = store.create_node(&["Person"]);
3691        let b = store.create_node(&["Person"]);
3692        store.create_edge(a, b, "KNOWS");
3693
3694        let nodes: Vec<_> = store.all_nodes().collect();
3695        assert_eq!(nodes.len(), 2);
3696
3697        let edges: Vec<_> = store.all_edges().collect();
3698        assert_eq!(edges.len(), 1);
3699    }
3700
3701    #[test]
3702    fn test_all_labels_and_edge_types() {
3703        let store = LpgStore::new();
3704
3705        store.create_node(&["Person"]);
3706        store.create_node(&["Company"]);
3707        let a = store.create_node(&["Animal"]);
3708        let b = store.create_node(&["Animal"]);
3709        store.create_edge(a, b, "EATS");
3710
3711        let labels = store.all_labels();
3712        assert_eq!(labels.len(), 3);
3713        assert!(labels.contains(&"Person".to_string()));
3714        assert!(labels.contains(&"Company".to_string()));
3715        assert!(labels.contains(&"Animal".to_string()));
3716
3717        let edge_types = store.all_edge_types();
3718        assert_eq!(edge_types.len(), 1);
3719        assert!(edge_types.contains(&"EATS".to_string()));
3720    }
3721
3722    #[test]
3723    fn test_all_property_keys() {
3724        let store = LpgStore::new();
3725
3726        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3727        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3728        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3729
3730        let keys = store.all_property_keys();
3731        assert!(keys.contains(&"name".to_string()));
3732        assert!(keys.contains(&"age".to_string()));
3733        assert!(keys.contains(&"since".to_string()));
3734    }
3735
3736    #[test]
3737    fn test_nodes_with_label() {
3738        let store = LpgStore::new();
3739
3740        store.create_node(&["Person"]);
3741        store.create_node(&["Person"]);
3742        store.create_node(&["Company"]);
3743
3744        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3745        assert_eq!(persons.len(), 2);
3746
3747        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3748        assert_eq!(companies.len(), 1);
3749
3750        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3751        assert_eq!(none.len(), 0);
3752    }
3753
3754    #[test]
3755    fn test_edges_with_type() {
3756        let store = LpgStore::new();
3757
3758        let a = store.create_node(&["Person"]);
3759        let b = store.create_node(&["Person"]);
3760        let c = store.create_node(&["Company"]);
3761
3762        store.create_edge(a, b, "KNOWS");
3763        store.create_edge(a, c, "WORKS_AT");
3764
3765        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3766        assert_eq!(knows.len(), 1);
3767
3768        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3769        assert_eq!(works_at.len(), 1);
3770
3771        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3772        assert_eq!(none.len(), 0);
3773    }
3774
3775    #[test]
3776    fn test_nodes_by_label_nonexistent() {
3777        let store = LpgStore::new();
3778        store.create_node(&["Person"]);
3779
3780        let empty = store.nodes_by_label("NonExistent");
3781        assert!(empty.is_empty());
3782    }
3783
3784    #[test]
3785    fn test_statistics() {
3786        let store = LpgStore::new();
3787
3788        let a = store.create_node(&["Person"]);
3789        let b = store.create_node(&["Person"]);
3790        let c = store.create_node(&["Company"]);
3791
3792        store.create_edge(a, b, "KNOWS");
3793        store.create_edge(a, c, "WORKS_AT");
3794
3795        store.compute_statistics();
3796        let stats = store.statistics();
3797
3798        assert_eq!(stats.total_nodes, 3);
3799        assert_eq!(stats.total_edges, 2);
3800
3801        // Estimates
3802        let person_card = store.estimate_label_cardinality("Person");
3803        assert!(person_card > 0.0);
3804
3805        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3806        assert!(avg_degree >= 0.0);
3807    }
3808
3809    #[test]
3810    fn test_zone_maps() {
3811        let store = LpgStore::new();
3812
3813        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3814        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3815
3816        // Zone map should indicate possible matches (30 is within [25, 35] range)
3817        let might_match =
3818            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3819        // Zone maps return true conservatively when value is within min/max range
3820        assert!(might_match);
3821
3822        let zone = store.node_property_zone_map(&"age".into());
3823        assert!(zone.is_some());
3824
3825        // Non-existent property
3826        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3827        assert!(no_zone.is_none());
3828
3829        // Edge zone maps
3830        let a = store.create_node(&["A"]);
3831        let b = store.create_node(&["B"]);
3832        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3833
3834        let edge_zone = store.edge_property_zone_map(&"weight".into());
3835        assert!(edge_zone.is_some());
3836    }
3837
3838    #[test]
3839    fn test_rebuild_zone_maps() {
3840        let store = LpgStore::new();
3841        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3842
3843        // Should not panic
3844        store.rebuild_zone_maps();
3845    }
3846
3847    #[test]
3848    fn test_create_node_with_id() {
3849        let store = LpgStore::new();
3850
3851        let specific_id = NodeId::new(100);
3852        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3853
3854        let node = store.get_node(specific_id).unwrap();
3855        assert!(node.has_label("Person"));
3856        assert!(node.has_label("Employee"));
3857
3858        // Next auto-generated ID should be > 100
3859        let next = store.create_node(&["Other"]);
3860        assert!(next.as_u64() > 100);
3861    }
3862
3863    #[test]
3864    fn test_create_edge_with_id() {
3865        let store = LpgStore::new();
3866
3867        let a = store.create_node(&["A"]);
3868        let b = store.create_node(&["B"]);
3869
3870        let specific_id = EdgeId::new(500);
3871        store.create_edge_with_id(specific_id, a, b, "REL");
3872
3873        let edge = store.get_edge(specific_id).unwrap();
3874        assert_eq!(edge.src, a);
3875        assert_eq!(edge.dst, b);
3876        assert_eq!(edge.edge_type.as_str(), "REL");
3877
3878        // Next auto-generated ID should be > 500
3879        let next = store.create_edge(a, b, "OTHER");
3880        assert!(next.as_u64() > 500);
3881    }
3882
3883    #[test]
3884    fn test_set_epoch() {
3885        let store = LpgStore::new();
3886
3887        assert_eq!(store.current_epoch().as_u64(), 0);
3888
3889        store.set_epoch(EpochId::new(42));
3890        assert_eq!(store.current_epoch().as_u64(), 42);
3891    }
3892
3893    #[test]
3894    fn test_get_node_nonexistent() {
3895        let store = LpgStore::new();
3896        let fake_id = NodeId::new(999);
3897        assert!(store.get_node(fake_id).is_none());
3898    }
3899
3900    #[test]
3901    fn test_get_edge_nonexistent() {
3902        let store = LpgStore::new();
3903        let fake_id = EdgeId::new(999);
3904        assert!(store.get_edge(fake_id).is_none());
3905    }
3906
3907    #[test]
3908    fn test_multiple_labels() {
3909        let store = LpgStore::new();
3910
3911        let id = store.create_node(&["Person", "Employee", "Manager"]);
3912        let node = store.get_node(id).unwrap();
3913
3914        assert!(node.has_label("Person"));
3915        assert!(node.has_label("Employee"));
3916        assert!(node.has_label("Manager"));
3917        assert!(!node.has_label("Other"));
3918    }
3919
3920    #[test]
3921    fn test_default_impl() {
3922        let store: LpgStore = Default::default();
3923        assert_eq!(store.node_count(), 0);
3924        assert_eq!(store.edge_count(), 0);
3925    }
3926
3927    #[test]
3928    fn test_edges_from_both_directions() {
3929        let store = LpgStore::new();
3930
3931        let a = store.create_node(&["A"]);
3932        let b = store.create_node(&["B"]);
3933        let c = store.create_node(&["C"]);
3934
3935        let e1 = store.create_edge(a, b, "R1"); // a -> b
3936        let e2 = store.create_edge(c, a, "R2"); // c -> a
3937
3938        // Both directions from a
3939        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3940        assert_eq!(edges.len(), 2);
3941        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3942        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3943    }
3944
3945    #[test]
3946    fn test_no_backward_adj_in_degree() {
3947        let config = LpgStoreConfig {
3948            backward_edges: false,
3949            initial_node_capacity: 10,
3950            initial_edge_capacity: 10,
3951        };
3952        let store = LpgStore::with_config(config);
3953
3954        let a = store.create_node(&["A"]);
3955        let b = store.create_node(&["B"]);
3956        store.create_edge(a, b, "R");
3957
3958        // in_degree should still work (falls back to scanning)
3959        let degree = store.in_degree(b);
3960        assert_eq!(degree, 1);
3961    }
3962
3963    #[test]
3964    fn test_no_backward_adj_edges_to() {
3965        let config = LpgStoreConfig {
3966            backward_edges: false,
3967            initial_node_capacity: 10,
3968            initial_edge_capacity: 10,
3969        };
3970        let store = LpgStore::with_config(config);
3971
3972        let a = store.create_node(&["A"]);
3973        let b = store.create_node(&["B"]);
3974        let e = store.create_edge(a, b, "R");
3975
3976        // edges_to should still work (falls back to scanning)
3977        let edges = store.edges_to(b);
3978        assert_eq!(edges.len(), 1);
3979        assert_eq!(edges[0].1, e);
3980    }
3981
3982    #[test]
3983    fn test_node_versioned_creation() {
3984        let store = LpgStore::new();
3985
3986        let epoch = store.new_epoch();
3987        let tx_id = TxId::new(1);
3988
3989        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3990        assert!(store.get_node(id).is_some());
3991    }
3992
3993    #[test]
3994    fn test_edge_versioned_creation() {
3995        let store = LpgStore::new();
3996
3997        let a = store.create_node(&["A"]);
3998        let b = store.create_node(&["B"]);
3999
4000        let epoch = store.new_epoch();
4001        let tx_id = TxId::new(1);
4002
4003        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
4004        assert!(store.get_edge(edge_id).is_some());
4005    }
4006
4007    #[test]
4008    fn test_node_with_props_versioned() {
4009        let store = LpgStore::new();
4010
4011        let epoch = store.new_epoch();
4012        let tx_id = TxId::new(1);
4013
4014        let id = store.create_node_with_props_versioned(
4015            &["Person"],
4016            [("name", Value::from("Alice"))],
4017            epoch,
4018            tx_id,
4019        );
4020
4021        let node = store.get_node(id).unwrap();
4022        assert_eq!(
4023            node.get_property("name").and_then(|v| v.as_str()),
4024            Some("Alice")
4025        );
4026    }
4027
4028    #[test]
4029    fn test_discard_uncommitted_versions() {
4030        let store = LpgStore::new();
4031
4032        let epoch = store.new_epoch();
4033        let tx_id = TxId::new(42);
4034
4035        // Create node with specific tx
4036        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
4037        assert!(store.get_node(node_id).is_some());
4038
4039        // Discard uncommitted versions for this tx
4040        store.discard_uncommitted_versions(tx_id);
4041
4042        // Node should be gone (version chain was removed)
4043        assert!(store.get_node(node_id).is_none());
4044    }
4045
4046    // === Property Index Tests ===
4047
4048    #[test]
4049    fn test_property_index_create_and_lookup() {
4050        let store = LpgStore::new();
4051
4052        // Create nodes with properties
4053        let alice = store.create_node(&["Person"]);
4054        let bob = store.create_node(&["Person"]);
4055        let charlie = store.create_node(&["Person"]);
4056
4057        store.set_node_property(alice, "city", Value::from("NYC"));
4058        store.set_node_property(bob, "city", Value::from("NYC"));
4059        store.set_node_property(charlie, "city", Value::from("LA"));
4060
4061        // Before indexing, lookup still works (via scan)
4062        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
4063        assert_eq!(nyc_people.len(), 2);
4064
4065        // Create index
4066        store.create_property_index("city");
4067        assert!(store.has_property_index("city"));
4068
4069        // Indexed lookup should return same results
4070        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
4071        assert_eq!(nyc_people.len(), 2);
4072        assert!(nyc_people.contains(&alice));
4073        assert!(nyc_people.contains(&bob));
4074
4075        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
4076        assert_eq!(la_people.len(), 1);
4077        assert!(la_people.contains(&charlie));
4078    }
4079
4080    #[test]
4081    fn test_property_index_maintained_on_update() {
4082        let store = LpgStore::new();
4083
4084        // Create index first
4085        store.create_property_index("status");
4086
4087        let node = store.create_node(&["Task"]);
4088        store.set_node_property(node, "status", Value::from("pending"));
4089
4090        // Should find by initial value
4091        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4092        assert_eq!(pending.len(), 1);
4093        assert!(pending.contains(&node));
4094
4095        // Update the property
4096        store.set_node_property(node, "status", Value::from("done"));
4097
4098        // Old value should not find it
4099        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4100        assert!(pending.is_empty());
4101
4102        // New value should find it
4103        let done = store.find_nodes_by_property("status", &Value::from("done"));
4104        assert_eq!(done.len(), 1);
4105        assert!(done.contains(&node));
4106    }
4107
4108    #[test]
4109    fn test_property_index_maintained_on_remove() {
4110        let store = LpgStore::new();
4111
4112        store.create_property_index("tag");
4113
4114        let node = store.create_node(&["Item"]);
4115        store.set_node_property(node, "tag", Value::from("important"));
4116
4117        // Should find it
4118        let found = store.find_nodes_by_property("tag", &Value::from("important"));
4119        assert_eq!(found.len(), 1);
4120
4121        // Remove the property
4122        store.remove_node_property(node, "tag");
4123
4124        // Should no longer find it
4125        let found = store.find_nodes_by_property("tag", &Value::from("important"));
4126        assert!(found.is_empty());
4127    }
4128
4129    #[test]
4130    fn test_property_index_drop() {
4131        let store = LpgStore::new();
4132
4133        store.create_property_index("key");
4134        assert!(store.has_property_index("key"));
4135
4136        assert!(store.drop_property_index("key"));
4137        assert!(!store.has_property_index("key"));
4138
4139        // Dropping non-existent index returns false
4140        assert!(!store.drop_property_index("key"));
4141    }
4142
4143    #[test]
4144    fn test_property_index_multiple_values() {
4145        let store = LpgStore::new();
4146
4147        store.create_property_index("age");
4148
4149        // Create multiple nodes with same and different ages
4150        let n1 = store.create_node(&["Person"]);
4151        let n2 = store.create_node(&["Person"]);
4152        let n3 = store.create_node(&["Person"]);
4153        let n4 = store.create_node(&["Person"]);
4154
4155        store.set_node_property(n1, "age", Value::from(25i64));
4156        store.set_node_property(n2, "age", Value::from(25i64));
4157        store.set_node_property(n3, "age", Value::from(30i64));
4158        store.set_node_property(n4, "age", Value::from(25i64));
4159
4160        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
4161        assert_eq!(age_25.len(), 3);
4162
4163        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
4164        assert_eq!(age_30.len(), 1);
4165
4166        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
4167        assert!(age_40.is_empty());
4168    }
4169
4170    #[test]
4171    fn test_property_index_builds_from_existing_data() {
4172        let store = LpgStore::new();
4173
4174        // Create nodes first
4175        let n1 = store.create_node(&["Person"]);
4176        let n2 = store.create_node(&["Person"]);
4177        store.set_node_property(n1, "email", Value::from("alice@example.com"));
4178        store.set_node_property(n2, "email", Value::from("bob@example.com"));
4179
4180        // Create index after data exists
4181        store.create_property_index("email");
4182
4183        // Index should include existing data
4184        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
4185        assert_eq!(alice.len(), 1);
4186        assert!(alice.contains(&n1));
4187
4188        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
4189        assert_eq!(bob.len(), 1);
4190        assert!(bob.contains(&n2));
4191    }
4192
4193    #[test]
4194    fn test_get_node_property_batch() {
4195        let store = LpgStore::new();
4196
4197        let n1 = store.create_node(&["Person"]);
4198        let n2 = store.create_node(&["Person"]);
4199        let n3 = store.create_node(&["Person"]);
4200
4201        store.set_node_property(n1, "age", Value::from(25i64));
4202        store.set_node_property(n2, "age", Value::from(30i64));
4203        // n3 has no age property
4204
4205        let age_key = PropertyKey::new("age");
4206        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
4207
4208        assert_eq!(values.len(), 3);
4209        assert_eq!(values[0], Some(Value::from(25i64)));
4210        assert_eq!(values[1], Some(Value::from(30i64)));
4211        assert_eq!(values[2], None);
4212    }
4213
4214    #[test]
4215    fn test_get_node_property_batch_empty() {
4216        let store = LpgStore::new();
4217        let key = PropertyKey::new("any");
4218
4219        let values = store.get_node_property_batch(&[], &key);
4220        assert!(values.is_empty());
4221    }
4222
4223    #[test]
4224    fn test_get_nodes_properties_batch() {
4225        let store = LpgStore::new();
4226
4227        let n1 = store.create_node(&["Person"]);
4228        let n2 = store.create_node(&["Person"]);
4229        let n3 = store.create_node(&["Person"]);
4230
4231        store.set_node_property(n1, "name", Value::from("Alice"));
4232        store.set_node_property(n1, "age", Value::from(25i64));
4233        store.set_node_property(n2, "name", Value::from("Bob"));
4234        // n3 has no properties
4235
4236        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4237
4238        assert_eq!(all_props.len(), 3);
4239        assert_eq!(all_props[0].len(), 2); // name and age
4240        assert_eq!(all_props[1].len(), 1); // name only
4241        assert_eq!(all_props[2].len(), 0); // no properties
4242
4243        assert_eq!(
4244            all_props[0].get(&PropertyKey::new("name")),
4245            Some(&Value::from("Alice"))
4246        );
4247        assert_eq!(
4248            all_props[1].get(&PropertyKey::new("name")),
4249            Some(&Value::from("Bob"))
4250        );
4251    }
4252
4253    #[test]
4254    fn test_get_nodes_properties_batch_empty() {
4255        let store = LpgStore::new();
4256
4257        let all_props = store.get_nodes_properties_batch(&[]);
4258        assert!(all_props.is_empty());
4259    }
4260
4261    #[test]
4262    fn test_get_nodes_properties_selective_batch() {
4263        let store = LpgStore::new();
4264
4265        let n1 = store.create_node(&["Person"]);
4266        let n2 = store.create_node(&["Person"]);
4267
4268        // Set multiple properties
4269        store.set_node_property(n1, "name", Value::from("Alice"));
4270        store.set_node_property(n1, "age", Value::from(25i64));
4271        store.set_node_property(n1, "email", Value::from("alice@example.com"));
4272        store.set_node_property(n2, "name", Value::from("Bob"));
4273        store.set_node_property(n2, "age", Value::from(30i64));
4274        store.set_node_property(n2, "city", Value::from("NYC"));
4275
4276        // Request only name and age (not email or city)
4277        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4278        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4279
4280        assert_eq!(props.len(), 2);
4281
4282        // n1: should have name and age, but NOT email
4283        assert_eq!(props[0].len(), 2);
4284        assert_eq!(
4285            props[0].get(&PropertyKey::new("name")),
4286            Some(&Value::from("Alice"))
4287        );
4288        assert_eq!(
4289            props[0].get(&PropertyKey::new("age")),
4290            Some(&Value::from(25i64))
4291        );
4292        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4293
4294        // n2: should have name and age, but NOT city
4295        assert_eq!(props[1].len(), 2);
4296        assert_eq!(
4297            props[1].get(&PropertyKey::new("name")),
4298            Some(&Value::from("Bob"))
4299        );
4300        assert_eq!(
4301            props[1].get(&PropertyKey::new("age")),
4302            Some(&Value::from(30i64))
4303        );
4304        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4305    }
4306
4307    #[test]
4308    fn test_get_nodes_properties_selective_batch_empty_keys() {
4309        let store = LpgStore::new();
4310
4311        let n1 = store.create_node(&["Person"]);
4312        store.set_node_property(n1, "name", Value::from("Alice"));
4313
4314        // Request no properties
4315        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4316
4317        assert_eq!(props.len(), 1);
4318        assert!(props[0].is_empty()); // Empty map when no keys requested
4319    }
4320
4321    #[test]
4322    fn test_get_nodes_properties_selective_batch_missing_keys() {
4323        let store = LpgStore::new();
4324
4325        let n1 = store.create_node(&["Person"]);
4326        store.set_node_property(n1, "name", Value::from("Alice"));
4327
4328        // Request a property that doesn't exist
4329        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4330        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4331
4332        assert_eq!(props.len(), 1);
4333        assert_eq!(props[0].len(), 1); // Only name exists
4334        assert_eq!(
4335            props[0].get(&PropertyKey::new("name")),
4336            Some(&Value::from("Alice"))
4337        );
4338    }
4339
4340    // === Range Query Tests ===
4341
4342    #[test]
4343    fn test_find_nodes_in_range_inclusive() {
4344        let store = LpgStore::new();
4345
4346        let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4347        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4348        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4349        let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4350
4351        // age >= 20 AND age <= 40 (inclusive both sides)
4352        let result = store.find_nodes_in_range(
4353            "age",
4354            Some(&Value::from(20i64)),
4355            Some(&Value::from(40i64)),
4356            true,
4357            true,
4358        );
4359        assert_eq!(result.len(), 3);
4360        assert!(result.contains(&n1));
4361        assert!(result.contains(&n2));
4362        assert!(result.contains(&n3));
4363    }
4364
4365    #[test]
4366    fn test_find_nodes_in_range_exclusive() {
4367        let store = LpgStore::new();
4368
4369        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4370        let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4371        store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4372
4373        // age > 20 AND age < 40 (exclusive both sides)
4374        let result = store.find_nodes_in_range(
4375            "age",
4376            Some(&Value::from(20i64)),
4377            Some(&Value::from(40i64)),
4378            false,
4379            false,
4380        );
4381        assert_eq!(result.len(), 1);
4382        assert!(result.contains(&n2));
4383    }
4384
4385    #[test]
4386    fn test_find_nodes_in_range_open_ended() {
4387        let store = LpgStore::new();
4388
4389        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4390        store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4391        let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4392        let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4393
4394        // age >= 35 (no upper bound)
4395        let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4396        assert_eq!(result.len(), 2);
4397        assert!(result.contains(&n3));
4398        assert!(result.contains(&n4));
4399
4400        // age <= 25 (no lower bound)
4401        let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4402        assert_eq!(result.len(), 1);
4403    }
4404
4405    #[test]
4406    fn test_find_nodes_in_range_empty_result() {
4407        let store = LpgStore::new();
4408
4409        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4410
4411        // Range that doesn't match anything
4412        let result = store.find_nodes_in_range(
4413            "age",
4414            Some(&Value::from(100i64)),
4415            Some(&Value::from(200i64)),
4416            true,
4417            true,
4418        );
4419        assert!(result.is_empty());
4420    }
4421
4422    #[test]
4423    fn test_find_nodes_in_range_nonexistent_property() {
4424        let store = LpgStore::new();
4425
4426        store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4427
4428        let result = store.find_nodes_in_range(
4429            "weight",
4430            Some(&Value::from(50i64)),
4431            Some(&Value::from(100i64)),
4432            true,
4433            true,
4434        );
4435        assert!(result.is_empty());
4436    }
4437
4438    // === Multi-Property Query Tests ===
4439
4440    #[test]
4441    fn test_find_nodes_by_properties_multiple_conditions() {
4442        let store = LpgStore::new();
4443
4444        let alice = store.create_node_with_props(
4445            &["Person"],
4446            [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4447        );
4448        store.create_node_with_props(
4449            &["Person"],
4450            [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4451        );
4452        store.create_node_with_props(
4453            &["Person"],
4454            [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4455        );
4456
4457        // Match name="Alice" AND city="NYC"
4458        let result = store.find_nodes_by_properties(&[
4459            ("name", Value::from("Alice")),
4460            ("city", Value::from("NYC")),
4461        ]);
4462        assert_eq!(result.len(), 1);
4463        assert!(result.contains(&alice));
4464    }
4465
4466    #[test]
4467    fn test_find_nodes_by_properties_empty_conditions() {
4468        let store = LpgStore::new();
4469
4470        store.create_node(&["Person"]);
4471        store.create_node(&["Person"]);
4472
4473        // Empty conditions should return all nodes
4474        let result = store.find_nodes_by_properties(&[]);
4475        assert_eq!(result.len(), 2);
4476    }
4477
4478    #[test]
4479    fn test_find_nodes_by_properties_no_match() {
4480        let store = LpgStore::new();
4481
4482        store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4483
4484        let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4485        assert!(result.is_empty());
4486    }
4487
4488    #[test]
4489    fn test_find_nodes_by_properties_with_index() {
4490        let store = LpgStore::new();
4491
4492        // Create index on name
4493        store.create_property_index("name");
4494
4495        let alice = store.create_node_with_props(
4496            &["Person"],
4497            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4498        );
4499        store.create_node_with_props(
4500            &["Person"],
4501            [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4502        );
4503
4504        // Index should accelerate the lookup
4505        let result = store.find_nodes_by_properties(&[
4506            ("name", Value::from("Alice")),
4507            ("age", Value::from(30i64)),
4508        ]);
4509        assert_eq!(result.len(), 1);
4510        assert!(result.contains(&alice));
4511    }
4512
4513    // === Cardinality Estimation Tests ===
4514
4515    #[test]
4516    fn test_estimate_label_cardinality() {
4517        let store = LpgStore::new();
4518
4519        store.create_node(&["Person"]);
4520        store.create_node(&["Person"]);
4521        store.create_node(&["Animal"]);
4522
4523        store.ensure_statistics_fresh();
4524
4525        let person_est = store.estimate_label_cardinality("Person");
4526        let animal_est = store.estimate_label_cardinality("Animal");
4527        let unknown_est = store.estimate_label_cardinality("Unknown");
4528
4529        assert!(
4530            person_est >= 1.0,
4531            "Person should have cardinality >= 1, got {person_est}"
4532        );
4533        assert!(
4534            animal_est >= 1.0,
4535            "Animal should have cardinality >= 1, got {animal_est}"
4536        );
4537        // Unknown label should return some default (not panic)
4538        assert!(unknown_est >= 0.0);
4539    }
4540
4541    #[test]
4542    fn test_estimate_avg_degree() {
4543        let store = LpgStore::new();
4544
4545        let a = store.create_node(&["Person"]);
4546        let b = store.create_node(&["Person"]);
4547        let c = store.create_node(&["Person"]);
4548
4549        store.create_edge(a, b, "KNOWS");
4550        store.create_edge(a, c, "KNOWS");
4551        store.create_edge(b, c, "KNOWS");
4552
4553        store.ensure_statistics_fresh();
4554
4555        let outgoing = store.estimate_avg_degree("KNOWS", true);
4556        let incoming = store.estimate_avg_degree("KNOWS", false);
4557
4558        assert!(
4559            outgoing > 0.0,
4560            "Outgoing degree should be > 0, got {outgoing}"
4561        );
4562        assert!(
4563            incoming > 0.0,
4564            "Incoming degree should be > 0, got {incoming}"
4565        );
4566    }
4567
4568    // === Delete operations ===
4569
4570    #[test]
4571    fn test_delete_node_does_not_cascade() {
4572        let store = LpgStore::new();
4573
4574        let a = store.create_node(&["A"]);
4575        let b = store.create_node(&["B"]);
4576        let e = store.create_edge(a, b, "KNOWS");
4577
4578        assert!(store.delete_node(a));
4579        assert!(store.get_node(a).is_none());
4580
4581        // Edges are NOT automatically deleted (non-detach delete)
4582        assert!(
4583            store.get_edge(e).is_some(),
4584            "Edge should survive non-detach node delete"
4585        );
4586    }
4587
4588    #[test]
4589    fn test_delete_already_deleted_node() {
4590        let store = LpgStore::new();
4591        let a = store.create_node(&["A"]);
4592
4593        assert!(store.delete_node(a));
4594        // Second delete should return false (already deleted)
4595        assert!(!store.delete_node(a));
4596    }
4597
4598    #[test]
4599    fn test_delete_nonexistent_node() {
4600        let store = LpgStore::new();
4601        assert!(!store.delete_node(NodeId::new(999)));
4602    }
4603}