Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33/// Compares two values for ordering (used for range checks).
34fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35    match (a, b) {
36        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40        _ => None,
41    }
42}
43
44/// Checks if a value is within a range.
45fn value_in_range(
46    value: &Value,
47    min: Option<&Value>,
48    max: Option<&Value>,
49    min_inclusive: bool,
50    max_inclusive: bool,
51) -> bool {
52    // Check lower bound
53    if let Some(min_val) = min {
54        match compare_values_for_range(value, min_val) {
55            Some(CmpOrdering::Less) => return false,
56            Some(CmpOrdering::Equal) if !min_inclusive => return false,
57            None => return false, // Can't compare
58            _ => {}
59        }
60    }
61
62    // Check upper bound
63    if let Some(max_val) = max {
64        match compare_values_for_range(value, max_val) {
65            Some(CmpOrdering::Greater) => return false,
66            Some(CmpOrdering::Equal) if !max_inclusive => return false,
67            None => return false,
68            _ => {}
69        }
70    }
71
72    true
73}
74
75// Tiered storage imports
76#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
83/// Configuration for the LPG store.
84///
85/// The defaults work well for most cases. Tune `backward_edges` if you only
86/// traverse in one direction (saves memory), or adjust capacities if you know
87/// your graph size upfront (avoids reallocations).
88#[derive(Debug, Clone)]
89pub struct LpgStoreConfig {
90    /// Maintain backward adjacency for incoming edge queries. Turn off if
91    /// you only traverse outgoing edges - saves ~50% adjacency memory.
92    pub backward_edges: bool,
93    /// Initial capacity for nodes (avoids early reallocations).
94    pub initial_node_capacity: usize,
95    /// Initial capacity for edges (avoids early reallocations).
96    pub initial_edge_capacity: usize,
97}
98
99impl Default for LpgStoreConfig {
100    fn default() -> Self {
101        Self {
102            backward_edges: true,
103            initial_node_capacity: 1024,
104            initial_edge_capacity: 4096,
105        }
106    }
107}
108
109/// The core in-memory graph storage.
110///
111/// Everything lives here: nodes, edges, properties, adjacency indexes, and
112/// version chains for MVCC. Concurrent reads never block each other.
113///
114/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
115/// adds transaction management and query execution. Use `LpgStore` directly
116/// when you need raw performance for algorithm implementations.
117///
118/// # Example
119///
120/// ```
121/// use grafeo_core::graph::lpg::LpgStore;
122/// use grafeo_core::graph::Direction;
123///
124/// let store = LpgStore::new();
125///
126/// // Create a small social network
127/// let alice = store.create_node(&["Person"]);
128/// let bob = store.create_node(&["Person"]);
129/// store.create_edge(alice, bob, "KNOWS");
130///
131/// // Traverse outgoing edges
132/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
133///     println!("Alice knows node {:?}", neighbor);
134/// }
135/// ```
136///
137/// # Lock Ordering
138///
139/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
140/// consistent order to prevent deadlocks. Always acquire locks in this order:
141///
142/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
143/// 1. `nodes` / `node_versions`
144/// 2. `edges` / `edge_versions`
145///
146/// ## Level 2 - Catalogs (acquire as pairs when writing)
147/// 3. `label_to_id` + `id_to_label`
148/// 4. `edge_type_to_id` + `id_to_edge_type`
149///
150/// ## Level 3 - Indexes
151/// 5. `label_index`
152/// 6. `node_labels`
153/// 7. `property_indexes`
154///
155/// ## Level 4 - Statistics
156/// 8. `statistics`
157///
158/// ## Level 5 - Nested Locks (internal to other structs)
159/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
160/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
161///
162/// ## Rules
163/// - Catalog pairs must be acquired together when writing.
164/// - Never hold entity locks while acquiring catalog locks in a different scope.
165/// - Statistics lock is always last.
166/// - Read locks are generally safe, but avoid read-to-write upgrades.
167pub struct LpgStore {
168    /// Configuration.
169    #[allow(dead_code)]
170    config: LpgStoreConfig,
171
172    /// Node records indexed by NodeId, with version chains for MVCC.
173    /// Used when `tiered-storage` feature is disabled.
174    /// Lock order: 1
175    #[cfg(not(feature = "tiered-storage"))]
176    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178    /// Edge records indexed by EdgeId, with version chains for MVCC.
179    /// Used when `tiered-storage` feature is disabled.
180    /// Lock order: 2
181    #[cfg(not(feature = "tiered-storage"))]
182    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184    // === Tiered Storage Fields (feature-gated) ===
185    //
186    // Lock ordering for arena access:
187    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
188    //
189    // Rules:
190    // - Acquire arena read lock *after* version locks, never before.
191    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
192    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
193    // - freeze_epoch order: node_versions.read() → arena.read_at(),
194    //   then edge_versions.read() → arena.read_at().
195    /// Arena allocator for hot data storage.
196    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
197    #[cfg(feature = "tiered-storage")]
198    arena_allocator: Arc<ArenaAllocator>,
199
200    /// Node version indexes - store metadata and arena offsets.
201    /// The actual NodeRecord data is stored in the arena.
202    /// Lock order: 1
203    #[cfg(feature = "tiered-storage")]
204    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206    /// Edge version indexes - store metadata and arena offsets.
207    /// The actual EdgeRecord data is stored in the arena.
208    /// Lock order: 2
209    #[cfg(feature = "tiered-storage")]
210    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212    /// Cold storage for frozen epochs.
213    /// Contains compressed epoch blocks for historical data.
214    #[cfg(feature = "tiered-storage")]
215    epoch_store: Arc<EpochStore>,
216
217    /// Property storage for nodes.
218    node_properties: PropertyStorage<NodeId>,
219
220    /// Property storage for edges.
221    edge_properties: PropertyStorage<EdgeId>,
222
223    /// Label name to ID mapping.
224    /// Lock order: 3 (acquire with id_to_label)
225    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227    /// Label ID to name mapping.
228    /// Lock order: 3 (acquire with label_to_id)
229    id_to_label: RwLock<Vec<ArcStr>>,
230
231    /// Edge type name to ID mapping.
232    /// Lock order: 4 (acquire with id_to_edge_type)
233    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235    /// Edge type ID to name mapping.
236    /// Lock order: 4 (acquire with edge_type_to_id)
237    id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239    /// Forward adjacency lists (outgoing edges).
240    forward_adj: ChunkedAdjacency,
241
242    /// Backward adjacency lists (incoming edges).
243    /// Only populated if config.backward_edges is true.
244    backward_adj: Option<ChunkedAdjacency>,
245
246    /// Label index: label_id -> set of node IDs.
247    /// Lock order: 5
248    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250    /// Node labels: node_id -> set of label IDs.
251    /// Reverse mapping to efficiently get labels for a node.
252    /// Lock order: 6
253    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255    /// Property indexes: property_key -> (value -> set of node IDs).
256    ///
257    /// When a property is indexed, lookups by value are O(1) instead of O(n).
258    /// Use [`create_property_index`] to enable indexing for a property.
259    /// Lock order: 7
260    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262    /// Vector indexes: "label:property" -> HNSW index.
263    ///
264    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
265    /// Lock order: 7 (same level as property_indexes, disjoint keys)
266    #[cfg(feature = "vector-index")]
267    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269    /// Next node ID.
270    next_node_id: AtomicU64,
271
272    /// Next edge ID.
273    next_edge_id: AtomicU64,
274
275    /// Current epoch.
276    current_epoch: AtomicU64,
277
278    /// Statistics for cost-based optimization.
279    /// Lock order: 8 (always last)
280    statistics: RwLock<Statistics>,
281
282    /// Whether statistics need recomputation after mutations.
283    needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287    /// Creates a new LPG store with default configuration.
288    #[must_use]
289    pub fn new() -> Self {
290        Self::with_config(LpgStoreConfig::default())
291    }
292
293    /// Creates a new LPG store with custom configuration.
294    #[must_use]
295    pub fn with_config(config: LpgStoreConfig) -> Self {
296        let backward_adj = if config.backward_edges {
297            Some(ChunkedAdjacency::new())
298        } else {
299            None
300        };
301
302        Self {
303            #[cfg(not(feature = "tiered-storage"))]
304            nodes: RwLock::new(FxHashMap::default()),
305            #[cfg(not(feature = "tiered-storage"))]
306            edges: RwLock::new(FxHashMap::default()),
307            #[cfg(feature = "tiered-storage")]
308            arena_allocator: Arc::new(ArenaAllocator::new()),
309            #[cfg(feature = "tiered-storage")]
310            node_versions: RwLock::new(FxHashMap::default()),
311            #[cfg(feature = "tiered-storage")]
312            edge_versions: RwLock::new(FxHashMap::default()),
313            #[cfg(feature = "tiered-storage")]
314            epoch_store: Arc::new(EpochStore::new()),
315            node_properties: PropertyStorage::new(),
316            edge_properties: PropertyStorage::new(),
317            label_to_id: RwLock::new(FxHashMap::default()),
318            id_to_label: RwLock::new(Vec::new()),
319            edge_type_to_id: RwLock::new(FxHashMap::default()),
320            id_to_edge_type: RwLock::new(Vec::new()),
321            forward_adj: ChunkedAdjacency::new(),
322            backward_adj,
323            label_index: RwLock::new(Vec::new()),
324            node_labels: RwLock::new(FxHashMap::default()),
325            property_indexes: RwLock::new(FxHashMap::default()),
326            #[cfg(feature = "vector-index")]
327            vector_indexes: RwLock::new(FxHashMap::default()),
328            next_node_id: AtomicU64::new(0),
329            next_edge_id: AtomicU64::new(0),
330            current_epoch: AtomicU64::new(0),
331            statistics: RwLock::new(Statistics::new()),
332            needs_stats_recompute: AtomicBool::new(true),
333            config,
334        }
335    }
336
337    /// Returns the current epoch.
338    #[must_use]
339    pub fn current_epoch(&self) -> EpochId {
340        EpochId::new(self.current_epoch.load(Ordering::Acquire))
341    }
342
343    /// Creates a new epoch.
344    pub fn new_epoch(&self) -> EpochId {
345        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346        EpochId::new(id)
347    }
348
349    // === Node Operations ===
350
351    /// Creates a new node with the given labels.
352    ///
353    /// Uses the system transaction for non-transactional operations.
354    pub fn create_node(&self, labels: &[&str]) -> NodeId {
355        self.needs_stats_recompute.store(true, Ordering::Relaxed);
356        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357    }
358
359    /// Creates a new node with the given labels within a transaction context.
360    #[cfg(not(feature = "tiered-storage"))]
361    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364        let mut record = NodeRecord::new(id, epoch);
365        record.set_label_count(labels.len() as u16);
366
367        // Store labels in node_labels map and label_index
368        let mut node_label_set = FxHashSet::default();
369        for label in labels {
370            let label_id = self.get_or_create_label_id(*label);
371            node_label_set.insert(label_id);
372
373            // Update label index
374            let mut index = self.label_index.write();
375            while index.len() <= label_id as usize {
376                index.push(FxHashMap::default());
377            }
378            index[label_id as usize].insert(id, ());
379        }
380
381        // Store node's labels
382        self.node_labels.write().insert(id, node_label_set);
383
384        // Create version chain with initial version
385        let chain = VersionChain::with_initial(record, epoch, tx_id);
386        self.nodes.write().insert(id, chain);
387        id
388    }
389
390    /// Creates a new node with the given labels within a transaction context.
391    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
392    #[cfg(feature = "tiered-storage")]
393    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396        let mut record = NodeRecord::new(id, epoch);
397        record.set_label_count(labels.len() as u16);
398
399        // Store labels in node_labels map and label_index
400        let mut node_label_set = FxHashSet::default();
401        for label in labels {
402            let label_id = self.get_or_create_label_id(*label);
403            node_label_set.insert(label_id);
404
405            // Update label index
406            let mut index = self.label_index.write();
407            while index.len() <= label_id as usize {
408                index.push(FxHashMap::default());
409            }
410            index[label_id as usize].insert(id, ());
411        }
412
413        // Store node's labels
414        self.node_labels.write().insert(id, node_label_set);
415
416        // Allocate record in arena and get offset (create epoch if needed)
417        let arena = self.arena_allocator.arena_or_create(epoch);
418        let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420        // Create HotVersionRef pointing to arena data
421        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423        // Create or update version index
424        let mut versions = self.node_versions.write();
425        if let Some(index) = versions.get_mut(&id) {
426            index.add_hot(hot_ref);
427        } else {
428            versions.insert(id, VersionIndex::with_initial(hot_ref));
429        }
430
431        id
432    }
433
434    /// Creates a new node with labels and properties.
435    pub fn create_node_with_props(
436        &self,
437        labels: &[&str],
438        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439    ) -> NodeId {
440        self.create_node_with_props_versioned(
441            labels,
442            properties,
443            self.current_epoch(),
444            TxId::SYSTEM,
445        )
446    }
447
448    /// Creates a new node with labels and properties within a transaction context.
449    #[cfg(not(feature = "tiered-storage"))]
450    pub fn create_node_with_props_versioned(
451        &self,
452        labels: &[&str],
453        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454        epoch: EpochId,
455        tx_id: TxId,
456    ) -> NodeId {
457        let id = self.create_node_versioned(labels, epoch, tx_id);
458
459        for (key, value) in properties {
460            self.node_properties.set(id, key.into(), value.into());
461        }
462
463        // Update props_count in record
464        let count = self.node_properties.get_all(id).len() as u16;
465        if let Some(chain) = self.nodes.write().get_mut(&id) {
466            if let Some(record) = chain.latest_mut() {
467                record.props_count = count;
468            }
469        }
470
471        id
472    }
473
474    /// Creates a new node with labels and properties within a transaction context.
475    /// (Tiered storage version)
476    #[cfg(feature = "tiered-storage")]
477    pub fn create_node_with_props_versioned(
478        &self,
479        labels: &[&str],
480        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
481        epoch: EpochId,
482        tx_id: TxId,
483    ) -> NodeId {
484        let id = self.create_node_versioned(labels, epoch, tx_id);
485
486        for (key, value) in properties {
487            self.node_properties.set(id, key.into(), value.into());
488        }
489
490        // Note: props_count in record is not updated for tiered storage.
491        // The record is immutable once allocated in the arena.
492
493        id
494    }
495
496    /// Gets a node by ID (latest visible version).
497    #[must_use]
498    pub fn get_node(&self, id: NodeId) -> Option<Node> {
499        self.get_node_at_epoch(id, self.current_epoch())
500    }
501
502    /// Gets a node by ID at a specific epoch.
503    #[must_use]
504    #[cfg(not(feature = "tiered-storage"))]
505    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
506        let nodes = self.nodes.read();
507        let chain = nodes.get(&id)?;
508        let record = chain.visible_at(epoch)?;
509
510        if record.is_deleted() {
511            return None;
512        }
513
514        let mut node = Node::new(id);
515
516        // Get labels from node_labels map
517        let id_to_label = self.id_to_label.read();
518        let node_labels = self.node_labels.read();
519        if let Some(label_ids) = node_labels.get(&id) {
520            for &label_id in label_ids {
521                if let Some(label) = id_to_label.get(label_id as usize) {
522                    node.labels.push(label.clone());
523                }
524            }
525        }
526
527        // Get properties
528        node.properties = self.node_properties.get_all(id).into_iter().collect();
529
530        Some(node)
531    }
532
533    /// Gets a node by ID at a specific epoch.
534    /// (Tiered storage version: reads from arena via VersionIndex)
535    #[must_use]
536    #[cfg(feature = "tiered-storage")]
537    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
538        let versions = self.node_versions.read();
539        let index = versions.get(&id)?;
540        let version_ref = index.visible_at(epoch)?;
541
542        // Read the record from arena
543        let record = self.read_node_record(&version_ref)?;
544
545        if record.is_deleted() {
546            return None;
547        }
548
549        let mut node = Node::new(id);
550
551        // Get labels from node_labels map
552        let id_to_label = self.id_to_label.read();
553        let node_labels = self.node_labels.read();
554        if let Some(label_ids) = node_labels.get(&id) {
555            for &label_id in label_ids {
556                if let Some(label) = id_to_label.get(label_id as usize) {
557                    node.labels.push(label.clone());
558                }
559            }
560        }
561
562        // Get properties
563        node.properties = self.node_properties.get_all(id).into_iter().collect();
564
565        Some(node)
566    }
567
568    /// Gets a node visible to a specific transaction.
569    #[must_use]
570    #[cfg(not(feature = "tiered-storage"))]
571    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
572        let nodes = self.nodes.read();
573        let chain = nodes.get(&id)?;
574        let record = chain.visible_to(epoch, tx_id)?;
575
576        if record.is_deleted() {
577            return None;
578        }
579
580        let mut node = Node::new(id);
581
582        // Get labels from node_labels map
583        let id_to_label = self.id_to_label.read();
584        let node_labels = self.node_labels.read();
585        if let Some(label_ids) = node_labels.get(&id) {
586            for &label_id in label_ids {
587                if let Some(label) = id_to_label.get(label_id as usize) {
588                    node.labels.push(label.clone());
589                }
590            }
591        }
592
593        // Get properties
594        node.properties = self.node_properties.get_all(id).into_iter().collect();
595
596        Some(node)
597    }
598
599    /// Gets a node visible to a specific transaction.
600    /// (Tiered storage version: reads from arena via VersionIndex)
601    #[must_use]
602    #[cfg(feature = "tiered-storage")]
603    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
604        let versions = self.node_versions.read();
605        let index = versions.get(&id)?;
606        let version_ref = index.visible_to(epoch, tx_id)?;
607
608        // Read the record from arena
609        let record = self.read_node_record(&version_ref)?;
610
611        if record.is_deleted() {
612            return None;
613        }
614
615        let mut node = Node::new(id);
616
617        // Get labels from node_labels map
618        let id_to_label = self.id_to_label.read();
619        let node_labels = self.node_labels.read();
620        if let Some(label_ids) = node_labels.get(&id) {
621            for &label_id in label_ids {
622                if let Some(label) = id_to_label.get(label_id as usize) {
623                    node.labels.push(label.clone());
624                }
625            }
626        }
627
628        // Get properties
629        node.properties = self.node_properties.get_all(id).into_iter().collect();
630
631        Some(node)
632    }
633
634    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
635    #[cfg(feature = "tiered-storage")]
636    #[allow(unsafe_code)]
637    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
638        match version_ref {
639            VersionRef::Hot(hot_ref) => {
640                let arena = self.arena_allocator.arena(hot_ref.epoch);
641                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
642                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
643                Some(*record)
644            }
645            VersionRef::Cold(cold_ref) => {
646                // Read from compressed epoch store
647                self.epoch_store
648                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
649            }
650        }
651    }
652
653    /// Deletes a node and all its edges (using latest epoch).
654    pub fn delete_node(&self, id: NodeId) -> bool {
655        self.needs_stats_recompute.store(true, Ordering::Relaxed);
656        self.delete_node_at_epoch(id, self.current_epoch())
657    }
658
659    /// Deletes a node at a specific epoch.
660    #[cfg(not(feature = "tiered-storage"))]
661    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
662        let mut nodes = self.nodes.write();
663        if let Some(chain) = nodes.get_mut(&id) {
664            // Check if visible at this epoch (not already deleted)
665            if let Some(record) = chain.visible_at(epoch) {
666                if record.is_deleted() {
667                    return false;
668                }
669            } else {
670                // Not visible at this epoch (already deleted or doesn't exist)
671                return false;
672            }
673
674            // Mark the version chain as deleted at this epoch
675            chain.mark_deleted(epoch);
676
677            // Remove from label index using node_labels map
678            let mut index = self.label_index.write();
679            let mut node_labels = self.node_labels.write();
680            if let Some(label_ids) = node_labels.remove(&id) {
681                for label_id in label_ids {
682                    if let Some(set) = index.get_mut(label_id as usize) {
683                        set.remove(&id);
684                    }
685                }
686            }
687
688            // Remove properties
689            drop(nodes); // Release lock before removing properties
690            drop(index);
691            drop(node_labels);
692            self.node_properties.remove_all(id);
693
694            // Note: Caller should use delete_node_edges() first if detach is needed
695
696            true
697        } else {
698            false
699        }
700    }
701
702    /// Deletes a node at a specific epoch.
703    /// (Tiered storage version)
704    #[cfg(feature = "tiered-storage")]
705    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
706        let mut versions = self.node_versions.write();
707        if let Some(index) = versions.get_mut(&id) {
708            // Check if visible at this epoch
709            if let Some(version_ref) = index.visible_at(epoch) {
710                if let Some(record) = self.read_node_record(&version_ref) {
711                    if record.is_deleted() {
712                        return false;
713                    }
714                } else {
715                    return false;
716                }
717            } else {
718                return false;
719            }
720
721            // Mark as deleted in version index
722            index.mark_deleted(epoch);
723
724            // Remove from label index using node_labels map
725            let mut label_index = self.label_index.write();
726            let mut node_labels = self.node_labels.write();
727            if let Some(label_ids) = node_labels.remove(&id) {
728                for label_id in label_ids {
729                    if let Some(set) = label_index.get_mut(label_id as usize) {
730                        set.remove(&id);
731                    }
732                }
733            }
734
735            // Remove properties
736            drop(versions);
737            drop(label_index);
738            drop(node_labels);
739            self.node_properties.remove_all(id);
740
741            true
742        } else {
743            false
744        }
745    }
746
747    /// Deletes all edges connected to a node (implements DETACH DELETE).
748    ///
749    /// Call this before `delete_node()` if you want to remove a node that
750    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
751    #[cfg(not(feature = "tiered-storage"))]
752    pub fn delete_node_edges(&self, node_id: NodeId) {
753        // Get outgoing edges
754        let outgoing: Vec<EdgeId> = self
755            .forward_adj
756            .edges_from(node_id)
757            .into_iter()
758            .map(|(_, edge_id)| edge_id)
759            .collect();
760
761        // Get incoming edges
762        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
763            backward
764                .edges_from(node_id)
765                .into_iter()
766                .map(|(_, edge_id)| edge_id)
767                .collect()
768        } else {
769            // No backward adjacency - scan all edges
770            let epoch = self.current_epoch();
771            self.edges
772                .read()
773                .iter()
774                .filter_map(|(id, chain)| {
775                    chain.visible_at(epoch).and_then(|r| {
776                        if !r.is_deleted() && r.dst == node_id {
777                            Some(*id)
778                        } else {
779                            None
780                        }
781                    })
782                })
783                .collect()
784        };
785
786        // Delete all edges
787        for edge_id in outgoing.into_iter().chain(incoming) {
788            self.delete_edge(edge_id);
789        }
790    }
791
792    /// Deletes all edges connected to a node (implements DETACH DELETE).
793    /// (Tiered storage version)
794    #[cfg(feature = "tiered-storage")]
795    pub fn delete_node_edges(&self, node_id: NodeId) {
796        // Get outgoing edges
797        let outgoing: Vec<EdgeId> = self
798            .forward_adj
799            .edges_from(node_id)
800            .into_iter()
801            .map(|(_, edge_id)| edge_id)
802            .collect();
803
804        // Get incoming edges
805        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
806            backward
807                .edges_from(node_id)
808                .into_iter()
809                .map(|(_, edge_id)| edge_id)
810                .collect()
811        } else {
812            // No backward adjacency - scan all edges
813            let epoch = self.current_epoch();
814            let versions = self.edge_versions.read();
815            versions
816                .iter()
817                .filter_map(|(id, index)| {
818                    index.visible_at(epoch).and_then(|vref| {
819                        self.read_edge_record(&vref).and_then(|r| {
820                            if !r.is_deleted() && r.dst == node_id {
821                                Some(*id)
822                            } else {
823                                None
824                            }
825                        })
826                    })
827                })
828                .collect()
829        };
830
831        // Delete all edges
832        for edge_id in outgoing.into_iter().chain(incoming) {
833            self.delete_edge(edge_id);
834        }
835    }
836
837    /// Sets a property on a node.
838    #[cfg(not(feature = "tiered-storage"))]
839    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
840        let prop_key: PropertyKey = key.into();
841
842        // Update property index before setting the property (needs to read old value)
843        self.update_property_index_on_set(id, &prop_key, &value);
844
845        self.node_properties.set(id, prop_key, value);
846
847        // Update props_count in record
848        let count = self.node_properties.get_all(id).len() as u16;
849        if let Some(chain) = self.nodes.write().get_mut(&id) {
850            if let Some(record) = chain.latest_mut() {
851                record.props_count = count;
852            }
853        }
854    }
855
856    /// Sets a property on a node.
857    /// (Tiered storage version: properties stored separately, record is immutable)
858    #[cfg(feature = "tiered-storage")]
859    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
860        let prop_key: PropertyKey = key.into();
861
862        // Update property index before setting the property (needs to read old value)
863        self.update_property_index_on_set(id, &prop_key, &value);
864
865        self.node_properties.set(id, prop_key, value);
866        // Note: props_count in record is not updated for tiered storage.
867        // The record is immutable once allocated in the arena.
868        // Property count can be derived from PropertyStorage if needed.
869    }
870
871    /// Sets a property on an edge.
872    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
873        self.edge_properties.set(id, key.into(), value);
874    }
875
876    /// Removes a property from a node.
877    ///
878    /// Returns the previous value if it existed, or None if the property didn't exist.
879    #[cfg(not(feature = "tiered-storage"))]
880    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
881        let prop_key: PropertyKey = key.into();
882
883        // Update property index before removing (needs to read old value)
884        self.update_property_index_on_remove(id, &prop_key);
885
886        let result = self.node_properties.remove(id, &prop_key);
887
888        // Update props_count in record
889        let count = self.node_properties.get_all(id).len() as u16;
890        if let Some(chain) = self.nodes.write().get_mut(&id) {
891            if let Some(record) = chain.latest_mut() {
892                record.props_count = count;
893            }
894        }
895
896        result
897    }
898
899    /// Removes a property from a node.
900    /// (Tiered storage version)
901    #[cfg(feature = "tiered-storage")]
902    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
903        let prop_key: PropertyKey = key.into();
904
905        // Update property index before removing (needs to read old value)
906        self.update_property_index_on_remove(id, &prop_key);
907
908        self.node_properties.remove(id, &prop_key)
909        // Note: props_count in record is not updated for tiered storage.
910    }
911
912    /// Removes a property from an edge.
913    ///
914    /// Returns the previous value if it existed, or None if the property didn't exist.
915    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
916        self.edge_properties.remove(id, &key.into())
917    }
918
919    /// Gets a single property from a node without loading all properties.
920    ///
921    /// This is O(1) vs O(properties) for `get_node().get_property()`.
922    /// Use this for filter predicates where you only need one property value.
923    ///
924    /// # Example
925    ///
926    /// ```ignore
927    /// // Fast: Direct single-property lookup
928    /// let age = store.get_node_property(node_id, "age");
929    ///
930    /// // Slow: Loads all properties, then extracts one
931    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
932    /// ```
933    #[must_use]
934    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
935        self.node_properties.get(id, key)
936    }
937
938    /// Gets a single property from an edge without loading all properties.
939    ///
940    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
941    #[must_use]
942    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
943        self.edge_properties.get(id, key)
944    }
945
946    // === Batch Property Operations ===
947
948    /// Gets a property for multiple nodes in a single batch operation.
949    ///
950    /// More efficient than calling [`Self::get_node_property`] in a loop because it
951    /// reduces lock overhead and enables better cache utilization.
952    ///
953    /// # Example
954    ///
955    /// ```
956    /// use grafeo_core::graph::lpg::LpgStore;
957    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
958    ///
959    /// let store = LpgStore::new();
960    /// let n1 = store.create_node(&["Person"]);
961    /// let n2 = store.create_node(&["Person"]);
962    /// store.set_node_property(n1, "age", Value::from(25i64));
963    /// store.set_node_property(n2, "age", Value::from(30i64));
964    ///
965    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
966    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
967    /// ```
968    #[must_use]
969    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
970        self.node_properties.get_batch(ids, key)
971    }
972
973    /// Gets all properties for multiple nodes in a single batch operation.
974    ///
975    /// Returns a vector of property maps, one per node ID (empty map if no properties).
976    /// More efficient than calling [`Self::get_node`] in a loop.
977    #[must_use]
978    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
979        self.node_properties.get_all_batch(ids)
980    }
981
982    /// Gets selected properties for multiple nodes (projection pushdown).
983    ///
984    /// This is more efficient than [`Self::get_nodes_properties_batch`] when you only
985    /// need a subset of properties. It only iterates the requested columns instead of
986    /// all columns.
987    ///
988    /// **Use this for**: Queries with explicit projections like `RETURN n.name, n.age`
989    /// instead of `RETURN n` (which requires all properties).
990    ///
991    /// # Example
992    ///
993    /// ```
994    /// use grafeo_core::graph::lpg::LpgStore;
995    /// use grafeo_common::types::{PropertyKey, Value};
996    ///
997    /// let store = LpgStore::new();
998    /// let n1 = store.create_node(&["Person"]);
999    /// store.set_node_property(n1, "name", Value::from("Alice"));
1000    /// store.set_node_property(n1, "age", Value::from(30i64));
1001    /// store.set_node_property(n1, "email", Value::from("alice@example.com"));
1002    ///
1003    /// // Only fetch name and age (faster than get_nodes_properties_batch)
1004    /// let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
1005    /// let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
1006    ///
1007    /// assert_eq!(props[0].len(), 2); // Only name and age, not email
1008    /// ```
1009    #[must_use]
1010    pub fn get_nodes_properties_selective_batch(
1011        &self,
1012        ids: &[NodeId],
1013        keys: &[PropertyKey],
1014    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1015        self.node_properties.get_selective_batch(ids, keys)
1016    }
1017
1018    /// Gets selected properties for multiple edges (projection pushdown).
1019    ///
1020    /// Edge-property version of [`Self::get_nodes_properties_selective_batch`].
1021    #[must_use]
1022    pub fn get_edges_properties_selective_batch(
1023        &self,
1024        ids: &[EdgeId],
1025        keys: &[PropertyKey],
1026    ) -> Vec<FxHashMap<PropertyKey, Value>> {
1027        self.edge_properties.get_selective_batch(ids, keys)
1028    }
1029
1030    /// Finds nodes where a property value is in a range.
1031    ///
1032    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
1033    /// Uses zone maps to skip scanning when the range definitely doesn't match.
1034    ///
1035    /// # Arguments
1036    ///
1037    /// * `property` - The property to check
1038    /// * `min` - Optional lower bound (None for unbounded)
1039    /// * `max` - Optional upper bound (None for unbounded)
1040    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
1041    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
1042    ///
1043    /// # Example
1044    ///
1045    /// ```
1046    /// use grafeo_core::graph::lpg::LpgStore;
1047    /// use grafeo_common::types::Value;
1048    ///
1049    /// let store = LpgStore::new();
1050    /// let n1 = store.create_node(&["Person"]);
1051    /// let n2 = store.create_node(&["Person"]);
1052    /// store.set_node_property(n1, "age", Value::from(25i64));
1053    /// store.set_node_property(n2, "age", Value::from(35i64));
1054    ///
1055    /// // Find nodes where age > 30
1056    /// let result = store.find_nodes_in_range(
1057    ///     "age",
1058    ///     Some(&Value::from(30i64)),
1059    ///     None,
1060    ///     false, // exclusive lower bound
1061    ///     true,  // inclusive upper bound (doesn't matter since None)
1062    /// );
1063    /// assert_eq!(result.len(), 1); // Only n2 matches
1064    /// ```
1065    #[must_use]
1066    pub fn find_nodes_in_range(
1067        &self,
1068        property: &str,
1069        min: Option<&Value>,
1070        max: Option<&Value>,
1071        min_inclusive: bool,
1072        max_inclusive: bool,
1073    ) -> Vec<NodeId> {
1074        let key = PropertyKey::new(property);
1075
1076        // Check zone map first - if no values could match, return empty
1077        if !self
1078            .node_properties
1079            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1080        {
1081            return Vec::new();
1082        }
1083
1084        // Scan all nodes and filter by range
1085        self.node_ids()
1086            .into_iter()
1087            .filter(|&node_id| {
1088                self.node_properties
1089                    .get(node_id, &key)
1090                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1091            })
1092            .collect()
1093    }
1094
1095    /// Finds nodes matching multiple property equality conditions.
1096    ///
1097    /// This is more efficient than intersecting multiple single-property lookups
1098    /// because it can use indexes when available and short-circuits on the first
1099    /// miss.
1100    ///
1101    /// # Example
1102    ///
1103    /// ```
1104    /// use grafeo_core::graph::lpg::LpgStore;
1105    /// use grafeo_common::types::Value;
1106    ///
1107    /// let store = LpgStore::new();
1108    /// let alice = store.create_node(&["Person"]);
1109    /// store.set_node_property(alice, "name", Value::from("Alice"));
1110    /// store.set_node_property(alice, "city", Value::from("NYC"));
1111    ///
1112    /// // Find nodes where name = "Alice" AND city = "NYC"
1113    /// let matches = store.find_nodes_by_properties(&[
1114    ///     ("name", Value::from("Alice")),
1115    ///     ("city", Value::from("NYC")),
1116    /// ]);
1117    /// assert!(matches.contains(&alice));
1118    /// ```
1119    #[must_use]
1120    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1121        if conditions.is_empty() {
1122            return self.node_ids();
1123        }
1124
1125        // Find the most selective condition (smallest result set) to start
1126        // If any condition has an index, use that first
1127        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1128        let indexes = self.property_indexes.read();
1129
1130        for (i, (prop, value)) in conditions.iter().enumerate() {
1131            let key = PropertyKey::new(*prop);
1132            let hv = HashableValue::new(value.clone());
1133
1134            if let Some(index) = indexes.get(&key) {
1135                let matches: Vec<NodeId> = index
1136                    .get(&hv)
1137                    .map(|nodes| nodes.iter().copied().collect())
1138                    .unwrap_or_default();
1139
1140                // Short-circuit if any indexed condition has no matches
1141                if matches.is_empty() {
1142                    return Vec::new();
1143                }
1144
1145                // Use smallest indexed result as starting point
1146                if best_start
1147                    .as_ref()
1148                    .is_none_or(|(_, best)| matches.len() < best.len())
1149                {
1150                    best_start = Some((i, matches));
1151                }
1152            }
1153        }
1154        drop(indexes);
1155
1156        // Start from best indexed result or fall back to full node scan
1157        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1158            // No indexes available, start with first condition via full scan
1159            let (prop, value) = &conditions[0];
1160            (0, self.find_nodes_by_property(prop, value))
1161        });
1162
1163        // Filter candidates through remaining conditions
1164        for (i, (prop, value)) in conditions.iter().enumerate() {
1165            if i == start_idx {
1166                continue;
1167            }
1168
1169            let key = PropertyKey::new(*prop);
1170            candidates.retain(|&node_id| {
1171                self.node_properties
1172                    .get(node_id, &key)
1173                    .is_some_and(|v| v == *value)
1174            });
1175
1176            // Short-circuit if no candidates remain
1177            if candidates.is_empty() {
1178                return Vec::new();
1179            }
1180        }
1181
1182        candidates
1183    }
1184
1185    // === Property Index Operations ===
1186
1187    /// Creates an index on a node property for O(1) lookups by value.
1188    ///
1189    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1190    /// O(1) instead of O(n) for this property. The index is automatically
1191    /// maintained when properties are set or removed.
1192    ///
1193    /// # Example
1194    ///
1195    /// ```
1196    /// use grafeo_core::graph::lpg::LpgStore;
1197    /// use grafeo_common::types::Value;
1198    ///
1199    /// let store = LpgStore::new();
1200    ///
1201    /// // Create nodes with an 'id' property
1202    /// let alice = store.create_node(&["Person"]);
1203    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1204    ///
1205    /// // Create an index on the 'id' property
1206    /// store.create_property_index("id");
1207    ///
1208    /// // Now lookups by 'id' are O(1)
1209    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1210    /// assert!(found.contains(&alice));
1211    /// ```
1212    pub fn create_property_index(&self, property: &str) {
1213        let key = PropertyKey::new(property);
1214
1215        let mut indexes = self.property_indexes.write();
1216        if indexes.contains_key(&key) {
1217            return; // Already indexed
1218        }
1219
1220        // Create the index and populate it with existing data
1221        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1222
1223        // Scan all nodes to build the index
1224        for node_id in self.node_ids() {
1225            if let Some(value) = self.node_properties.get(node_id, &key) {
1226                let hv = HashableValue::new(value);
1227                index
1228                    .entry(hv)
1229                    .or_insert_with(FxHashSet::default)
1230                    .insert(node_id);
1231            }
1232        }
1233
1234        indexes.insert(key, index);
1235    }
1236
1237    /// Drops an index on a node property.
1238    ///
1239    /// Returns `true` if the index existed and was removed.
1240    pub fn drop_property_index(&self, property: &str) -> bool {
1241        let key = PropertyKey::new(property);
1242        self.property_indexes.write().remove(&key).is_some()
1243    }
1244
1245    /// Returns `true` if the property has an index.
1246    #[must_use]
1247    pub fn has_property_index(&self, property: &str) -> bool {
1248        let key = PropertyKey::new(property);
1249        self.property_indexes.read().contains_key(&key)
1250    }
1251
1252    /// Stores a vector index for a label+property pair.
1253    #[cfg(feature = "vector-index")]
1254    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1255        let key = format!("{label}:{property}");
1256        self.vector_indexes.write().insert(key, index);
1257    }
1258
1259    /// Retrieves the vector index for a label+property pair.
1260    #[cfg(feature = "vector-index")]
1261    #[must_use]
1262    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1263        let key = format!("{label}:{property}");
1264        self.vector_indexes.read().get(&key).cloned()
1265    }
1266
1267    /// Finds all nodes that have a specific property value.
1268    ///
1269    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1270    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1271    ///
1272    /// # Example
1273    ///
1274    /// ```
1275    /// use grafeo_core::graph::lpg::LpgStore;
1276    /// use grafeo_common::types::Value;
1277    ///
1278    /// let store = LpgStore::new();
1279    /// store.create_property_index("city"); // Optional but makes lookups fast
1280    ///
1281    /// let alice = store.create_node(&["Person"]);
1282    /// let bob = store.create_node(&["Person"]);
1283    /// store.set_node_property(alice, "city", Value::from("NYC"));
1284    /// store.set_node_property(bob, "city", Value::from("NYC"));
1285    ///
1286    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1287    /// assert_eq!(nyc_people.len(), 2);
1288    /// ```
1289    #[must_use]
1290    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1291        let key = PropertyKey::new(property);
1292        let hv = HashableValue::new(value.clone());
1293
1294        // Try indexed lookup first
1295        let indexes = self.property_indexes.read();
1296        if let Some(index) = indexes.get(&key) {
1297            if let Some(nodes) = index.get(&hv) {
1298                return nodes.iter().copied().collect();
1299            }
1300            return Vec::new();
1301        }
1302        drop(indexes);
1303
1304        // Fall back to full scan
1305        self.node_ids()
1306            .into_iter()
1307            .filter(|&node_id| {
1308                self.node_properties
1309                    .get(node_id, &key)
1310                    .is_some_and(|v| v == *value)
1311            })
1312            .collect()
1313    }
1314
1315    /// Updates property indexes when a property is set.
1316    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1317        let indexes = self.property_indexes.read();
1318        if let Some(index) = indexes.get(key) {
1319            // Get old value to remove from index
1320            if let Some(old_value) = self.node_properties.get(node_id, key) {
1321                let old_hv = HashableValue::new(old_value);
1322                if let Some(mut nodes) = index.get_mut(&old_hv) {
1323                    nodes.remove(&node_id);
1324                    if nodes.is_empty() {
1325                        drop(nodes);
1326                        index.remove(&old_hv);
1327                    }
1328                }
1329            }
1330
1331            // Add new value to index
1332            let new_hv = HashableValue::new(new_value.clone());
1333            index
1334                .entry(new_hv)
1335                .or_insert_with(FxHashSet::default)
1336                .insert(node_id);
1337        }
1338    }
1339
1340    /// Updates property indexes when a property is removed.
1341    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1342        let indexes = self.property_indexes.read();
1343        if let Some(index) = indexes.get(key) {
1344            // Get old value to remove from index
1345            if let Some(old_value) = self.node_properties.get(node_id, key) {
1346                let old_hv = HashableValue::new(old_value);
1347                if let Some(mut nodes) = index.get_mut(&old_hv) {
1348                    nodes.remove(&node_id);
1349                    if nodes.is_empty() {
1350                        drop(nodes);
1351                        index.remove(&old_hv);
1352                    }
1353                }
1354            }
1355        }
1356    }
1357
1358    /// Adds a label to a node.
1359    ///
1360    /// Returns true if the label was added, false if the node doesn't exist
1361    /// or already has the label.
1362    #[cfg(not(feature = "tiered-storage"))]
1363    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1364        let epoch = self.current_epoch();
1365
1366        // Check if node exists
1367        let nodes = self.nodes.read();
1368        if let Some(chain) = nodes.get(&node_id) {
1369            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1370                return false;
1371            }
1372        } else {
1373            return false;
1374        }
1375        drop(nodes);
1376
1377        // Get or create label ID
1378        let label_id = self.get_or_create_label_id(label);
1379
1380        // Add to node_labels map
1381        let mut node_labels = self.node_labels.write();
1382        let label_set = node_labels
1383            .entry(node_id)
1384            .or_insert_with(FxHashSet::default);
1385
1386        if label_set.contains(&label_id) {
1387            return false; // Already has this label
1388        }
1389
1390        label_set.insert(label_id);
1391        drop(node_labels);
1392
1393        // Add to label_index
1394        let mut index = self.label_index.write();
1395        if (label_id as usize) >= index.len() {
1396            index.resize(label_id as usize + 1, FxHashMap::default());
1397        }
1398        index[label_id as usize].insert(node_id, ());
1399
1400        // Update label count in node record
1401        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1402            if let Some(record) = chain.latest_mut() {
1403                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1404                record.set_label_count(count as u16);
1405            }
1406        }
1407
1408        true
1409    }
1410
1411    /// Adds a label to a node.
1412    /// (Tiered storage version)
1413    #[cfg(feature = "tiered-storage")]
1414    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1415        let epoch = self.current_epoch();
1416
1417        // Check if node exists
1418        let versions = self.node_versions.read();
1419        if let Some(index) = versions.get(&node_id) {
1420            if let Some(vref) = index.visible_at(epoch) {
1421                if let Some(record) = self.read_node_record(&vref) {
1422                    if record.is_deleted() {
1423                        return false;
1424                    }
1425                } else {
1426                    return false;
1427                }
1428            } else {
1429                return false;
1430            }
1431        } else {
1432            return false;
1433        }
1434        drop(versions);
1435
1436        // Get or create label ID
1437        let label_id = self.get_or_create_label_id(label);
1438
1439        // Add to node_labels map
1440        let mut node_labels = self.node_labels.write();
1441        let label_set = node_labels
1442            .entry(node_id)
1443            .or_insert_with(FxHashSet::default);
1444
1445        if label_set.contains(&label_id) {
1446            return false; // Already has this label
1447        }
1448
1449        label_set.insert(label_id);
1450        drop(node_labels);
1451
1452        // Add to label_index
1453        let mut index = self.label_index.write();
1454        if (label_id as usize) >= index.len() {
1455            index.resize(label_id as usize + 1, FxHashMap::default());
1456        }
1457        index[label_id as usize].insert(node_id, ());
1458
1459        // Note: label_count in record is not updated for tiered storage.
1460        // The record is immutable once allocated in the arena.
1461
1462        true
1463    }
1464
1465    /// Removes a label from a node.
1466    ///
1467    /// Returns true if the label was removed, false if the node doesn't exist
1468    /// or doesn't have the label.
1469    #[cfg(not(feature = "tiered-storage"))]
1470    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1471        let epoch = self.current_epoch();
1472
1473        // Check if node exists
1474        let nodes = self.nodes.read();
1475        if let Some(chain) = nodes.get(&node_id) {
1476            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1477                return false;
1478            }
1479        } else {
1480            return false;
1481        }
1482        drop(nodes);
1483
1484        // Get label ID
1485        let label_id = {
1486            let label_ids = self.label_to_id.read();
1487            match label_ids.get(label) {
1488                Some(&id) => id,
1489                None => return false, // Label doesn't exist
1490            }
1491        };
1492
1493        // Remove from node_labels map
1494        let mut node_labels = self.node_labels.write();
1495        if let Some(label_set) = node_labels.get_mut(&node_id) {
1496            if !label_set.remove(&label_id) {
1497                return false; // Node doesn't have this label
1498            }
1499        } else {
1500            return false;
1501        }
1502        drop(node_labels);
1503
1504        // Remove from label_index
1505        let mut index = self.label_index.write();
1506        if (label_id as usize) < index.len() {
1507            index[label_id as usize].remove(&node_id);
1508        }
1509
1510        // Update label count in node record
1511        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1512            if let Some(record) = chain.latest_mut() {
1513                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1514                record.set_label_count(count as u16);
1515            }
1516        }
1517
1518        true
1519    }
1520
1521    /// Removes a label from a node.
1522    /// (Tiered storage version)
1523    #[cfg(feature = "tiered-storage")]
1524    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1525        let epoch = self.current_epoch();
1526
1527        // Check if node exists
1528        let versions = self.node_versions.read();
1529        if let Some(index) = versions.get(&node_id) {
1530            if let Some(vref) = index.visible_at(epoch) {
1531                if let Some(record) = self.read_node_record(&vref) {
1532                    if record.is_deleted() {
1533                        return false;
1534                    }
1535                } else {
1536                    return false;
1537                }
1538            } else {
1539                return false;
1540            }
1541        } else {
1542            return false;
1543        }
1544        drop(versions);
1545
1546        // Get label ID
1547        let label_id = {
1548            let label_ids = self.label_to_id.read();
1549            match label_ids.get(label) {
1550                Some(&id) => id,
1551                None => return false, // Label doesn't exist
1552            }
1553        };
1554
1555        // Remove from node_labels map
1556        let mut node_labels = self.node_labels.write();
1557        if let Some(label_set) = node_labels.get_mut(&node_id) {
1558            if !label_set.remove(&label_id) {
1559                return false; // Node doesn't have this label
1560            }
1561        } else {
1562            return false;
1563        }
1564        drop(node_labels);
1565
1566        // Remove from label_index
1567        let mut index = self.label_index.write();
1568        if (label_id as usize) < index.len() {
1569            index[label_id as usize].remove(&node_id);
1570        }
1571
1572        // Note: label_count in record is not updated for tiered storage.
1573
1574        true
1575    }
1576
1577    /// Returns the number of nodes (non-deleted at current epoch).
1578    #[must_use]
1579    #[cfg(not(feature = "tiered-storage"))]
1580    pub fn node_count(&self) -> usize {
1581        let epoch = self.current_epoch();
1582        self.nodes
1583            .read()
1584            .values()
1585            .filter_map(|chain| chain.visible_at(epoch))
1586            .filter(|r| !r.is_deleted())
1587            .count()
1588    }
1589
1590    /// Returns the number of nodes (non-deleted at current epoch).
1591    /// (Tiered storage version)
1592    #[must_use]
1593    #[cfg(feature = "tiered-storage")]
1594    pub fn node_count(&self) -> usize {
1595        let epoch = self.current_epoch();
1596        let versions = self.node_versions.read();
1597        versions
1598            .iter()
1599            .filter(|(_, index)| {
1600                index.visible_at(epoch).map_or(false, |vref| {
1601                    self.read_node_record(&vref)
1602                        .map_or(false, |r| !r.is_deleted())
1603                })
1604            })
1605            .count()
1606    }
1607
1608    /// Returns all node IDs in the store.
1609    ///
1610    /// This returns a snapshot of current node IDs. The returned vector
1611    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1612    /// iteration order.
1613    #[must_use]
1614    #[cfg(not(feature = "tiered-storage"))]
1615    pub fn node_ids(&self) -> Vec<NodeId> {
1616        let epoch = self.current_epoch();
1617        let mut ids: Vec<NodeId> = self
1618            .nodes
1619            .read()
1620            .iter()
1621            .filter_map(|(id, chain)| {
1622                chain
1623                    .visible_at(epoch)
1624                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1625            })
1626            .collect();
1627        ids.sort_unstable();
1628        ids
1629    }
1630
1631    /// Returns all node IDs in the store.
1632    /// (Tiered storage version)
1633    #[must_use]
1634    #[cfg(feature = "tiered-storage")]
1635    pub fn node_ids(&self) -> Vec<NodeId> {
1636        let epoch = self.current_epoch();
1637        let versions = self.node_versions.read();
1638        let mut ids: Vec<NodeId> = versions
1639            .iter()
1640            .filter_map(|(id, index)| {
1641                index.visible_at(epoch).and_then(|vref| {
1642                    self.read_node_record(&vref)
1643                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1644                })
1645            })
1646            .collect();
1647        ids.sort_unstable();
1648        ids
1649    }
1650
1651    // === Edge Operations ===
1652
1653    /// Creates a new edge.
1654    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1655        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1656        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1657    }
1658
1659    /// Creates a new edge within a transaction context.
1660    #[cfg(not(feature = "tiered-storage"))]
1661    pub fn create_edge_versioned(
1662        &self,
1663        src: NodeId,
1664        dst: NodeId,
1665        edge_type: &str,
1666        epoch: EpochId,
1667        tx_id: TxId,
1668    ) -> EdgeId {
1669        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1670        let type_id = self.get_or_create_edge_type_id(edge_type);
1671
1672        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1673        let chain = VersionChain::with_initial(record, epoch, tx_id);
1674        self.edges.write().insert(id, chain);
1675
1676        // Update adjacency
1677        self.forward_adj.add_edge(src, dst, id);
1678        if let Some(ref backward) = self.backward_adj {
1679            backward.add_edge(dst, src, id);
1680        }
1681
1682        id
1683    }
1684
1685    /// Creates a new edge within a transaction context.
1686    /// (Tiered storage version)
1687    #[cfg(feature = "tiered-storage")]
1688    pub fn create_edge_versioned(
1689        &self,
1690        src: NodeId,
1691        dst: NodeId,
1692        edge_type: &str,
1693        epoch: EpochId,
1694        tx_id: TxId,
1695    ) -> EdgeId {
1696        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1697        let type_id = self.get_or_create_edge_type_id(edge_type);
1698
1699        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1700
1701        // Allocate record in arena and get offset (create epoch if needed)
1702        let arena = self.arena_allocator.arena_or_create(epoch);
1703        let (offset, _stored) = arena.alloc_value_with_offset(record);
1704
1705        // Create HotVersionRef pointing to arena data
1706        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1707
1708        // Create or update version index
1709        let mut versions = self.edge_versions.write();
1710        if let Some(index) = versions.get_mut(&id) {
1711            index.add_hot(hot_ref);
1712        } else {
1713            versions.insert(id, VersionIndex::with_initial(hot_ref));
1714        }
1715
1716        // Update adjacency
1717        self.forward_adj.add_edge(src, dst, id);
1718        if let Some(ref backward) = self.backward_adj {
1719            backward.add_edge(dst, src, id);
1720        }
1721
1722        id
1723    }
1724
1725    /// Creates a new edge with properties.
1726    pub fn create_edge_with_props(
1727        &self,
1728        src: NodeId,
1729        dst: NodeId,
1730        edge_type: &str,
1731        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1732    ) -> EdgeId {
1733        let id = self.create_edge(src, dst, edge_type);
1734
1735        for (key, value) in properties {
1736            self.edge_properties.set(id, key.into(), value.into());
1737        }
1738
1739        id
1740    }
1741
1742    /// Gets an edge by ID (latest visible version).
1743    #[must_use]
1744    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1745        self.get_edge_at_epoch(id, self.current_epoch())
1746    }
1747
1748    /// Gets an edge by ID at a specific epoch.
1749    #[must_use]
1750    #[cfg(not(feature = "tiered-storage"))]
1751    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1752        let edges = self.edges.read();
1753        let chain = edges.get(&id)?;
1754        let record = chain.visible_at(epoch)?;
1755
1756        if record.is_deleted() {
1757            return None;
1758        }
1759
1760        let edge_type = {
1761            let id_to_type = self.id_to_edge_type.read();
1762            id_to_type.get(record.type_id as usize)?.clone()
1763        };
1764
1765        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1766
1767        // Get properties
1768        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1769
1770        Some(edge)
1771    }
1772
1773    /// Gets an edge by ID at a specific epoch.
1774    /// (Tiered storage version)
1775    #[must_use]
1776    #[cfg(feature = "tiered-storage")]
1777    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1778        let versions = self.edge_versions.read();
1779        let index = versions.get(&id)?;
1780        let version_ref = index.visible_at(epoch)?;
1781
1782        let record = self.read_edge_record(&version_ref)?;
1783
1784        if record.is_deleted() {
1785            return None;
1786        }
1787
1788        let edge_type = {
1789            let id_to_type = self.id_to_edge_type.read();
1790            id_to_type.get(record.type_id as usize)?.clone()
1791        };
1792
1793        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1794
1795        // Get properties
1796        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1797
1798        Some(edge)
1799    }
1800
1801    /// Gets an edge visible to a specific transaction.
1802    #[must_use]
1803    #[cfg(not(feature = "tiered-storage"))]
1804    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1805        let edges = self.edges.read();
1806        let chain = edges.get(&id)?;
1807        let record = chain.visible_to(epoch, tx_id)?;
1808
1809        if record.is_deleted() {
1810            return None;
1811        }
1812
1813        let edge_type = {
1814            let id_to_type = self.id_to_edge_type.read();
1815            id_to_type.get(record.type_id as usize)?.clone()
1816        };
1817
1818        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1819
1820        // Get properties
1821        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1822
1823        Some(edge)
1824    }
1825
1826    /// Gets an edge visible to a specific transaction.
1827    /// (Tiered storage version)
1828    #[must_use]
1829    #[cfg(feature = "tiered-storage")]
1830    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1831        let versions = self.edge_versions.read();
1832        let index = versions.get(&id)?;
1833        let version_ref = index.visible_to(epoch, tx_id)?;
1834
1835        let record = self.read_edge_record(&version_ref)?;
1836
1837        if record.is_deleted() {
1838            return None;
1839        }
1840
1841        let edge_type = {
1842            let id_to_type = self.id_to_edge_type.read();
1843            id_to_type.get(record.type_id as usize)?.clone()
1844        };
1845
1846        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1847
1848        // Get properties
1849        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1850
1851        Some(edge)
1852    }
1853
1854    /// Reads an EdgeRecord from arena using a VersionRef.
1855    #[cfg(feature = "tiered-storage")]
1856    #[allow(unsafe_code)]
1857    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1858        match version_ref {
1859            VersionRef::Hot(hot_ref) => {
1860                let arena = self.arena_allocator.arena(hot_ref.epoch);
1861                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1862                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1863                Some(*record)
1864            }
1865            VersionRef::Cold(cold_ref) => {
1866                // Read from compressed epoch store
1867                self.epoch_store
1868                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1869            }
1870        }
1871    }
1872
1873    /// Deletes an edge (using latest epoch).
1874    pub fn delete_edge(&self, id: EdgeId) -> bool {
1875        self.needs_stats_recompute.store(true, Ordering::Relaxed);
1876        self.delete_edge_at_epoch(id, self.current_epoch())
1877    }
1878
1879    /// Deletes an edge at a specific epoch.
1880    #[cfg(not(feature = "tiered-storage"))]
1881    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1882        let mut edges = self.edges.write();
1883        if let Some(chain) = edges.get_mut(&id) {
1884            // Get the visible record to check if deleted and get src/dst
1885            let (src, dst) = {
1886                match chain.visible_at(epoch) {
1887                    Some(record) => {
1888                        if record.is_deleted() {
1889                            return false;
1890                        }
1891                        (record.src, record.dst)
1892                    }
1893                    None => return false, // Not visible at this epoch (already deleted)
1894                }
1895            };
1896
1897            // Mark the version chain as deleted
1898            chain.mark_deleted(epoch);
1899
1900            drop(edges); // Release lock
1901
1902            // Mark as deleted in adjacency (soft delete)
1903            self.forward_adj.mark_deleted(src, id);
1904            if let Some(ref backward) = self.backward_adj {
1905                backward.mark_deleted(dst, id);
1906            }
1907
1908            // Remove properties
1909            self.edge_properties.remove_all(id);
1910
1911            true
1912        } else {
1913            false
1914        }
1915    }
1916
1917    /// Deletes an edge at a specific epoch.
1918    /// (Tiered storage version)
1919    #[cfg(feature = "tiered-storage")]
1920    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1921        let mut versions = self.edge_versions.write();
1922        if let Some(index) = versions.get_mut(&id) {
1923            // Get the visible record to check if deleted and get src/dst
1924            let (src, dst) = {
1925                match index.visible_at(epoch) {
1926                    Some(version_ref) => {
1927                        if let Some(record) = self.read_edge_record(&version_ref) {
1928                            if record.is_deleted() {
1929                                return false;
1930                            }
1931                            (record.src, record.dst)
1932                        } else {
1933                            return false;
1934                        }
1935                    }
1936                    None => return false,
1937                }
1938            };
1939
1940            // Mark as deleted in version index
1941            index.mark_deleted(epoch);
1942
1943            drop(versions); // Release lock
1944
1945            // Mark as deleted in adjacency (soft delete)
1946            self.forward_adj.mark_deleted(src, id);
1947            if let Some(ref backward) = self.backward_adj {
1948                backward.mark_deleted(dst, id);
1949            }
1950
1951            // Remove properties
1952            self.edge_properties.remove_all(id);
1953
1954            true
1955        } else {
1956            false
1957        }
1958    }
1959
1960    /// Returns the number of edges (non-deleted at current epoch).
1961    #[must_use]
1962    #[cfg(not(feature = "tiered-storage"))]
1963    pub fn edge_count(&self) -> usize {
1964        let epoch = self.current_epoch();
1965        self.edges
1966            .read()
1967            .values()
1968            .filter_map(|chain| chain.visible_at(epoch))
1969            .filter(|r| !r.is_deleted())
1970            .count()
1971    }
1972
1973    /// Returns the number of edges (non-deleted at current epoch).
1974    /// (Tiered storage version)
1975    #[must_use]
1976    #[cfg(feature = "tiered-storage")]
1977    pub fn edge_count(&self) -> usize {
1978        let epoch = self.current_epoch();
1979        let versions = self.edge_versions.read();
1980        versions
1981            .iter()
1982            .filter(|(_, index)| {
1983                index.visible_at(epoch).map_or(false, |vref| {
1984                    self.read_edge_record(&vref)
1985                        .map_or(false, |r| !r.is_deleted())
1986                })
1987            })
1988            .count()
1989    }
1990
1991    /// Discards all uncommitted versions created by a transaction.
1992    ///
1993    /// This is called during transaction rollback to clean up uncommitted changes.
1994    /// The method removes version chain entries created by the specified transaction.
1995    #[cfg(not(feature = "tiered-storage"))]
1996    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1997        // Remove uncommitted node versions
1998        {
1999            let mut nodes = self.nodes.write();
2000            for chain in nodes.values_mut() {
2001                chain.remove_versions_by(tx_id);
2002            }
2003            // Remove completely empty chains (no versions left)
2004            nodes.retain(|_, chain| !chain.is_empty());
2005        }
2006
2007        // Remove uncommitted edge versions
2008        {
2009            let mut edges = self.edges.write();
2010            for chain in edges.values_mut() {
2011                chain.remove_versions_by(tx_id);
2012            }
2013            // Remove completely empty chains (no versions left)
2014            edges.retain(|_, chain| !chain.is_empty());
2015        }
2016    }
2017
2018    /// Discards all uncommitted versions created by a transaction.
2019    /// (Tiered storage version)
2020    #[cfg(feature = "tiered-storage")]
2021    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2022        // Remove uncommitted node versions
2023        {
2024            let mut versions = self.node_versions.write();
2025            for index in versions.values_mut() {
2026                index.remove_versions_by(tx_id);
2027            }
2028            // Remove completely empty indexes (no versions left)
2029            versions.retain(|_, index| !index.is_empty());
2030        }
2031
2032        // Remove uncommitted edge versions
2033        {
2034            let mut versions = self.edge_versions.write();
2035            for index in versions.values_mut() {
2036                index.remove_versions_by(tx_id);
2037            }
2038            // Remove completely empty indexes (no versions left)
2039            versions.retain(|_, index| !index.is_empty());
2040        }
2041    }
2042
2043    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
2044    ///
2045    /// This is called by the transaction manager when an epoch becomes eligible
2046    /// for freezing (no active transactions can see it). The freeze process:
2047    ///
2048    /// 1. Collects all hot version refs for the epoch
2049    /// 2. Reads the corresponding records from arena
2050    /// 3. Compresses them into a `CompressedEpochBlock`
2051    /// 4. Updates `VersionIndex` entries to point to cold storage
2052    /// 5. The arena can be deallocated after all epochs in it are frozen
2053    ///
2054    /// # Arguments
2055    ///
2056    /// * `epoch` - The epoch to freeze
2057    ///
2058    /// # Returns
2059    ///
2060    /// The number of records frozen (nodes + edges).
2061    #[cfg(feature = "tiered-storage")]
2062    #[allow(unsafe_code)]
2063    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2064        // Collect node records to freeze
2065        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2066        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2067
2068        {
2069            let versions = self.node_versions.read();
2070            for (node_id, index) in versions.iter() {
2071                for hot_ref in index.hot_refs_for_epoch(epoch) {
2072                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2073                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
2074                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2075                    node_records.push((node_id.as_u64(), *record));
2076                    node_hot_refs.push((*node_id, *hot_ref));
2077                }
2078            }
2079        }
2080
2081        // Collect edge records to freeze
2082        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2083        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2084
2085        {
2086            let versions = self.edge_versions.read();
2087            for (edge_id, index) in versions.iter() {
2088                for hot_ref in index.hot_refs_for_epoch(epoch) {
2089                    let arena = self.arena_allocator.arena(hot_ref.epoch);
2090                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
2091                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2092                    edge_records.push((edge_id.as_u64(), *record));
2093                    edge_hot_refs.push((*edge_id, *hot_ref));
2094                }
2095            }
2096        }
2097
2098        let total_frozen = node_records.len() + edge_records.len();
2099
2100        if total_frozen == 0 {
2101            return 0;
2102        }
2103
2104        // Freeze to compressed storage
2105        let (node_entries, edge_entries) =
2106            self.epoch_store
2107                .freeze_epoch(epoch, node_records, edge_records);
2108
2109        // Build lookup maps for index entries
2110        let node_entry_map: FxHashMap<u64, _> = node_entries
2111            .iter()
2112            .map(|e| (e.entity_id, (e.offset, e.length)))
2113            .collect();
2114        let edge_entry_map: FxHashMap<u64, _> = edge_entries
2115            .iter()
2116            .map(|e| (e.entity_id, (e.offset, e.length)))
2117            .collect();
2118
2119        // Update version indexes to use cold refs
2120        {
2121            let mut versions = self.node_versions.write();
2122            for (node_id, hot_ref) in &node_hot_refs {
2123                if let Some(index) = versions.get_mut(node_id) {
2124                    if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
2125                        let cold_ref = ColdVersionRef {
2126                            epoch,
2127                            block_offset: offset,
2128                            length,
2129                            created_by: hot_ref.created_by,
2130                            deleted_epoch: hot_ref.deleted_epoch,
2131                        };
2132                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2133                    }
2134                }
2135            }
2136        }
2137
2138        {
2139            let mut versions = self.edge_versions.write();
2140            for (edge_id, hot_ref) in &edge_hot_refs {
2141                if let Some(index) = versions.get_mut(edge_id) {
2142                    if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2143                        let cold_ref = ColdVersionRef {
2144                            epoch,
2145                            block_offset: offset,
2146                            length,
2147                            created_by: hot_ref.created_by,
2148                            deleted_epoch: hot_ref.deleted_epoch,
2149                        };
2150                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2151                    }
2152                }
2153            }
2154        }
2155
2156        total_frozen
2157    }
2158
2159    /// Returns the epoch store for cold storage statistics.
2160    #[cfg(feature = "tiered-storage")]
2161    #[must_use]
2162    pub fn epoch_store(&self) -> &EpochStore {
2163        &self.epoch_store
2164    }
2165
2166    /// Returns the number of distinct labels in the store.
2167    #[must_use]
2168    pub fn label_count(&self) -> usize {
2169        self.id_to_label.read().len()
2170    }
2171
2172    /// Returns the number of distinct property keys in the store.
2173    ///
2174    /// This counts unique property keys across both nodes and edges.
2175    #[must_use]
2176    pub fn property_key_count(&self) -> usize {
2177        let node_keys = self.node_properties.column_count();
2178        let edge_keys = self.edge_properties.column_count();
2179        // Note: This may count some keys twice if the same key is used
2180        // for both nodes and edges. A more precise count would require
2181        // tracking unique keys across both storages.
2182        node_keys + edge_keys
2183    }
2184
2185    /// Returns the number of distinct edge types in the store.
2186    #[must_use]
2187    pub fn edge_type_count(&self) -> usize {
2188        self.id_to_edge_type.read().len()
2189    }
2190
2191    // === Traversal ===
2192
2193    /// Iterates over neighbors of a node in the specified direction.
2194    ///
2195    /// This is the fast path for graph traversal - goes straight to the
2196    /// adjacency index without loading full node data.
2197    pub fn neighbors(
2198        &self,
2199        node: NodeId,
2200        direction: Direction,
2201    ) -> impl Iterator<Item = NodeId> + '_ {
2202        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2203            Direction::Outgoing | Direction::Both => {
2204                Box::new(self.forward_adj.neighbors(node).into_iter())
2205            }
2206            Direction::Incoming => Box::new(std::iter::empty()),
2207        };
2208
2209        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2210            Direction::Incoming | Direction::Both => {
2211                if let Some(ref adj) = self.backward_adj {
2212                    Box::new(adj.neighbors(node).into_iter())
2213                } else {
2214                    Box::new(std::iter::empty())
2215                }
2216            }
2217            Direction::Outgoing => Box::new(std::iter::empty()),
2218        };
2219
2220        forward.chain(backward)
2221    }
2222
2223    /// Returns edges from a node with their targets.
2224    ///
2225    /// Returns an iterator of (target_node, edge_id) pairs.
2226    pub fn edges_from(
2227        &self,
2228        node: NodeId,
2229        direction: Direction,
2230    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2231        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2232            Direction::Outgoing | Direction::Both => {
2233                Box::new(self.forward_adj.edges_from(node).into_iter())
2234            }
2235            Direction::Incoming => Box::new(std::iter::empty()),
2236        };
2237
2238        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2239            Direction::Incoming | Direction::Both => {
2240                if let Some(ref adj) = self.backward_adj {
2241                    Box::new(adj.edges_from(node).into_iter())
2242                } else {
2243                    Box::new(std::iter::empty())
2244                }
2245            }
2246            Direction::Outgoing => Box::new(std::iter::empty()),
2247        };
2248
2249        forward.chain(backward)
2250    }
2251
2252    /// Returns edges to a node (where the node is the destination).
2253    ///
2254    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2255    /// Uses the backward adjacency index for O(degree) lookup.
2256    ///
2257    /// # Example
2258    ///
2259    /// ```ignore
2260    /// // For edges: A->B, C->B
2261    /// let incoming = store.edges_to(B);
2262    /// // Returns: [(A, edge1), (C, edge2)]
2263    /// ```
2264    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2265        if let Some(ref backward) = self.backward_adj {
2266            backward.edges_from(node)
2267        } else {
2268            // Fallback: scan all edges (slow but correct)
2269            self.all_edges()
2270                .filter_map(|edge| {
2271                    if edge.dst == node {
2272                        Some((edge.src, edge.id))
2273                    } else {
2274                        None
2275                    }
2276                })
2277                .collect()
2278        }
2279    }
2280
2281    /// Returns the out-degree of a node (number of outgoing edges).
2282    ///
2283    /// Uses the forward adjacency index for O(1) lookup.
2284    #[must_use]
2285    pub fn out_degree(&self, node: NodeId) -> usize {
2286        self.forward_adj.out_degree(node)
2287    }
2288
2289    /// Returns the in-degree of a node (number of incoming edges).
2290    ///
2291    /// Uses the backward adjacency index for O(1) lookup if available,
2292    /// otherwise falls back to scanning edges.
2293    #[must_use]
2294    pub fn in_degree(&self, node: NodeId) -> usize {
2295        if let Some(ref backward) = self.backward_adj {
2296            backward.in_degree(node)
2297        } else {
2298            // Fallback: count edges (slow)
2299            self.all_edges().filter(|edge| edge.dst == node).count()
2300        }
2301    }
2302
2303    /// Gets the type of an edge by ID.
2304    #[must_use]
2305    #[cfg(not(feature = "tiered-storage"))]
2306    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2307        let edges = self.edges.read();
2308        let chain = edges.get(&id)?;
2309        let epoch = self.current_epoch();
2310        let record = chain.visible_at(epoch)?;
2311        let id_to_type = self.id_to_edge_type.read();
2312        id_to_type.get(record.type_id as usize).cloned()
2313    }
2314
2315    /// Gets the type of an edge by ID.
2316    /// (Tiered storage version)
2317    #[must_use]
2318    #[cfg(feature = "tiered-storage")]
2319    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2320        let versions = self.edge_versions.read();
2321        let index = versions.get(&id)?;
2322        let epoch = self.current_epoch();
2323        let vref = index.visible_at(epoch)?;
2324        let record = self.read_edge_record(&vref)?;
2325        let id_to_type = self.id_to_edge_type.read();
2326        id_to_type.get(record.type_id as usize).cloned()
2327    }
2328
2329    /// Returns all nodes with a specific label.
2330    ///
2331    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2332    /// concurrent modifications won't affect the returned vector. Results are
2333    /// sorted by NodeId for deterministic iteration order.
2334    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2335        let label_to_id = self.label_to_id.read();
2336        if let Some(&label_id) = label_to_id.get(label) {
2337            let index = self.label_index.read();
2338            if let Some(set) = index.get(label_id as usize) {
2339                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2340                ids.sort_unstable();
2341                return ids;
2342            }
2343        }
2344        Vec::new()
2345    }
2346
2347    // === Admin API: Iteration ===
2348
2349    /// Returns an iterator over all nodes in the database.
2350    ///
2351    /// This creates a snapshot of all visible nodes at the current epoch.
2352    /// Useful for dump/export operations.
2353    #[cfg(not(feature = "tiered-storage"))]
2354    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2355        let epoch = self.current_epoch();
2356        let node_ids: Vec<NodeId> = self
2357            .nodes
2358            .read()
2359            .iter()
2360            .filter_map(|(id, chain)| {
2361                chain
2362                    .visible_at(epoch)
2363                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2364            })
2365            .collect();
2366
2367        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2368    }
2369
2370    /// Returns an iterator over all nodes in the database.
2371    /// (Tiered storage version)
2372    #[cfg(feature = "tiered-storage")]
2373    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2374        let node_ids = self.node_ids();
2375        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2376    }
2377
2378    /// Returns an iterator over all edges in the database.
2379    ///
2380    /// This creates a snapshot of all visible edges at the current epoch.
2381    /// Useful for dump/export operations.
2382    #[cfg(not(feature = "tiered-storage"))]
2383    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2384        let epoch = self.current_epoch();
2385        let edge_ids: Vec<EdgeId> = self
2386            .edges
2387            .read()
2388            .iter()
2389            .filter_map(|(id, chain)| {
2390                chain
2391                    .visible_at(epoch)
2392                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2393            })
2394            .collect();
2395
2396        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2397    }
2398
2399    /// Returns an iterator over all edges in the database.
2400    /// (Tiered storage version)
2401    #[cfg(feature = "tiered-storage")]
2402    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2403        let epoch = self.current_epoch();
2404        let versions = self.edge_versions.read();
2405        let edge_ids: Vec<EdgeId> = versions
2406            .iter()
2407            .filter_map(|(id, index)| {
2408                index.visible_at(epoch).and_then(|vref| {
2409                    self.read_edge_record(&vref)
2410                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2411                })
2412            })
2413            .collect();
2414
2415        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2416    }
2417
2418    /// Returns all label names in the database.
2419    pub fn all_labels(&self) -> Vec<String> {
2420        self.id_to_label
2421            .read()
2422            .iter()
2423            .map(|s| s.to_string())
2424            .collect()
2425    }
2426
2427    /// Returns all edge type names in the database.
2428    pub fn all_edge_types(&self) -> Vec<String> {
2429        self.id_to_edge_type
2430            .read()
2431            .iter()
2432            .map(|s| s.to_string())
2433            .collect()
2434    }
2435
2436    /// Returns all property keys used in the database.
2437    pub fn all_property_keys(&self) -> Vec<String> {
2438        let mut keys = std::collections::HashSet::new();
2439        for key in self.node_properties.keys() {
2440            keys.insert(key.to_string());
2441        }
2442        for key in self.edge_properties.keys() {
2443            keys.insert(key.to_string());
2444        }
2445        keys.into_iter().collect()
2446    }
2447
2448    /// Returns an iterator over nodes with a specific label.
2449    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2450        let node_ids = self.nodes_by_label(label);
2451        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2452    }
2453
2454    /// Returns an iterator over edges with a specific type.
2455    #[cfg(not(feature = "tiered-storage"))]
2456    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2457        let epoch = self.current_epoch();
2458        let type_to_id = self.edge_type_to_id.read();
2459
2460        if let Some(&type_id) = type_to_id.get(edge_type) {
2461            let edge_ids: Vec<EdgeId> = self
2462                .edges
2463                .read()
2464                .iter()
2465                .filter_map(|(id, chain)| {
2466                    chain.visible_at(epoch).and_then(|r| {
2467                        if !r.is_deleted() && r.type_id == type_id {
2468                            Some(*id)
2469                        } else {
2470                            None
2471                        }
2472                    })
2473                })
2474                .collect();
2475
2476            // Return a boxed iterator for the found edges
2477            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2478                as Box<dyn Iterator<Item = Edge> + 'a>
2479        } else {
2480            // Return empty iterator
2481            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2482        }
2483    }
2484
2485    /// Returns an iterator over edges with a specific type.
2486    /// (Tiered storage version)
2487    #[cfg(feature = "tiered-storage")]
2488    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2489        let epoch = self.current_epoch();
2490        let type_to_id = self.edge_type_to_id.read();
2491
2492        if let Some(&type_id) = type_to_id.get(edge_type) {
2493            let versions = self.edge_versions.read();
2494            let edge_ids: Vec<EdgeId> = versions
2495                .iter()
2496                .filter_map(|(id, index)| {
2497                    index.visible_at(epoch).and_then(|vref| {
2498                        self.read_edge_record(&vref).and_then(|r| {
2499                            if !r.is_deleted() && r.type_id == type_id {
2500                                Some(*id)
2501                            } else {
2502                                None
2503                            }
2504                        })
2505                    })
2506                })
2507                .collect();
2508
2509            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2510                as Box<dyn Iterator<Item = Edge> + 'a>
2511        } else {
2512            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2513        }
2514    }
2515
2516    // === Zone Map Support ===
2517
2518    /// Checks if a node property predicate might match any nodes.
2519    ///
2520    /// Uses zone maps for early filtering. Returns `true` if there might be
2521    /// matching nodes, `false` if there definitely aren't.
2522    #[must_use]
2523    pub fn node_property_might_match(
2524        &self,
2525        property: &PropertyKey,
2526        op: CompareOp,
2527        value: &Value,
2528    ) -> bool {
2529        self.node_properties.might_match(property, op, value)
2530    }
2531
2532    /// Checks if an edge property predicate might match any edges.
2533    #[must_use]
2534    pub fn edge_property_might_match(
2535        &self,
2536        property: &PropertyKey,
2537        op: CompareOp,
2538        value: &Value,
2539    ) -> bool {
2540        self.edge_properties.might_match(property, op, value)
2541    }
2542
2543    /// Gets the zone map for a node property.
2544    #[must_use]
2545    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2546        self.node_properties.zone_map(property)
2547    }
2548
2549    /// Gets the zone map for an edge property.
2550    #[must_use]
2551    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2552        self.edge_properties.zone_map(property)
2553    }
2554
2555    /// Rebuilds zone maps for all properties.
2556    pub fn rebuild_zone_maps(&self) {
2557        self.node_properties.rebuild_zone_maps();
2558        self.edge_properties.rebuild_zone_maps();
2559    }
2560
2561    // === Statistics ===
2562
2563    /// Returns the current statistics.
2564    #[must_use]
2565    pub fn statistics(&self) -> Statistics {
2566        self.statistics.read().clone()
2567    }
2568
2569    /// Recomputes statistics if they are stale (i.e., after mutations).
2570    ///
2571    /// Call this before reading statistics for query optimization.
2572    /// Avoids redundant recomputation if no mutations occurred.
2573    pub fn ensure_statistics_fresh(&self) {
2574        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2575            self.compute_statistics();
2576        }
2577    }
2578
2579    /// Recomputes statistics from current data.
2580    ///
2581    /// Scans all labels and edge types to build cardinality estimates for the
2582    /// query optimizer. Call this periodically or after bulk data loads.
2583    #[cfg(not(feature = "tiered-storage"))]
2584    pub fn compute_statistics(&self) {
2585        let mut stats = Statistics::new();
2586
2587        // Compute total counts
2588        stats.total_nodes = self.node_count() as u64;
2589        stats.total_edges = self.edge_count() as u64;
2590
2591        // Compute per-label statistics
2592        let id_to_label = self.id_to_label.read();
2593        let label_index = self.label_index.read();
2594
2595        for (label_id, label_name) in id_to_label.iter().enumerate() {
2596            let node_count = label_index
2597                .get(label_id)
2598                .map(|set| set.len() as u64)
2599                .unwrap_or(0);
2600
2601            if node_count > 0 {
2602                // Estimate average degree
2603                let avg_out_degree = if stats.total_nodes > 0 {
2604                    stats.total_edges as f64 / stats.total_nodes as f64
2605                } else {
2606                    0.0
2607                };
2608
2609                let label_stats =
2610                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2611
2612                stats.update_label(label_name.as_ref(), label_stats);
2613            }
2614        }
2615
2616        // Compute per-edge-type statistics
2617        let id_to_edge_type = self.id_to_edge_type.read();
2618        let edges = self.edges.read();
2619        let epoch = self.current_epoch();
2620
2621        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2622        for chain in edges.values() {
2623            if let Some(record) = chain.visible_at(epoch) {
2624                if !record.is_deleted() {
2625                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2626                }
2627            }
2628        }
2629
2630        for (type_id, count) in edge_type_counts {
2631            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2632                let avg_degree = if stats.total_nodes > 0 {
2633                    count as f64 / stats.total_nodes as f64
2634                } else {
2635                    0.0
2636                };
2637
2638                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2639                stats.update_edge_type(type_name.as_ref(), edge_stats);
2640            }
2641        }
2642
2643        *self.statistics.write() = stats;
2644    }
2645
2646    /// Recomputes statistics from current data.
2647    /// (Tiered storage version)
2648    #[cfg(feature = "tiered-storage")]
2649    pub fn compute_statistics(&self) {
2650        let mut stats = Statistics::new();
2651
2652        // Compute total counts
2653        stats.total_nodes = self.node_count() as u64;
2654        stats.total_edges = self.edge_count() as u64;
2655
2656        // Compute per-label statistics
2657        let id_to_label = self.id_to_label.read();
2658        let label_index = self.label_index.read();
2659
2660        for (label_id, label_name) in id_to_label.iter().enumerate() {
2661            let node_count = label_index
2662                .get(label_id)
2663                .map(|set| set.len() as u64)
2664                .unwrap_or(0);
2665
2666            if node_count > 0 {
2667                let avg_out_degree = if stats.total_nodes > 0 {
2668                    stats.total_edges as f64 / stats.total_nodes as f64
2669                } else {
2670                    0.0
2671                };
2672
2673                let label_stats =
2674                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2675
2676                stats.update_label(label_name.as_ref(), label_stats);
2677            }
2678        }
2679
2680        // Compute per-edge-type statistics
2681        let id_to_edge_type = self.id_to_edge_type.read();
2682        let versions = self.edge_versions.read();
2683        let epoch = self.current_epoch();
2684
2685        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2686        for index in versions.values() {
2687            if let Some(vref) = index.visible_at(epoch) {
2688                if let Some(record) = self.read_edge_record(&vref) {
2689                    if !record.is_deleted() {
2690                        *edge_type_counts.entry(record.type_id).or_default() += 1;
2691                    }
2692                }
2693            }
2694        }
2695
2696        for (type_id, count) in edge_type_counts {
2697            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2698                let avg_degree = if stats.total_nodes > 0 {
2699                    count as f64 / stats.total_nodes as f64
2700                } else {
2701                    0.0
2702                };
2703
2704                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2705                stats.update_edge_type(type_name.as_ref(), edge_stats);
2706            }
2707        }
2708
2709        *self.statistics.write() = stats;
2710    }
2711
2712    /// Estimates cardinality for a label scan.
2713    #[must_use]
2714    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2715        self.statistics.read().estimate_label_cardinality(label)
2716    }
2717
2718    /// Estimates average degree for an edge type.
2719    #[must_use]
2720    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2721        self.statistics
2722            .read()
2723            .estimate_avg_degree(edge_type, outgoing)
2724    }
2725
2726    // === Internal Helpers ===
2727
2728    fn get_or_create_label_id(&self, label: &str) -> u32 {
2729        {
2730            let label_to_id = self.label_to_id.read();
2731            if let Some(&id) = label_to_id.get(label) {
2732                return id;
2733            }
2734        }
2735
2736        let mut label_to_id = self.label_to_id.write();
2737        let mut id_to_label = self.id_to_label.write();
2738
2739        // Double-check after acquiring write lock
2740        if let Some(&id) = label_to_id.get(label) {
2741            return id;
2742        }
2743
2744        let id = id_to_label.len() as u32;
2745
2746        let label: ArcStr = label.into();
2747        label_to_id.insert(label.clone(), id);
2748        id_to_label.push(label);
2749
2750        id
2751    }
2752
2753    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2754        {
2755            let type_to_id = self.edge_type_to_id.read();
2756            if let Some(&id) = type_to_id.get(edge_type) {
2757                return id;
2758            }
2759        }
2760
2761        let mut type_to_id = self.edge_type_to_id.write();
2762        let mut id_to_type = self.id_to_edge_type.write();
2763
2764        // Double-check
2765        if let Some(&id) = type_to_id.get(edge_type) {
2766            return id;
2767        }
2768
2769        let id = id_to_type.len() as u32;
2770        let edge_type: ArcStr = edge_type.into();
2771        type_to_id.insert(edge_type.clone(), id);
2772        id_to_type.push(edge_type);
2773
2774        id
2775    }
2776
2777    // === Recovery Support ===
2778
2779    /// Creates a node with a specific ID during recovery.
2780    ///
2781    /// This is used for WAL recovery to restore nodes with their original IDs.
2782    /// The caller must ensure IDs don't conflict with existing nodes.
2783    #[cfg(not(feature = "tiered-storage"))]
2784    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2785        let epoch = self.current_epoch();
2786        let mut record = NodeRecord::new(id, epoch);
2787        record.set_label_count(labels.len() as u16);
2788
2789        // Store labels in node_labels map and label_index
2790        let mut node_label_set = FxHashSet::default();
2791        for label in labels {
2792            let label_id = self.get_or_create_label_id(*label);
2793            node_label_set.insert(label_id);
2794
2795            // Update label index
2796            let mut index = self.label_index.write();
2797            while index.len() <= label_id as usize {
2798                index.push(FxHashMap::default());
2799            }
2800            index[label_id as usize].insert(id, ());
2801        }
2802
2803        // Store node's labels
2804        self.node_labels.write().insert(id, node_label_set);
2805
2806        // Create version chain with initial version (using SYSTEM tx for recovery)
2807        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2808        self.nodes.write().insert(id, chain);
2809
2810        // Update next_node_id if necessary to avoid future collisions
2811        let id_val = id.as_u64();
2812        let _ = self
2813            .next_node_id
2814            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2815                if id_val >= current {
2816                    Some(id_val + 1)
2817                } else {
2818                    None
2819                }
2820            });
2821    }
2822
2823    /// Creates a node with a specific ID during recovery.
2824    /// (Tiered storage version)
2825    #[cfg(feature = "tiered-storage")]
2826    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2827        let epoch = self.current_epoch();
2828        let mut record = NodeRecord::new(id, epoch);
2829        record.set_label_count(labels.len() as u16);
2830
2831        // Store labels in node_labels map and label_index
2832        let mut node_label_set = FxHashSet::default();
2833        for label in labels {
2834            let label_id = self.get_or_create_label_id(*label);
2835            node_label_set.insert(label_id);
2836
2837            // Update label index
2838            let mut index = self.label_index.write();
2839            while index.len() <= label_id as usize {
2840                index.push(FxHashMap::default());
2841            }
2842            index[label_id as usize].insert(id, ());
2843        }
2844
2845        // Store node's labels
2846        self.node_labels.write().insert(id, node_label_set);
2847
2848        // Allocate record in arena and get offset (create epoch if needed)
2849        let arena = self.arena_allocator.arena_or_create(epoch);
2850        let (offset, _stored) = arena.alloc_value_with_offset(record);
2851
2852        // Create HotVersionRef (using SYSTEM tx for recovery)
2853        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2854        let mut versions = self.node_versions.write();
2855        versions.insert(id, VersionIndex::with_initial(hot_ref));
2856
2857        // Update next_node_id if necessary to avoid future collisions
2858        let id_val = id.as_u64();
2859        let _ = self
2860            .next_node_id
2861            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2862                if id_val >= current {
2863                    Some(id_val + 1)
2864                } else {
2865                    None
2866                }
2867            });
2868    }
2869
2870    /// Creates an edge with a specific ID during recovery.
2871    ///
2872    /// This is used for WAL recovery to restore edges with their original IDs.
2873    #[cfg(not(feature = "tiered-storage"))]
2874    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2875        let epoch = self.current_epoch();
2876        let type_id = self.get_or_create_edge_type_id(edge_type);
2877
2878        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2879        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2880        self.edges.write().insert(id, chain);
2881
2882        // Update adjacency
2883        self.forward_adj.add_edge(src, dst, id);
2884        if let Some(ref backward) = self.backward_adj {
2885            backward.add_edge(dst, src, id);
2886        }
2887
2888        // Update next_edge_id if necessary
2889        let id_val = id.as_u64();
2890        let _ = self
2891            .next_edge_id
2892            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2893                if id_val >= current {
2894                    Some(id_val + 1)
2895                } else {
2896                    None
2897                }
2898            });
2899    }
2900
2901    /// Creates an edge with a specific ID during recovery.
2902    /// (Tiered storage version)
2903    #[cfg(feature = "tiered-storage")]
2904    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2905        let epoch = self.current_epoch();
2906        let type_id = self.get_or_create_edge_type_id(edge_type);
2907
2908        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2909
2910        // Allocate record in arena and get offset (create epoch if needed)
2911        let arena = self.arena_allocator.arena_or_create(epoch);
2912        let (offset, _stored) = arena.alloc_value_with_offset(record);
2913
2914        // Create HotVersionRef (using SYSTEM tx for recovery)
2915        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2916        let mut versions = self.edge_versions.write();
2917        versions.insert(id, VersionIndex::with_initial(hot_ref));
2918
2919        // Update adjacency
2920        self.forward_adj.add_edge(src, dst, id);
2921        if let Some(ref backward) = self.backward_adj {
2922            backward.add_edge(dst, src, id);
2923        }
2924
2925        // Update next_edge_id if necessary
2926        let id_val = id.as_u64();
2927        let _ = self
2928            .next_edge_id
2929            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2930                if id_val >= current {
2931                    Some(id_val + 1)
2932                } else {
2933                    None
2934                }
2935            });
2936    }
2937
2938    /// Sets the current epoch during recovery.
2939    pub fn set_epoch(&self, epoch: EpochId) {
2940        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2941    }
2942}
2943
2944impl Default for LpgStore {
2945    fn default() -> Self {
2946        Self::new()
2947    }
2948}
2949
2950#[cfg(test)]
2951mod tests {
2952    use super::*;
2953
2954    #[test]
2955    fn test_create_node() {
2956        let store = LpgStore::new();
2957
2958        let id = store.create_node(&["Person"]);
2959        assert!(id.is_valid());
2960
2961        let node = store.get_node(id).unwrap();
2962        assert!(node.has_label("Person"));
2963        assert!(!node.has_label("Animal"));
2964    }
2965
2966    #[test]
2967    fn test_create_node_with_props() {
2968        let store = LpgStore::new();
2969
2970        let id = store.create_node_with_props(
2971            &["Person"],
2972            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2973        );
2974
2975        let node = store.get_node(id).unwrap();
2976        assert_eq!(
2977            node.get_property("name").and_then(|v| v.as_str()),
2978            Some("Alice")
2979        );
2980        assert_eq!(
2981            node.get_property("age").and_then(|v| v.as_int64()),
2982            Some(30)
2983        );
2984    }
2985
2986    #[test]
2987    fn test_delete_node() {
2988        let store = LpgStore::new();
2989
2990        let id = store.create_node(&["Person"]);
2991        assert_eq!(store.node_count(), 1);
2992
2993        assert!(store.delete_node(id));
2994        assert_eq!(store.node_count(), 0);
2995        assert!(store.get_node(id).is_none());
2996
2997        // Double delete should return false
2998        assert!(!store.delete_node(id));
2999    }
3000
3001    #[test]
3002    fn test_create_edge() {
3003        let store = LpgStore::new();
3004
3005        let alice = store.create_node(&["Person"]);
3006        let bob = store.create_node(&["Person"]);
3007
3008        let edge_id = store.create_edge(alice, bob, "KNOWS");
3009        assert!(edge_id.is_valid());
3010
3011        let edge = store.get_edge(edge_id).unwrap();
3012        assert_eq!(edge.src, alice);
3013        assert_eq!(edge.dst, bob);
3014        assert_eq!(edge.edge_type.as_str(), "KNOWS");
3015    }
3016
3017    #[test]
3018    fn test_neighbors() {
3019        let store = LpgStore::new();
3020
3021        let a = store.create_node(&["Person"]);
3022        let b = store.create_node(&["Person"]);
3023        let c = store.create_node(&["Person"]);
3024
3025        store.create_edge(a, b, "KNOWS");
3026        store.create_edge(a, c, "KNOWS");
3027
3028        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3029        assert_eq!(outgoing.len(), 2);
3030        assert!(outgoing.contains(&b));
3031        assert!(outgoing.contains(&c));
3032
3033        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3034        assert_eq!(incoming.len(), 1);
3035        assert!(incoming.contains(&a));
3036    }
3037
3038    #[test]
3039    fn test_nodes_by_label() {
3040        let store = LpgStore::new();
3041
3042        let p1 = store.create_node(&["Person"]);
3043        let p2 = store.create_node(&["Person"]);
3044        let _a = store.create_node(&["Animal"]);
3045
3046        let persons = store.nodes_by_label("Person");
3047        assert_eq!(persons.len(), 2);
3048        assert!(persons.contains(&p1));
3049        assert!(persons.contains(&p2));
3050
3051        let animals = store.nodes_by_label("Animal");
3052        assert_eq!(animals.len(), 1);
3053    }
3054
3055    #[test]
3056    fn test_delete_edge() {
3057        let store = LpgStore::new();
3058
3059        let a = store.create_node(&["Person"]);
3060        let b = store.create_node(&["Person"]);
3061        let edge_id = store.create_edge(a, b, "KNOWS");
3062
3063        assert_eq!(store.edge_count(), 1);
3064
3065        assert!(store.delete_edge(edge_id));
3066        assert_eq!(store.edge_count(), 0);
3067        assert!(store.get_edge(edge_id).is_none());
3068    }
3069
3070    // === New tests for improved coverage ===
3071
3072    #[test]
3073    fn test_lpg_store_config() {
3074        // Test with_config
3075        let config = LpgStoreConfig {
3076            backward_edges: false,
3077            initial_node_capacity: 100,
3078            initial_edge_capacity: 200,
3079        };
3080        let store = LpgStore::with_config(config);
3081
3082        // Store should work but without backward adjacency
3083        let a = store.create_node(&["Person"]);
3084        let b = store.create_node(&["Person"]);
3085        store.create_edge(a, b, "KNOWS");
3086
3087        // Outgoing should work
3088        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3089        assert_eq!(outgoing.len(), 1);
3090
3091        // Incoming should be empty (no backward adjacency)
3092        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3093        assert_eq!(incoming.len(), 0);
3094    }
3095
3096    #[test]
3097    fn test_epoch_management() {
3098        let store = LpgStore::new();
3099
3100        let epoch0 = store.current_epoch();
3101        assert_eq!(epoch0.as_u64(), 0);
3102
3103        let epoch1 = store.new_epoch();
3104        assert_eq!(epoch1.as_u64(), 1);
3105
3106        let current = store.current_epoch();
3107        assert_eq!(current.as_u64(), 1);
3108    }
3109
3110    #[test]
3111    fn test_node_properties() {
3112        let store = LpgStore::new();
3113        let id = store.create_node(&["Person"]);
3114
3115        // Set and get property
3116        store.set_node_property(id, "name", Value::from("Alice"));
3117        let name = store.get_node_property(id, &"name".into());
3118        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3119
3120        // Update property
3121        store.set_node_property(id, "name", Value::from("Bob"));
3122        let name = store.get_node_property(id, &"name".into());
3123        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3124
3125        // Remove property
3126        let old = store.remove_node_property(id, "name");
3127        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3128
3129        // Property should be gone
3130        let name = store.get_node_property(id, &"name".into());
3131        assert!(name.is_none());
3132
3133        // Remove non-existent property
3134        let none = store.remove_node_property(id, "nonexistent");
3135        assert!(none.is_none());
3136    }
3137
3138    #[test]
3139    fn test_edge_properties() {
3140        let store = LpgStore::new();
3141        let a = store.create_node(&["Person"]);
3142        let b = store.create_node(&["Person"]);
3143        let edge_id = store.create_edge(a, b, "KNOWS");
3144
3145        // Set and get property
3146        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3147        let since = store.get_edge_property(edge_id, &"since".into());
3148        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3149
3150        // Remove property
3151        let old = store.remove_edge_property(edge_id, "since");
3152        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3153
3154        let since = store.get_edge_property(edge_id, &"since".into());
3155        assert!(since.is_none());
3156    }
3157
3158    #[test]
3159    fn test_add_remove_label() {
3160        let store = LpgStore::new();
3161        let id = store.create_node(&["Person"]);
3162
3163        // Add new label
3164        assert!(store.add_label(id, "Employee"));
3165
3166        let node = store.get_node(id).unwrap();
3167        assert!(node.has_label("Person"));
3168        assert!(node.has_label("Employee"));
3169
3170        // Adding same label again should fail
3171        assert!(!store.add_label(id, "Employee"));
3172
3173        // Remove label
3174        assert!(store.remove_label(id, "Employee"));
3175
3176        let node = store.get_node(id).unwrap();
3177        assert!(node.has_label("Person"));
3178        assert!(!node.has_label("Employee"));
3179
3180        // Removing non-existent label should fail
3181        assert!(!store.remove_label(id, "Employee"));
3182        assert!(!store.remove_label(id, "NonExistent"));
3183    }
3184
3185    #[test]
3186    fn test_add_label_to_nonexistent_node() {
3187        let store = LpgStore::new();
3188        let fake_id = NodeId::new(999);
3189        assert!(!store.add_label(fake_id, "Label"));
3190    }
3191
3192    #[test]
3193    fn test_remove_label_from_nonexistent_node() {
3194        let store = LpgStore::new();
3195        let fake_id = NodeId::new(999);
3196        assert!(!store.remove_label(fake_id, "Label"));
3197    }
3198
3199    #[test]
3200    fn test_node_ids() {
3201        let store = LpgStore::new();
3202
3203        let n1 = store.create_node(&["Person"]);
3204        let n2 = store.create_node(&["Person"]);
3205        let n3 = store.create_node(&["Person"]);
3206
3207        let ids = store.node_ids();
3208        assert_eq!(ids.len(), 3);
3209        assert!(ids.contains(&n1));
3210        assert!(ids.contains(&n2));
3211        assert!(ids.contains(&n3));
3212
3213        // Delete one
3214        store.delete_node(n2);
3215        let ids = store.node_ids();
3216        assert_eq!(ids.len(), 2);
3217        assert!(!ids.contains(&n2));
3218    }
3219
3220    #[test]
3221    fn test_delete_node_nonexistent() {
3222        let store = LpgStore::new();
3223        let fake_id = NodeId::new(999);
3224        assert!(!store.delete_node(fake_id));
3225    }
3226
3227    #[test]
3228    fn test_delete_edge_nonexistent() {
3229        let store = LpgStore::new();
3230        let fake_id = EdgeId::new(999);
3231        assert!(!store.delete_edge(fake_id));
3232    }
3233
3234    #[test]
3235    fn test_delete_edge_double() {
3236        let store = LpgStore::new();
3237        let a = store.create_node(&["Person"]);
3238        let b = store.create_node(&["Person"]);
3239        let edge_id = store.create_edge(a, b, "KNOWS");
3240
3241        assert!(store.delete_edge(edge_id));
3242        assert!(!store.delete_edge(edge_id)); // Double delete
3243    }
3244
3245    #[test]
3246    fn test_create_edge_with_props() {
3247        let store = LpgStore::new();
3248        let a = store.create_node(&["Person"]);
3249        let b = store.create_node(&["Person"]);
3250
3251        let edge_id = store.create_edge_with_props(
3252            a,
3253            b,
3254            "KNOWS",
3255            [
3256                ("since", Value::from(2020i64)),
3257                ("weight", Value::from(1.0)),
3258            ],
3259        );
3260
3261        let edge = store.get_edge(edge_id).unwrap();
3262        assert_eq!(
3263            edge.get_property("since").and_then(|v| v.as_int64()),
3264            Some(2020)
3265        );
3266        assert_eq!(
3267            edge.get_property("weight").and_then(|v| v.as_float64()),
3268            Some(1.0)
3269        );
3270    }
3271
3272    #[test]
3273    fn test_delete_node_edges() {
3274        let store = LpgStore::new();
3275
3276        let a = store.create_node(&["Person"]);
3277        let b = store.create_node(&["Person"]);
3278        let c = store.create_node(&["Person"]);
3279
3280        store.create_edge(a, b, "KNOWS"); // a -> b
3281        store.create_edge(c, a, "KNOWS"); // c -> a
3282
3283        assert_eq!(store.edge_count(), 2);
3284
3285        // Delete all edges connected to a
3286        store.delete_node_edges(a);
3287
3288        assert_eq!(store.edge_count(), 0);
3289    }
3290
3291    #[test]
3292    fn test_neighbors_both_directions() {
3293        let store = LpgStore::new();
3294
3295        let a = store.create_node(&["Person"]);
3296        let b = store.create_node(&["Person"]);
3297        let c = store.create_node(&["Person"]);
3298
3299        store.create_edge(a, b, "KNOWS"); // a -> b
3300        store.create_edge(c, a, "KNOWS"); // c -> a
3301
3302        // Direction::Both for node a
3303        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3304        assert_eq!(neighbors.len(), 2);
3305        assert!(neighbors.contains(&b)); // outgoing
3306        assert!(neighbors.contains(&c)); // incoming
3307    }
3308
3309    #[test]
3310    fn test_edges_from() {
3311        let store = LpgStore::new();
3312
3313        let a = store.create_node(&["Person"]);
3314        let b = store.create_node(&["Person"]);
3315        let c = store.create_node(&["Person"]);
3316
3317        let e1 = store.create_edge(a, b, "KNOWS");
3318        let e2 = store.create_edge(a, c, "KNOWS");
3319
3320        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3321        assert_eq!(edges.len(), 2);
3322        assert!(edges.iter().any(|(_, e)| *e == e1));
3323        assert!(edges.iter().any(|(_, e)| *e == e2));
3324
3325        // Incoming edges to b
3326        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3327        assert_eq!(incoming.len(), 1);
3328        assert_eq!(incoming[0].1, e1);
3329    }
3330
3331    #[test]
3332    fn test_edges_to() {
3333        let store = LpgStore::new();
3334
3335        let a = store.create_node(&["Person"]);
3336        let b = store.create_node(&["Person"]);
3337        let c = store.create_node(&["Person"]);
3338
3339        let e1 = store.create_edge(a, b, "KNOWS");
3340        let e2 = store.create_edge(c, b, "KNOWS");
3341
3342        // Edges pointing TO b
3343        let to_b = store.edges_to(b);
3344        assert_eq!(to_b.len(), 2);
3345        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3346        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3347    }
3348
3349    #[test]
3350    fn test_out_degree_in_degree() {
3351        let store = LpgStore::new();
3352
3353        let a = store.create_node(&["Person"]);
3354        let b = store.create_node(&["Person"]);
3355        let c = store.create_node(&["Person"]);
3356
3357        store.create_edge(a, b, "KNOWS");
3358        store.create_edge(a, c, "KNOWS");
3359        store.create_edge(c, b, "KNOWS");
3360
3361        assert_eq!(store.out_degree(a), 2);
3362        assert_eq!(store.out_degree(b), 0);
3363        assert_eq!(store.out_degree(c), 1);
3364
3365        assert_eq!(store.in_degree(a), 0);
3366        assert_eq!(store.in_degree(b), 2);
3367        assert_eq!(store.in_degree(c), 1);
3368    }
3369
3370    #[test]
3371    fn test_edge_type() {
3372        let store = LpgStore::new();
3373
3374        let a = store.create_node(&["Person"]);
3375        let b = store.create_node(&["Person"]);
3376        let edge_id = store.create_edge(a, b, "KNOWS");
3377
3378        let edge_type = store.edge_type(edge_id);
3379        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3380
3381        // Non-existent edge
3382        let fake_id = EdgeId::new(999);
3383        assert!(store.edge_type(fake_id).is_none());
3384    }
3385
3386    #[test]
3387    fn test_count_methods() {
3388        let store = LpgStore::new();
3389
3390        assert_eq!(store.label_count(), 0);
3391        assert_eq!(store.edge_type_count(), 0);
3392        assert_eq!(store.property_key_count(), 0);
3393
3394        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3395        let b = store.create_node(&["Company"]);
3396        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3397
3398        assert_eq!(store.label_count(), 2); // Person, Company
3399        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3400        assert_eq!(store.property_key_count(), 2); // age, since
3401    }
3402
3403    #[test]
3404    fn test_all_nodes_and_edges() {
3405        let store = LpgStore::new();
3406
3407        let a = store.create_node(&["Person"]);
3408        let b = store.create_node(&["Person"]);
3409        store.create_edge(a, b, "KNOWS");
3410
3411        let nodes: Vec<_> = store.all_nodes().collect();
3412        assert_eq!(nodes.len(), 2);
3413
3414        let edges: Vec<_> = store.all_edges().collect();
3415        assert_eq!(edges.len(), 1);
3416    }
3417
3418    #[test]
3419    fn test_all_labels_and_edge_types() {
3420        let store = LpgStore::new();
3421
3422        store.create_node(&["Person"]);
3423        store.create_node(&["Company"]);
3424        let a = store.create_node(&["Animal"]);
3425        let b = store.create_node(&["Animal"]);
3426        store.create_edge(a, b, "EATS");
3427
3428        let labels = store.all_labels();
3429        assert_eq!(labels.len(), 3);
3430        assert!(labels.contains(&"Person".to_string()));
3431        assert!(labels.contains(&"Company".to_string()));
3432        assert!(labels.contains(&"Animal".to_string()));
3433
3434        let edge_types = store.all_edge_types();
3435        assert_eq!(edge_types.len(), 1);
3436        assert!(edge_types.contains(&"EATS".to_string()));
3437    }
3438
3439    #[test]
3440    fn test_all_property_keys() {
3441        let store = LpgStore::new();
3442
3443        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3444        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3445        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3446
3447        let keys = store.all_property_keys();
3448        assert!(keys.contains(&"name".to_string()));
3449        assert!(keys.contains(&"age".to_string()));
3450        assert!(keys.contains(&"since".to_string()));
3451    }
3452
3453    #[test]
3454    fn test_nodes_with_label() {
3455        let store = LpgStore::new();
3456
3457        store.create_node(&["Person"]);
3458        store.create_node(&["Person"]);
3459        store.create_node(&["Company"]);
3460
3461        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3462        assert_eq!(persons.len(), 2);
3463
3464        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3465        assert_eq!(companies.len(), 1);
3466
3467        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3468        assert_eq!(none.len(), 0);
3469    }
3470
3471    #[test]
3472    fn test_edges_with_type() {
3473        let store = LpgStore::new();
3474
3475        let a = store.create_node(&["Person"]);
3476        let b = store.create_node(&["Person"]);
3477        let c = store.create_node(&["Company"]);
3478
3479        store.create_edge(a, b, "KNOWS");
3480        store.create_edge(a, c, "WORKS_AT");
3481
3482        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3483        assert_eq!(knows.len(), 1);
3484
3485        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3486        assert_eq!(works_at.len(), 1);
3487
3488        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3489        assert_eq!(none.len(), 0);
3490    }
3491
3492    #[test]
3493    fn test_nodes_by_label_nonexistent() {
3494        let store = LpgStore::new();
3495        store.create_node(&["Person"]);
3496
3497        let empty = store.nodes_by_label("NonExistent");
3498        assert!(empty.is_empty());
3499    }
3500
3501    #[test]
3502    fn test_statistics() {
3503        let store = LpgStore::new();
3504
3505        let a = store.create_node(&["Person"]);
3506        let b = store.create_node(&["Person"]);
3507        let c = store.create_node(&["Company"]);
3508
3509        store.create_edge(a, b, "KNOWS");
3510        store.create_edge(a, c, "WORKS_AT");
3511
3512        store.compute_statistics();
3513        let stats = store.statistics();
3514
3515        assert_eq!(stats.total_nodes, 3);
3516        assert_eq!(stats.total_edges, 2);
3517
3518        // Estimates
3519        let person_card = store.estimate_label_cardinality("Person");
3520        assert!(person_card > 0.0);
3521
3522        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3523        assert!(avg_degree >= 0.0);
3524    }
3525
3526    #[test]
3527    fn test_zone_maps() {
3528        let store = LpgStore::new();
3529
3530        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3531        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3532
3533        // Zone map should indicate possible matches (30 is within [25, 35] range)
3534        let might_match =
3535            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3536        // Zone maps return true conservatively when value is within min/max range
3537        assert!(might_match);
3538
3539        let zone = store.node_property_zone_map(&"age".into());
3540        assert!(zone.is_some());
3541
3542        // Non-existent property
3543        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3544        assert!(no_zone.is_none());
3545
3546        // Edge zone maps
3547        let a = store.create_node(&["A"]);
3548        let b = store.create_node(&["B"]);
3549        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3550
3551        let edge_zone = store.edge_property_zone_map(&"weight".into());
3552        assert!(edge_zone.is_some());
3553    }
3554
3555    #[test]
3556    fn test_rebuild_zone_maps() {
3557        let store = LpgStore::new();
3558        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3559
3560        // Should not panic
3561        store.rebuild_zone_maps();
3562    }
3563
3564    #[test]
3565    fn test_create_node_with_id() {
3566        let store = LpgStore::new();
3567
3568        let specific_id = NodeId::new(100);
3569        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3570
3571        let node = store.get_node(specific_id).unwrap();
3572        assert!(node.has_label("Person"));
3573        assert!(node.has_label("Employee"));
3574
3575        // Next auto-generated ID should be > 100
3576        let next = store.create_node(&["Other"]);
3577        assert!(next.as_u64() > 100);
3578    }
3579
3580    #[test]
3581    fn test_create_edge_with_id() {
3582        let store = LpgStore::new();
3583
3584        let a = store.create_node(&["A"]);
3585        let b = store.create_node(&["B"]);
3586
3587        let specific_id = EdgeId::new(500);
3588        store.create_edge_with_id(specific_id, a, b, "REL");
3589
3590        let edge = store.get_edge(specific_id).unwrap();
3591        assert_eq!(edge.src, a);
3592        assert_eq!(edge.dst, b);
3593        assert_eq!(edge.edge_type.as_str(), "REL");
3594
3595        // Next auto-generated ID should be > 500
3596        let next = store.create_edge(a, b, "OTHER");
3597        assert!(next.as_u64() > 500);
3598    }
3599
3600    #[test]
3601    fn test_set_epoch() {
3602        let store = LpgStore::new();
3603
3604        assert_eq!(store.current_epoch().as_u64(), 0);
3605
3606        store.set_epoch(EpochId::new(42));
3607        assert_eq!(store.current_epoch().as_u64(), 42);
3608    }
3609
3610    #[test]
3611    fn test_get_node_nonexistent() {
3612        let store = LpgStore::new();
3613        let fake_id = NodeId::new(999);
3614        assert!(store.get_node(fake_id).is_none());
3615    }
3616
3617    #[test]
3618    fn test_get_edge_nonexistent() {
3619        let store = LpgStore::new();
3620        let fake_id = EdgeId::new(999);
3621        assert!(store.get_edge(fake_id).is_none());
3622    }
3623
3624    #[test]
3625    fn test_multiple_labels() {
3626        let store = LpgStore::new();
3627
3628        let id = store.create_node(&["Person", "Employee", "Manager"]);
3629        let node = store.get_node(id).unwrap();
3630
3631        assert!(node.has_label("Person"));
3632        assert!(node.has_label("Employee"));
3633        assert!(node.has_label("Manager"));
3634        assert!(!node.has_label("Other"));
3635    }
3636
3637    #[test]
3638    fn test_default_impl() {
3639        let store: LpgStore = Default::default();
3640        assert_eq!(store.node_count(), 0);
3641        assert_eq!(store.edge_count(), 0);
3642    }
3643
3644    #[test]
3645    fn test_edges_from_both_directions() {
3646        let store = LpgStore::new();
3647
3648        let a = store.create_node(&["A"]);
3649        let b = store.create_node(&["B"]);
3650        let c = store.create_node(&["C"]);
3651
3652        let e1 = store.create_edge(a, b, "R1"); // a -> b
3653        let e2 = store.create_edge(c, a, "R2"); // c -> a
3654
3655        // Both directions from a
3656        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3657        assert_eq!(edges.len(), 2);
3658        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3659        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3660    }
3661
3662    #[test]
3663    fn test_no_backward_adj_in_degree() {
3664        let config = LpgStoreConfig {
3665            backward_edges: false,
3666            initial_node_capacity: 10,
3667            initial_edge_capacity: 10,
3668        };
3669        let store = LpgStore::with_config(config);
3670
3671        let a = store.create_node(&["A"]);
3672        let b = store.create_node(&["B"]);
3673        store.create_edge(a, b, "R");
3674
3675        // in_degree should still work (falls back to scanning)
3676        let degree = store.in_degree(b);
3677        assert_eq!(degree, 1);
3678    }
3679
3680    #[test]
3681    fn test_no_backward_adj_edges_to() {
3682        let config = LpgStoreConfig {
3683            backward_edges: false,
3684            initial_node_capacity: 10,
3685            initial_edge_capacity: 10,
3686        };
3687        let store = LpgStore::with_config(config);
3688
3689        let a = store.create_node(&["A"]);
3690        let b = store.create_node(&["B"]);
3691        let e = store.create_edge(a, b, "R");
3692
3693        // edges_to should still work (falls back to scanning)
3694        let edges = store.edges_to(b);
3695        assert_eq!(edges.len(), 1);
3696        assert_eq!(edges[0].1, e);
3697    }
3698
3699    #[test]
3700    fn test_node_versioned_creation() {
3701        let store = LpgStore::new();
3702
3703        let epoch = store.new_epoch();
3704        let tx_id = TxId::new(1);
3705
3706        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3707        assert!(store.get_node(id).is_some());
3708    }
3709
3710    #[test]
3711    fn test_edge_versioned_creation() {
3712        let store = LpgStore::new();
3713
3714        let a = store.create_node(&["A"]);
3715        let b = store.create_node(&["B"]);
3716
3717        let epoch = store.new_epoch();
3718        let tx_id = TxId::new(1);
3719
3720        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3721        assert!(store.get_edge(edge_id).is_some());
3722    }
3723
3724    #[test]
3725    fn test_node_with_props_versioned() {
3726        let store = LpgStore::new();
3727
3728        let epoch = store.new_epoch();
3729        let tx_id = TxId::new(1);
3730
3731        let id = store.create_node_with_props_versioned(
3732            &["Person"],
3733            [("name", Value::from("Alice"))],
3734            epoch,
3735            tx_id,
3736        );
3737
3738        let node = store.get_node(id).unwrap();
3739        assert_eq!(
3740            node.get_property("name").and_then(|v| v.as_str()),
3741            Some("Alice")
3742        );
3743    }
3744
3745    #[test]
3746    fn test_discard_uncommitted_versions() {
3747        let store = LpgStore::new();
3748
3749        let epoch = store.new_epoch();
3750        let tx_id = TxId::new(42);
3751
3752        // Create node with specific tx
3753        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3754        assert!(store.get_node(node_id).is_some());
3755
3756        // Discard uncommitted versions for this tx
3757        store.discard_uncommitted_versions(tx_id);
3758
3759        // Node should be gone (version chain was removed)
3760        assert!(store.get_node(node_id).is_none());
3761    }
3762
3763    // === Property Index Tests ===
3764
3765    #[test]
3766    fn test_property_index_create_and_lookup() {
3767        let store = LpgStore::new();
3768
3769        // Create nodes with properties
3770        let alice = store.create_node(&["Person"]);
3771        let bob = store.create_node(&["Person"]);
3772        let charlie = store.create_node(&["Person"]);
3773
3774        store.set_node_property(alice, "city", Value::from("NYC"));
3775        store.set_node_property(bob, "city", Value::from("NYC"));
3776        store.set_node_property(charlie, "city", Value::from("LA"));
3777
3778        // Before indexing, lookup still works (via scan)
3779        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3780        assert_eq!(nyc_people.len(), 2);
3781
3782        // Create index
3783        store.create_property_index("city");
3784        assert!(store.has_property_index("city"));
3785
3786        // Indexed lookup should return same results
3787        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3788        assert_eq!(nyc_people.len(), 2);
3789        assert!(nyc_people.contains(&alice));
3790        assert!(nyc_people.contains(&bob));
3791
3792        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3793        assert_eq!(la_people.len(), 1);
3794        assert!(la_people.contains(&charlie));
3795    }
3796
3797    #[test]
3798    fn test_property_index_maintained_on_update() {
3799        let store = LpgStore::new();
3800
3801        // Create index first
3802        store.create_property_index("status");
3803
3804        let node = store.create_node(&["Task"]);
3805        store.set_node_property(node, "status", Value::from("pending"));
3806
3807        // Should find by initial value
3808        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3809        assert_eq!(pending.len(), 1);
3810        assert!(pending.contains(&node));
3811
3812        // Update the property
3813        store.set_node_property(node, "status", Value::from("done"));
3814
3815        // Old value should not find it
3816        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3817        assert!(pending.is_empty());
3818
3819        // New value should find it
3820        let done = store.find_nodes_by_property("status", &Value::from("done"));
3821        assert_eq!(done.len(), 1);
3822        assert!(done.contains(&node));
3823    }
3824
3825    #[test]
3826    fn test_property_index_maintained_on_remove() {
3827        let store = LpgStore::new();
3828
3829        store.create_property_index("tag");
3830
3831        let node = store.create_node(&["Item"]);
3832        store.set_node_property(node, "tag", Value::from("important"));
3833
3834        // Should find it
3835        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3836        assert_eq!(found.len(), 1);
3837
3838        // Remove the property
3839        store.remove_node_property(node, "tag");
3840
3841        // Should no longer find it
3842        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3843        assert!(found.is_empty());
3844    }
3845
3846    #[test]
3847    fn test_property_index_drop() {
3848        let store = LpgStore::new();
3849
3850        store.create_property_index("key");
3851        assert!(store.has_property_index("key"));
3852
3853        assert!(store.drop_property_index("key"));
3854        assert!(!store.has_property_index("key"));
3855
3856        // Dropping non-existent index returns false
3857        assert!(!store.drop_property_index("key"));
3858    }
3859
3860    #[test]
3861    fn test_property_index_multiple_values() {
3862        let store = LpgStore::new();
3863
3864        store.create_property_index("age");
3865
3866        // Create multiple nodes with same and different ages
3867        let n1 = store.create_node(&["Person"]);
3868        let n2 = store.create_node(&["Person"]);
3869        let n3 = store.create_node(&["Person"]);
3870        let n4 = store.create_node(&["Person"]);
3871
3872        store.set_node_property(n1, "age", Value::from(25i64));
3873        store.set_node_property(n2, "age", Value::from(25i64));
3874        store.set_node_property(n3, "age", Value::from(30i64));
3875        store.set_node_property(n4, "age", Value::from(25i64));
3876
3877        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3878        assert_eq!(age_25.len(), 3);
3879
3880        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3881        assert_eq!(age_30.len(), 1);
3882
3883        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3884        assert!(age_40.is_empty());
3885    }
3886
3887    #[test]
3888    fn test_property_index_builds_from_existing_data() {
3889        let store = LpgStore::new();
3890
3891        // Create nodes first
3892        let n1 = store.create_node(&["Person"]);
3893        let n2 = store.create_node(&["Person"]);
3894        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3895        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3896
3897        // Create index after data exists
3898        store.create_property_index("email");
3899
3900        // Index should include existing data
3901        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3902        assert_eq!(alice.len(), 1);
3903        assert!(alice.contains(&n1));
3904
3905        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3906        assert_eq!(bob.len(), 1);
3907        assert!(bob.contains(&n2));
3908    }
3909
3910    #[test]
3911    fn test_get_node_property_batch() {
3912        let store = LpgStore::new();
3913
3914        let n1 = store.create_node(&["Person"]);
3915        let n2 = store.create_node(&["Person"]);
3916        let n3 = store.create_node(&["Person"]);
3917
3918        store.set_node_property(n1, "age", Value::from(25i64));
3919        store.set_node_property(n2, "age", Value::from(30i64));
3920        // n3 has no age property
3921
3922        let age_key = PropertyKey::new("age");
3923        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3924
3925        assert_eq!(values.len(), 3);
3926        assert_eq!(values[0], Some(Value::from(25i64)));
3927        assert_eq!(values[1], Some(Value::from(30i64)));
3928        assert_eq!(values[2], None);
3929    }
3930
3931    #[test]
3932    fn test_get_node_property_batch_empty() {
3933        let store = LpgStore::new();
3934        let key = PropertyKey::new("any");
3935
3936        let values = store.get_node_property_batch(&[], &key);
3937        assert!(values.is_empty());
3938    }
3939
3940    #[test]
3941    fn test_get_nodes_properties_batch() {
3942        let store = LpgStore::new();
3943
3944        let n1 = store.create_node(&["Person"]);
3945        let n2 = store.create_node(&["Person"]);
3946        let n3 = store.create_node(&["Person"]);
3947
3948        store.set_node_property(n1, "name", Value::from("Alice"));
3949        store.set_node_property(n1, "age", Value::from(25i64));
3950        store.set_node_property(n2, "name", Value::from("Bob"));
3951        // n3 has no properties
3952
3953        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3954
3955        assert_eq!(all_props.len(), 3);
3956        assert_eq!(all_props[0].len(), 2); // name and age
3957        assert_eq!(all_props[1].len(), 1); // name only
3958        assert_eq!(all_props[2].len(), 0); // no properties
3959
3960        assert_eq!(
3961            all_props[0].get(&PropertyKey::new("name")),
3962            Some(&Value::from("Alice"))
3963        );
3964        assert_eq!(
3965            all_props[1].get(&PropertyKey::new("name")),
3966            Some(&Value::from("Bob"))
3967        );
3968    }
3969
3970    #[test]
3971    fn test_get_nodes_properties_batch_empty() {
3972        let store = LpgStore::new();
3973
3974        let all_props = store.get_nodes_properties_batch(&[]);
3975        assert!(all_props.is_empty());
3976    }
3977
3978    #[test]
3979    fn test_get_nodes_properties_selective_batch() {
3980        let store = LpgStore::new();
3981
3982        let n1 = store.create_node(&["Person"]);
3983        let n2 = store.create_node(&["Person"]);
3984
3985        // Set multiple properties
3986        store.set_node_property(n1, "name", Value::from("Alice"));
3987        store.set_node_property(n1, "age", Value::from(25i64));
3988        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3989        store.set_node_property(n2, "name", Value::from("Bob"));
3990        store.set_node_property(n2, "age", Value::from(30i64));
3991        store.set_node_property(n2, "city", Value::from("NYC"));
3992
3993        // Request only name and age (not email or city)
3994        let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
3995        let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
3996
3997        assert_eq!(props.len(), 2);
3998
3999        // n1: should have name and age, but NOT email
4000        assert_eq!(props[0].len(), 2);
4001        assert_eq!(
4002            props[0].get(&PropertyKey::new("name")),
4003            Some(&Value::from("Alice"))
4004        );
4005        assert_eq!(
4006            props[0].get(&PropertyKey::new("age")),
4007            Some(&Value::from(25i64))
4008        );
4009        assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4010
4011        // n2: should have name and age, but NOT city
4012        assert_eq!(props[1].len(), 2);
4013        assert_eq!(
4014            props[1].get(&PropertyKey::new("name")),
4015            Some(&Value::from("Bob"))
4016        );
4017        assert_eq!(
4018            props[1].get(&PropertyKey::new("age")),
4019            Some(&Value::from(30i64))
4020        );
4021        assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4022    }
4023
4024    #[test]
4025    fn test_get_nodes_properties_selective_batch_empty_keys() {
4026        let store = LpgStore::new();
4027
4028        let n1 = store.create_node(&["Person"]);
4029        store.set_node_property(n1, "name", Value::from("Alice"));
4030
4031        // Request no properties
4032        let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4033
4034        assert_eq!(props.len(), 1);
4035        assert!(props[0].is_empty()); // Empty map when no keys requested
4036    }
4037
4038    #[test]
4039    fn test_get_nodes_properties_selective_batch_missing_keys() {
4040        let store = LpgStore::new();
4041
4042        let n1 = store.create_node(&["Person"]);
4043        store.set_node_property(n1, "name", Value::from("Alice"));
4044
4045        // Request a property that doesn't exist
4046        let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4047        let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4048
4049        assert_eq!(props.len(), 1);
4050        assert_eq!(props[0].len(), 1); // Only name exists
4051        assert_eq!(
4052            props[0].get(&PropertyKey::new("name")),
4053            Some(&Value::from("Alice"))
4054        );
4055    }
4056}