Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use dashmap::DashMap;
19#[cfg(not(feature = "tiered-storage"))]
20use grafeo_common::mvcc::VersionChain;
21use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
22use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
23use parking_lot::RwLock;
24use std::cmp::Ordering as CmpOrdering;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28/// Compares two values for ordering (used for range checks).
29fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
30    match (a, b) {
31        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
32        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
33        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
34        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
35        _ => None,
36    }
37}
38
39/// Checks if a value is within a range.
40fn value_in_range(
41    value: &Value,
42    min: Option<&Value>,
43    max: Option<&Value>,
44    min_inclusive: bool,
45    max_inclusive: bool,
46) -> bool {
47    // Check lower bound
48    if let Some(min_val) = min {
49        match compare_values_for_range(value, min_val) {
50            Some(CmpOrdering::Less) => return false,
51            Some(CmpOrdering::Equal) if !min_inclusive => return false,
52            None => return false, // Can't compare
53            _ => {}
54        }
55    }
56
57    // Check upper bound
58    if let Some(max_val) = max {
59        match compare_values_for_range(value, max_val) {
60            Some(CmpOrdering::Greater) => return false,
61            Some(CmpOrdering::Equal) if !max_inclusive => return false,
62            None => return false,
63            _ => {}
64        }
65    }
66
67    true
68}
69
70// Tiered storage imports
71#[cfg(feature = "tiered-storage")]
72use crate::storage::EpochStore;
73#[cfg(feature = "tiered-storage")]
74use grafeo_common::memory::arena::ArenaAllocator;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
77
78/// Configuration for the LPG store.
79///
80/// The defaults work well for most cases. Tune `backward_edges` if you only
81/// traverse in one direction (saves memory), or adjust capacities if you know
82/// your graph size upfront (avoids reallocations).
83#[derive(Debug, Clone)]
84pub struct LpgStoreConfig {
85    /// Maintain backward adjacency for incoming edge queries. Turn off if
86    /// you only traverse outgoing edges - saves ~50% adjacency memory.
87    pub backward_edges: bool,
88    /// Initial capacity for nodes (avoids early reallocations).
89    pub initial_node_capacity: usize,
90    /// Initial capacity for edges (avoids early reallocations).
91    pub initial_edge_capacity: usize,
92}
93
94impl Default for LpgStoreConfig {
95    fn default() -> Self {
96        Self {
97            backward_edges: true,
98            initial_node_capacity: 1024,
99            initial_edge_capacity: 4096,
100        }
101    }
102}
103
104/// The core in-memory graph storage.
105///
106/// Everything lives here: nodes, edges, properties, adjacency indexes, and
107/// version chains for MVCC. Concurrent reads never block each other.
108///
109/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
110/// adds transaction management and query execution. Use `LpgStore` directly
111/// when you need raw performance for algorithm implementations.
112///
113/// # Example
114///
115/// ```
116/// use grafeo_core::graph::lpg::LpgStore;
117/// use grafeo_core::graph::Direction;
118///
119/// let store = LpgStore::new();
120///
121/// // Create a small social network
122/// let alice = store.create_node(&["Person"]);
123/// let bob = store.create_node(&["Person"]);
124/// store.create_edge(alice, bob, "KNOWS");
125///
126/// // Traverse outgoing edges
127/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
128///     println!("Alice knows node {:?}", neighbor);
129/// }
130/// ```
131pub struct LpgStore {
132    /// Configuration.
133    #[allow(dead_code)]
134    config: LpgStoreConfig,
135
136    /// Node records indexed by NodeId, with version chains for MVCC.
137    /// Used when `tiered-storage` feature is disabled.
138    #[cfg(not(feature = "tiered-storage"))]
139    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
140
141    /// Edge records indexed by EdgeId, with version chains for MVCC.
142    /// Used when `tiered-storage` feature is disabled.
143    #[cfg(not(feature = "tiered-storage"))]
144    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
145
146    // === Tiered Storage Fields (feature-gated) ===
147    /// Arena allocator for hot data storage.
148    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
149    #[cfg(feature = "tiered-storage")]
150    arena_allocator: Arc<ArenaAllocator>,
151
152    /// Node version indexes - store metadata and arena offsets.
153    /// The actual NodeRecord data is stored in the arena.
154    #[cfg(feature = "tiered-storage")]
155    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
156
157    /// Edge version indexes - store metadata and arena offsets.
158    /// The actual EdgeRecord data is stored in the arena.
159    #[cfg(feature = "tiered-storage")]
160    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
161
162    /// Cold storage for frozen epochs.
163    /// Contains compressed epoch blocks for historical data.
164    #[cfg(feature = "tiered-storage")]
165    epoch_store: Arc<EpochStore>,
166
167    /// Property storage for nodes.
168    node_properties: PropertyStorage<NodeId>,
169
170    /// Property storage for edges.
171    edge_properties: PropertyStorage<EdgeId>,
172
173    /// Label name to ID mapping.
174    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
175
176    /// Label ID to name mapping.
177    id_to_label: RwLock<Vec<Arc<str>>>,
178
179    /// Edge type name to ID mapping.
180    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
181
182    /// Edge type ID to name mapping.
183    id_to_edge_type: RwLock<Vec<Arc<str>>>,
184
185    /// Forward adjacency lists (outgoing edges).
186    forward_adj: ChunkedAdjacency,
187
188    /// Backward adjacency lists (incoming edges).
189    /// Only populated if config.backward_edges is true.
190    backward_adj: Option<ChunkedAdjacency>,
191
192    /// Label index: label_id -> set of node IDs.
193    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
194
195    /// Node labels: node_id -> set of label IDs.
196    /// Reverse mapping to efficiently get labels for a node.
197    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
198
199    /// Property indexes: property_key -> (value -> set of node IDs).
200    ///
201    /// When a property is indexed, lookups by value are O(1) instead of O(n).
202    /// Use [`create_property_index`] to enable indexing for a property.
203    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
204
205    /// Next node ID.
206    next_node_id: AtomicU64,
207
208    /// Next edge ID.
209    next_edge_id: AtomicU64,
210
211    /// Current epoch.
212    current_epoch: AtomicU64,
213
214    /// Statistics for cost-based optimization.
215    statistics: RwLock<Statistics>,
216}
217
218impl LpgStore {
219    /// Creates a new LPG store with default configuration.
220    #[must_use]
221    pub fn new() -> Self {
222        Self::with_config(LpgStoreConfig::default())
223    }
224
225    /// Creates a new LPG store with custom configuration.
226    #[must_use]
227    pub fn with_config(config: LpgStoreConfig) -> Self {
228        let backward_adj = if config.backward_edges {
229            Some(ChunkedAdjacency::new())
230        } else {
231            None
232        };
233
234        Self {
235            #[cfg(not(feature = "tiered-storage"))]
236            nodes: RwLock::new(FxHashMap::default()),
237            #[cfg(not(feature = "tiered-storage"))]
238            edges: RwLock::new(FxHashMap::default()),
239            #[cfg(feature = "tiered-storage")]
240            arena_allocator: Arc::new(ArenaAllocator::new()),
241            #[cfg(feature = "tiered-storage")]
242            node_versions: RwLock::new(FxHashMap::default()),
243            #[cfg(feature = "tiered-storage")]
244            edge_versions: RwLock::new(FxHashMap::default()),
245            #[cfg(feature = "tiered-storage")]
246            epoch_store: Arc::new(EpochStore::new()),
247            node_properties: PropertyStorage::new(),
248            edge_properties: PropertyStorage::new(),
249            label_to_id: RwLock::new(FxHashMap::default()),
250            id_to_label: RwLock::new(Vec::new()),
251            edge_type_to_id: RwLock::new(FxHashMap::default()),
252            id_to_edge_type: RwLock::new(Vec::new()),
253            forward_adj: ChunkedAdjacency::new(),
254            backward_adj,
255            label_index: RwLock::new(Vec::new()),
256            node_labels: RwLock::new(FxHashMap::default()),
257            property_indexes: RwLock::new(FxHashMap::default()),
258            next_node_id: AtomicU64::new(0),
259            next_edge_id: AtomicU64::new(0),
260            current_epoch: AtomicU64::new(0),
261            statistics: RwLock::new(Statistics::new()),
262            config,
263        }
264    }
265
266    /// Returns the current epoch.
267    #[must_use]
268    pub fn current_epoch(&self) -> EpochId {
269        EpochId::new(self.current_epoch.load(Ordering::Acquire))
270    }
271
272    /// Creates a new epoch.
273    pub fn new_epoch(&self) -> EpochId {
274        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
275        EpochId::new(id)
276    }
277
278    // === Node Operations ===
279
280    /// Creates a new node with the given labels.
281    ///
282    /// Uses the system transaction for non-transactional operations.
283    pub fn create_node(&self, labels: &[&str]) -> NodeId {
284        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
285    }
286
287    /// Creates a new node with the given labels within a transaction context.
288    #[cfg(not(feature = "tiered-storage"))]
289    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
290        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
291
292        let mut record = NodeRecord::new(id, epoch);
293        record.set_label_count(labels.len() as u16);
294
295        // Store labels in node_labels map and label_index
296        let mut node_label_set = FxHashSet::default();
297        for label in labels {
298            let label_id = self.get_or_create_label_id(*label);
299            node_label_set.insert(label_id);
300
301            // Update label index
302            let mut index = self.label_index.write();
303            while index.len() <= label_id as usize {
304                index.push(FxHashMap::default());
305            }
306            index[label_id as usize].insert(id, ());
307        }
308
309        // Store node's labels
310        self.node_labels.write().insert(id, node_label_set);
311
312        // Create version chain with initial version
313        let chain = VersionChain::with_initial(record, epoch, tx_id);
314        self.nodes.write().insert(id, chain);
315        id
316    }
317
318    /// Creates a new node with the given labels within a transaction context.
319    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
320    #[cfg(feature = "tiered-storage")]
321    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
322        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
323
324        let mut record = NodeRecord::new(id, epoch);
325        record.set_label_count(labels.len() as u16);
326
327        // Store labels in node_labels map and label_index
328        let mut node_label_set = FxHashSet::default();
329        for label in labels {
330            let label_id = self.get_or_create_label_id(*label);
331            node_label_set.insert(label_id);
332
333            // Update label index
334            let mut index = self.label_index.write();
335            while index.len() <= label_id as usize {
336                index.push(FxHashMap::default());
337            }
338            index[label_id as usize].insert(id, ());
339        }
340
341        // Store node's labels
342        self.node_labels.write().insert(id, node_label_set);
343
344        // Allocate record in arena and get offset (create epoch if needed)
345        let arena = self.arena_allocator.arena_or_create(epoch);
346        let (offset, _stored) = arena.alloc_value_with_offset(record);
347
348        // Create HotVersionRef pointing to arena data
349        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
350
351        // Create or update version index
352        let mut versions = self.node_versions.write();
353        if let Some(index) = versions.get_mut(&id) {
354            index.add_hot(hot_ref);
355        } else {
356            versions.insert(id, VersionIndex::with_initial(hot_ref));
357        }
358
359        id
360    }
361
362    /// Creates a new node with labels and properties.
363    pub fn create_node_with_props(
364        &self,
365        labels: &[&str],
366        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
367    ) -> NodeId {
368        self.create_node_with_props_versioned(
369            labels,
370            properties,
371            self.current_epoch(),
372            TxId::SYSTEM,
373        )
374    }
375
376    /// Creates a new node with labels and properties within a transaction context.
377    #[cfg(not(feature = "tiered-storage"))]
378    pub fn create_node_with_props_versioned(
379        &self,
380        labels: &[&str],
381        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
382        epoch: EpochId,
383        tx_id: TxId,
384    ) -> NodeId {
385        let id = self.create_node_versioned(labels, epoch, tx_id);
386
387        for (key, value) in properties {
388            self.node_properties.set(id, key.into(), value.into());
389        }
390
391        // Update props_count in record
392        let count = self.node_properties.get_all(id).len() as u16;
393        if let Some(chain) = self.nodes.write().get_mut(&id) {
394            if let Some(record) = chain.latest_mut() {
395                record.props_count = count;
396            }
397        }
398
399        id
400    }
401
402    /// Creates a new node with labels and properties within a transaction context.
403    /// (Tiered storage version)
404    #[cfg(feature = "tiered-storage")]
405    pub fn create_node_with_props_versioned(
406        &self,
407        labels: &[&str],
408        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
409        epoch: EpochId,
410        tx_id: TxId,
411    ) -> NodeId {
412        let id = self.create_node_versioned(labels, epoch, tx_id);
413
414        for (key, value) in properties {
415            self.node_properties.set(id, key.into(), value.into());
416        }
417
418        // Note: props_count in record is not updated for tiered storage.
419        // The record is immutable once allocated in the arena.
420
421        id
422    }
423
424    /// Gets a node by ID (latest visible version).
425    #[must_use]
426    pub fn get_node(&self, id: NodeId) -> Option<Node> {
427        self.get_node_at_epoch(id, self.current_epoch())
428    }
429
430    /// Gets a node by ID at a specific epoch.
431    #[must_use]
432    #[cfg(not(feature = "tiered-storage"))]
433    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
434        let nodes = self.nodes.read();
435        let chain = nodes.get(&id)?;
436        let record = chain.visible_at(epoch)?;
437
438        if record.is_deleted() {
439            return None;
440        }
441
442        let mut node = Node::new(id);
443
444        // Get labels from node_labels map
445        let id_to_label = self.id_to_label.read();
446        let node_labels = self.node_labels.read();
447        if let Some(label_ids) = node_labels.get(&id) {
448            for &label_id in label_ids {
449                if let Some(label) = id_to_label.get(label_id as usize) {
450                    node.labels.push(label.clone());
451                }
452            }
453        }
454
455        // Get properties
456        node.properties = self.node_properties.get_all(id).into_iter().collect();
457
458        Some(node)
459    }
460
461    /// Gets a node by ID at a specific epoch.
462    /// (Tiered storage version: reads from arena via VersionIndex)
463    #[must_use]
464    #[cfg(feature = "tiered-storage")]
465    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
466        let versions = self.node_versions.read();
467        let index = versions.get(&id)?;
468        let version_ref = index.visible_at(epoch)?;
469
470        // Read the record from arena
471        let record = self.read_node_record(&version_ref)?;
472
473        if record.is_deleted() {
474            return None;
475        }
476
477        let mut node = Node::new(id);
478
479        // Get labels from node_labels map
480        let id_to_label = self.id_to_label.read();
481        let node_labels = self.node_labels.read();
482        if let Some(label_ids) = node_labels.get(&id) {
483            for &label_id in label_ids {
484                if let Some(label) = id_to_label.get(label_id as usize) {
485                    node.labels.push(label.clone());
486                }
487            }
488        }
489
490        // Get properties
491        node.properties = self.node_properties.get_all(id).into_iter().collect();
492
493        Some(node)
494    }
495
496    /// Gets a node visible to a specific transaction.
497    #[must_use]
498    #[cfg(not(feature = "tiered-storage"))]
499    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
500        let nodes = self.nodes.read();
501        let chain = nodes.get(&id)?;
502        let record = chain.visible_to(epoch, tx_id)?;
503
504        if record.is_deleted() {
505            return None;
506        }
507
508        let mut node = Node::new(id);
509
510        // Get labels from node_labels map
511        let id_to_label = self.id_to_label.read();
512        let node_labels = self.node_labels.read();
513        if let Some(label_ids) = node_labels.get(&id) {
514            for &label_id in label_ids {
515                if let Some(label) = id_to_label.get(label_id as usize) {
516                    node.labels.push(label.clone());
517                }
518            }
519        }
520
521        // Get properties
522        node.properties = self.node_properties.get_all(id).into_iter().collect();
523
524        Some(node)
525    }
526
527    /// Gets a node visible to a specific transaction.
528    /// (Tiered storage version: reads from arena via VersionIndex)
529    #[must_use]
530    #[cfg(feature = "tiered-storage")]
531    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
532        let versions = self.node_versions.read();
533        let index = versions.get(&id)?;
534        let version_ref = index.visible_to(epoch, tx_id)?;
535
536        // Read the record from arena
537        let record = self.read_node_record(&version_ref)?;
538
539        if record.is_deleted() {
540            return None;
541        }
542
543        let mut node = Node::new(id);
544
545        // Get labels from node_labels map
546        let id_to_label = self.id_to_label.read();
547        let node_labels = self.node_labels.read();
548        if let Some(label_ids) = node_labels.get(&id) {
549            for &label_id in label_ids {
550                if let Some(label) = id_to_label.get(label_id as usize) {
551                    node.labels.push(label.clone());
552                }
553            }
554        }
555
556        // Get properties
557        node.properties = self.node_properties.get_all(id).into_iter().collect();
558
559        Some(node)
560    }
561
562    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
563    #[cfg(feature = "tiered-storage")]
564    #[allow(unsafe_code)]
565    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
566        match version_ref {
567            VersionRef::Hot(hot_ref) => {
568                let arena = self.arena_allocator.arena(hot_ref.epoch);
569                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
570                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
571                Some(record.clone())
572            }
573            VersionRef::Cold(cold_ref) => {
574                // Read from compressed epoch store
575                self.epoch_store
576                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
577            }
578        }
579    }
580
581    /// Deletes a node and all its edges (using latest epoch).
582    pub fn delete_node(&self, id: NodeId) -> bool {
583        self.delete_node_at_epoch(id, self.current_epoch())
584    }
585
586    /// Deletes a node at a specific epoch.
587    #[cfg(not(feature = "tiered-storage"))]
588    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
589        let mut nodes = self.nodes.write();
590        if let Some(chain) = nodes.get_mut(&id) {
591            // Check if visible at this epoch (not already deleted)
592            if let Some(record) = chain.visible_at(epoch) {
593                if record.is_deleted() {
594                    return false;
595                }
596            } else {
597                // Not visible at this epoch (already deleted or doesn't exist)
598                return false;
599            }
600
601            // Mark the version chain as deleted at this epoch
602            chain.mark_deleted(epoch);
603
604            // Remove from label index using node_labels map
605            let mut index = self.label_index.write();
606            let mut node_labels = self.node_labels.write();
607            if let Some(label_ids) = node_labels.remove(&id) {
608                for label_id in label_ids {
609                    if let Some(set) = index.get_mut(label_id as usize) {
610                        set.remove(&id);
611                    }
612                }
613            }
614
615            // Remove properties
616            drop(nodes); // Release lock before removing properties
617            drop(index);
618            drop(node_labels);
619            self.node_properties.remove_all(id);
620
621            // Note: Caller should use delete_node_edges() first if detach is needed
622
623            true
624        } else {
625            false
626        }
627    }
628
629    /// Deletes a node at a specific epoch.
630    /// (Tiered storage version)
631    #[cfg(feature = "tiered-storage")]
632    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
633        let mut versions = self.node_versions.write();
634        if let Some(index) = versions.get_mut(&id) {
635            // Check if visible at this epoch
636            if let Some(version_ref) = index.visible_at(epoch) {
637                if let Some(record) = self.read_node_record(&version_ref) {
638                    if record.is_deleted() {
639                        return false;
640                    }
641                } else {
642                    return false;
643                }
644            } else {
645                return false;
646            }
647
648            // Mark as deleted in version index
649            index.mark_deleted(epoch);
650
651            // Remove from label index using node_labels map
652            let mut label_index = self.label_index.write();
653            let mut node_labels = self.node_labels.write();
654            if let Some(label_ids) = node_labels.remove(&id) {
655                for label_id in label_ids {
656                    if let Some(set) = label_index.get_mut(label_id as usize) {
657                        set.remove(&id);
658                    }
659                }
660            }
661
662            // Remove properties
663            drop(versions);
664            drop(label_index);
665            drop(node_labels);
666            self.node_properties.remove_all(id);
667
668            true
669        } else {
670            false
671        }
672    }
673
674    /// Deletes all edges connected to a node (implements DETACH DELETE).
675    ///
676    /// Call this before `delete_node()` if you want to remove a node that
677    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
678    #[cfg(not(feature = "tiered-storage"))]
679    pub fn delete_node_edges(&self, node_id: NodeId) {
680        // Get outgoing edges
681        let outgoing: Vec<EdgeId> = self
682            .forward_adj
683            .edges_from(node_id)
684            .into_iter()
685            .map(|(_, edge_id)| edge_id)
686            .collect();
687
688        // Get incoming edges
689        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
690            backward
691                .edges_from(node_id)
692                .into_iter()
693                .map(|(_, edge_id)| edge_id)
694                .collect()
695        } else {
696            // No backward adjacency - scan all edges
697            let epoch = self.current_epoch();
698            self.edges
699                .read()
700                .iter()
701                .filter_map(|(id, chain)| {
702                    chain.visible_at(epoch).and_then(|r| {
703                        if !r.is_deleted() && r.dst == node_id {
704                            Some(*id)
705                        } else {
706                            None
707                        }
708                    })
709                })
710                .collect()
711        };
712
713        // Delete all edges
714        for edge_id in outgoing.into_iter().chain(incoming) {
715            self.delete_edge(edge_id);
716        }
717    }
718
719    /// Deletes all edges connected to a node (implements DETACH DELETE).
720    /// (Tiered storage version)
721    #[cfg(feature = "tiered-storage")]
722    pub fn delete_node_edges(&self, node_id: NodeId) {
723        // Get outgoing edges
724        let outgoing: Vec<EdgeId> = self
725            .forward_adj
726            .edges_from(node_id)
727            .into_iter()
728            .map(|(_, edge_id)| edge_id)
729            .collect();
730
731        // Get incoming edges
732        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
733            backward
734                .edges_from(node_id)
735                .into_iter()
736                .map(|(_, edge_id)| edge_id)
737                .collect()
738        } else {
739            // No backward adjacency - scan all edges
740            let epoch = self.current_epoch();
741            let versions = self.edge_versions.read();
742            versions
743                .iter()
744                .filter_map(|(id, index)| {
745                    index.visible_at(epoch).and_then(|vref| {
746                        self.read_edge_record(&vref).and_then(|r| {
747                            if !r.is_deleted() && r.dst == node_id {
748                                Some(*id)
749                            } else {
750                                None
751                            }
752                        })
753                    })
754                })
755                .collect()
756        };
757
758        // Delete all edges
759        for edge_id in outgoing.into_iter().chain(incoming) {
760            self.delete_edge(edge_id);
761        }
762    }
763
764    /// Sets a property on a node.
765    #[cfg(not(feature = "tiered-storage"))]
766    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
767        let prop_key: PropertyKey = key.into();
768
769        // Update property index before setting the property (needs to read old value)
770        self.update_property_index_on_set(id, &prop_key, &value);
771
772        self.node_properties.set(id, prop_key, value);
773
774        // Update props_count in record
775        let count = self.node_properties.get_all(id).len() as u16;
776        if let Some(chain) = self.nodes.write().get_mut(&id) {
777            if let Some(record) = chain.latest_mut() {
778                record.props_count = count;
779            }
780        }
781    }
782
783    /// Sets a property on a node.
784    /// (Tiered storage version: properties stored separately, record is immutable)
785    #[cfg(feature = "tiered-storage")]
786    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
787        let prop_key: PropertyKey = key.into();
788
789        // Update property index before setting the property (needs to read old value)
790        self.update_property_index_on_set(id, &prop_key, &value);
791
792        self.node_properties.set(id, prop_key, value);
793        // Note: props_count in record is not updated for tiered storage.
794        // The record is immutable once allocated in the arena.
795        // Property count can be derived from PropertyStorage if needed.
796    }
797
798    /// Sets a property on an edge.
799    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
800        self.edge_properties.set(id, key.into(), value);
801    }
802
803    /// Removes a property from a node.
804    ///
805    /// Returns the previous value if it existed, or None if the property didn't exist.
806    #[cfg(not(feature = "tiered-storage"))]
807    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
808        let prop_key: PropertyKey = key.into();
809
810        // Update property index before removing (needs to read old value)
811        self.update_property_index_on_remove(id, &prop_key);
812
813        let result = self.node_properties.remove(id, &prop_key);
814
815        // Update props_count in record
816        let count = self.node_properties.get_all(id).len() as u16;
817        if let Some(chain) = self.nodes.write().get_mut(&id) {
818            if let Some(record) = chain.latest_mut() {
819                record.props_count = count;
820            }
821        }
822
823        result
824    }
825
826    /// Removes a property from a node.
827    /// (Tiered storage version)
828    #[cfg(feature = "tiered-storage")]
829    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
830        let prop_key: PropertyKey = key.into();
831
832        // Update property index before removing (needs to read old value)
833        self.update_property_index_on_remove(id, &prop_key);
834
835        self.node_properties.remove(id, &prop_key)
836        // Note: props_count in record is not updated for tiered storage.
837    }
838
839    /// Removes a property from an edge.
840    ///
841    /// Returns the previous value if it existed, or None if the property didn't exist.
842    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
843        self.edge_properties.remove(id, &key.into())
844    }
845
846    /// Gets a single property from a node without loading all properties.
847    ///
848    /// This is O(1) vs O(properties) for `get_node().get_property()`.
849    /// Use this for filter predicates where you only need one property value.
850    ///
851    /// # Example
852    ///
853    /// ```ignore
854    /// // Fast: Direct single-property lookup
855    /// let age = store.get_node_property(node_id, "age");
856    ///
857    /// // Slow: Loads all properties, then extracts one
858    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
859    /// ```
860    #[must_use]
861    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
862        self.node_properties.get(id, key)
863    }
864
865    /// Gets a single property from an edge without loading all properties.
866    ///
867    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
868    #[must_use]
869    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
870        self.edge_properties.get(id, key)
871    }
872
873    // === Batch Property Operations ===
874
875    /// Gets a property for multiple nodes in a single batch operation.
876    ///
877    /// More efficient than calling [`Self::get_node_property`] in a loop because it
878    /// reduces lock overhead and enables better cache utilization.
879    ///
880    /// # Example
881    ///
882    /// ```
883    /// use grafeo_core::graph::lpg::LpgStore;
884    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
885    ///
886    /// let store = LpgStore::new();
887    /// let n1 = store.create_node(&["Person"]);
888    /// let n2 = store.create_node(&["Person"]);
889    /// store.set_node_property(n1, "age", Value::from(25i64));
890    /// store.set_node_property(n2, "age", Value::from(30i64));
891    ///
892    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
893    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
894    /// ```
895    #[must_use]
896    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
897        self.node_properties.get_batch(ids, key)
898    }
899
900    /// Gets all properties for multiple nodes in a single batch operation.
901    ///
902    /// Returns a vector of property maps, one per node ID (empty map if no properties).
903    /// More efficient than calling [`Self::get_node`] in a loop.
904    #[must_use]
905    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
906        self.node_properties.get_all_batch(ids)
907    }
908
909    /// Finds nodes where a property value is in a range.
910    ///
911    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
912    /// Uses zone maps to skip scanning when the range definitely doesn't match.
913    ///
914    /// # Arguments
915    ///
916    /// * `property` - The property to check
917    /// * `min` - Optional lower bound (None for unbounded)
918    /// * `max` - Optional upper bound (None for unbounded)
919    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
920    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
921    ///
922    /// # Example
923    ///
924    /// ```
925    /// use grafeo_core::graph::lpg::LpgStore;
926    /// use grafeo_common::types::Value;
927    ///
928    /// let store = LpgStore::new();
929    /// let n1 = store.create_node(&["Person"]);
930    /// let n2 = store.create_node(&["Person"]);
931    /// store.set_node_property(n1, "age", Value::from(25i64));
932    /// store.set_node_property(n2, "age", Value::from(35i64));
933    ///
934    /// // Find nodes where age > 30
935    /// let result = store.find_nodes_in_range(
936    ///     "age",
937    ///     Some(&Value::from(30i64)),
938    ///     None,
939    ///     false, // exclusive lower bound
940    ///     true,  // inclusive upper bound (doesn't matter since None)
941    /// );
942    /// assert_eq!(result.len(), 1); // Only n2 matches
943    /// ```
944    #[must_use]
945    pub fn find_nodes_in_range(
946        &self,
947        property: &str,
948        min: Option<&Value>,
949        max: Option<&Value>,
950        min_inclusive: bool,
951        max_inclusive: bool,
952    ) -> Vec<NodeId> {
953        let key = PropertyKey::new(property);
954
955        // Check zone map first - if no values could match, return empty
956        if !self
957            .node_properties
958            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
959        {
960            return Vec::new();
961        }
962
963        // Scan all nodes and filter by range
964        self.node_ids()
965            .into_iter()
966            .filter(|&node_id| {
967                self.node_properties
968                    .get(node_id, &key)
969                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
970            })
971            .collect()
972    }
973
974    /// Finds nodes matching multiple property equality conditions.
975    ///
976    /// This is more efficient than intersecting multiple single-property lookups
977    /// because it can use indexes when available and short-circuits on the first
978    /// miss.
979    ///
980    /// # Example
981    ///
982    /// ```
983    /// use grafeo_core::graph::lpg::LpgStore;
984    /// use grafeo_common::types::Value;
985    ///
986    /// let store = LpgStore::new();
987    /// let alice = store.create_node(&["Person"]);
988    /// store.set_node_property(alice, "name", Value::from("Alice"));
989    /// store.set_node_property(alice, "city", Value::from("NYC"));
990    ///
991    /// // Find nodes where name = "Alice" AND city = "NYC"
992    /// let matches = store.find_nodes_by_properties(&[
993    ///     ("name", Value::from("Alice")),
994    ///     ("city", Value::from("NYC")),
995    /// ]);
996    /// assert!(matches.contains(&alice));
997    /// ```
998    #[must_use]
999    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1000        if conditions.is_empty() {
1001            return self.node_ids();
1002        }
1003
1004        // Find the most selective condition (smallest result set) to start
1005        // If any condition has an index, use that first
1006        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1007        let indexes = self.property_indexes.read();
1008
1009        for (i, (prop, value)) in conditions.iter().enumerate() {
1010            let key = PropertyKey::new(*prop);
1011            let hv = HashableValue::new(value.clone());
1012
1013            if let Some(index) = indexes.get(&key) {
1014                let matches: Vec<NodeId> = index
1015                    .get(&hv)
1016                    .map(|nodes| nodes.iter().copied().collect())
1017                    .unwrap_or_default();
1018
1019                // Short-circuit if any indexed condition has no matches
1020                if matches.is_empty() {
1021                    return Vec::new();
1022                }
1023
1024                // Use smallest indexed result as starting point
1025                if best_start
1026                    .as_ref()
1027                    .is_none_or(|(_, best)| matches.len() < best.len())
1028                {
1029                    best_start = Some((i, matches));
1030                }
1031            }
1032        }
1033        drop(indexes);
1034
1035        // Start from best indexed result or fall back to full node scan
1036        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1037            // No indexes available, start with first condition via full scan
1038            let (prop, value) = &conditions[0];
1039            (0, self.find_nodes_by_property(prop, value))
1040        });
1041
1042        // Filter candidates through remaining conditions
1043        for (i, (prop, value)) in conditions.iter().enumerate() {
1044            if i == start_idx {
1045                continue;
1046            }
1047
1048            let key = PropertyKey::new(*prop);
1049            candidates.retain(|&node_id| {
1050                self.node_properties
1051                    .get(node_id, &key)
1052                    .is_some_and(|v| v == *value)
1053            });
1054
1055            // Short-circuit if no candidates remain
1056            if candidates.is_empty() {
1057                return Vec::new();
1058            }
1059        }
1060
1061        candidates
1062    }
1063
1064    // === Property Index Operations ===
1065
1066    /// Creates an index on a node property for O(1) lookups by value.
1067    ///
1068    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1069    /// O(1) instead of O(n) for this property. The index is automatically
1070    /// maintained when properties are set or removed.
1071    ///
1072    /// # Example
1073    ///
1074    /// ```
1075    /// use grafeo_core::graph::lpg::LpgStore;
1076    /// use grafeo_common::types::Value;
1077    ///
1078    /// let store = LpgStore::new();
1079    ///
1080    /// // Create nodes with an 'id' property
1081    /// let alice = store.create_node(&["Person"]);
1082    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1083    ///
1084    /// // Create an index on the 'id' property
1085    /// store.create_property_index("id");
1086    ///
1087    /// // Now lookups by 'id' are O(1)
1088    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1089    /// assert!(found.contains(&alice));
1090    /// ```
1091    pub fn create_property_index(&self, property: &str) {
1092        let key = PropertyKey::new(property);
1093
1094        let mut indexes = self.property_indexes.write();
1095        if indexes.contains_key(&key) {
1096            return; // Already indexed
1097        }
1098
1099        // Create the index and populate it with existing data
1100        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1101
1102        // Scan all nodes to build the index
1103        for node_id in self.node_ids() {
1104            if let Some(value) = self.node_properties.get(node_id, &key) {
1105                let hv = HashableValue::new(value);
1106                index
1107                    .entry(hv)
1108                    .or_insert_with(FxHashSet::default)
1109                    .insert(node_id);
1110            }
1111        }
1112
1113        indexes.insert(key, index);
1114    }
1115
1116    /// Drops an index on a node property.
1117    ///
1118    /// Returns `true` if the index existed and was removed.
1119    pub fn drop_property_index(&self, property: &str) -> bool {
1120        let key = PropertyKey::new(property);
1121        self.property_indexes.write().remove(&key).is_some()
1122    }
1123
1124    /// Returns `true` if the property has an index.
1125    #[must_use]
1126    pub fn has_property_index(&self, property: &str) -> bool {
1127        let key = PropertyKey::new(property);
1128        self.property_indexes.read().contains_key(&key)
1129    }
1130
1131    /// Finds all nodes that have a specific property value.
1132    ///
1133    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1134    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1135    ///
1136    /// # Example
1137    ///
1138    /// ```
1139    /// use grafeo_core::graph::lpg::LpgStore;
1140    /// use grafeo_common::types::Value;
1141    ///
1142    /// let store = LpgStore::new();
1143    /// store.create_property_index("city"); // Optional but makes lookups fast
1144    ///
1145    /// let alice = store.create_node(&["Person"]);
1146    /// let bob = store.create_node(&["Person"]);
1147    /// store.set_node_property(alice, "city", Value::from("NYC"));
1148    /// store.set_node_property(bob, "city", Value::from("NYC"));
1149    ///
1150    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1151    /// assert_eq!(nyc_people.len(), 2);
1152    /// ```
1153    #[must_use]
1154    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1155        let key = PropertyKey::new(property);
1156        let hv = HashableValue::new(value.clone());
1157
1158        // Try indexed lookup first
1159        let indexes = self.property_indexes.read();
1160        if let Some(index) = indexes.get(&key) {
1161            if let Some(nodes) = index.get(&hv) {
1162                return nodes.iter().copied().collect();
1163            }
1164            return Vec::new();
1165        }
1166        drop(indexes);
1167
1168        // Fall back to full scan
1169        self.node_ids()
1170            .into_iter()
1171            .filter(|&node_id| {
1172                self.node_properties
1173                    .get(node_id, &key)
1174                    .is_some_and(|v| v == *value)
1175            })
1176            .collect()
1177    }
1178
1179    /// Updates property indexes when a property is set.
1180    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1181        let indexes = self.property_indexes.read();
1182        if let Some(index) = indexes.get(key) {
1183            // Get old value to remove from index
1184            if let Some(old_value) = self.node_properties.get(node_id, key) {
1185                let old_hv = HashableValue::new(old_value);
1186                if let Some(mut nodes) = index.get_mut(&old_hv) {
1187                    nodes.remove(&node_id);
1188                    if nodes.is_empty() {
1189                        drop(nodes);
1190                        index.remove(&old_hv);
1191                    }
1192                }
1193            }
1194
1195            // Add new value to index
1196            let new_hv = HashableValue::new(new_value.clone());
1197            index
1198                .entry(new_hv)
1199                .or_insert_with(FxHashSet::default)
1200                .insert(node_id);
1201        }
1202    }
1203
1204    /// Updates property indexes when a property is removed.
1205    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1206        let indexes = self.property_indexes.read();
1207        if let Some(index) = indexes.get(key) {
1208            // Get old value to remove from index
1209            if let Some(old_value) = self.node_properties.get(node_id, key) {
1210                let old_hv = HashableValue::new(old_value);
1211                if let Some(mut nodes) = index.get_mut(&old_hv) {
1212                    nodes.remove(&node_id);
1213                    if nodes.is_empty() {
1214                        drop(nodes);
1215                        index.remove(&old_hv);
1216                    }
1217                }
1218            }
1219        }
1220    }
1221
1222    /// Adds a label to a node.
1223    ///
1224    /// Returns true if the label was added, false if the node doesn't exist
1225    /// or already has the label.
1226    #[cfg(not(feature = "tiered-storage"))]
1227    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1228        let epoch = self.current_epoch();
1229
1230        // Check if node exists
1231        let nodes = self.nodes.read();
1232        if let Some(chain) = nodes.get(&node_id) {
1233            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1234                return false;
1235            }
1236        } else {
1237            return false;
1238        }
1239        drop(nodes);
1240
1241        // Get or create label ID
1242        let label_id = self.get_or_create_label_id(label);
1243
1244        // Add to node_labels map
1245        let mut node_labels = self.node_labels.write();
1246        let label_set = node_labels
1247            .entry(node_id)
1248            .or_insert_with(FxHashSet::default);
1249
1250        if label_set.contains(&label_id) {
1251            return false; // Already has this label
1252        }
1253
1254        label_set.insert(label_id);
1255        drop(node_labels);
1256
1257        // Add to label_index
1258        let mut index = self.label_index.write();
1259        if (label_id as usize) >= index.len() {
1260            index.resize(label_id as usize + 1, FxHashMap::default());
1261        }
1262        index[label_id as usize].insert(node_id, ());
1263
1264        // Update label count in node record
1265        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1266            if let Some(record) = chain.latest_mut() {
1267                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1268                record.set_label_count(count as u16);
1269            }
1270        }
1271
1272        true
1273    }
1274
1275    /// Adds a label to a node.
1276    /// (Tiered storage version)
1277    #[cfg(feature = "tiered-storage")]
1278    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1279        let epoch = self.current_epoch();
1280
1281        // Check if node exists
1282        let versions = self.node_versions.read();
1283        if let Some(index) = versions.get(&node_id) {
1284            if let Some(vref) = index.visible_at(epoch) {
1285                if let Some(record) = self.read_node_record(&vref) {
1286                    if record.is_deleted() {
1287                        return false;
1288                    }
1289                } else {
1290                    return false;
1291                }
1292            } else {
1293                return false;
1294            }
1295        } else {
1296            return false;
1297        }
1298        drop(versions);
1299
1300        // Get or create label ID
1301        let label_id = self.get_or_create_label_id(label);
1302
1303        // Add to node_labels map
1304        let mut node_labels = self.node_labels.write();
1305        let label_set = node_labels
1306            .entry(node_id)
1307            .or_insert_with(FxHashSet::default);
1308
1309        if label_set.contains(&label_id) {
1310            return false; // Already has this label
1311        }
1312
1313        label_set.insert(label_id);
1314        drop(node_labels);
1315
1316        // Add to label_index
1317        let mut index = self.label_index.write();
1318        if (label_id as usize) >= index.len() {
1319            index.resize(label_id as usize + 1, FxHashMap::default());
1320        }
1321        index[label_id as usize].insert(node_id, ());
1322
1323        // Note: label_count in record is not updated for tiered storage.
1324        // The record is immutable once allocated in the arena.
1325
1326        true
1327    }
1328
1329    /// Removes a label from a node.
1330    ///
1331    /// Returns true if the label was removed, false if the node doesn't exist
1332    /// or doesn't have the label.
1333    #[cfg(not(feature = "tiered-storage"))]
1334    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1335        let epoch = self.current_epoch();
1336
1337        // Check if node exists
1338        let nodes = self.nodes.read();
1339        if let Some(chain) = nodes.get(&node_id) {
1340            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1341                return false;
1342            }
1343        } else {
1344            return false;
1345        }
1346        drop(nodes);
1347
1348        // Get label ID
1349        let label_id = {
1350            let label_ids = self.label_to_id.read();
1351            match label_ids.get(label) {
1352                Some(&id) => id,
1353                None => return false, // Label doesn't exist
1354            }
1355        };
1356
1357        // Remove from node_labels map
1358        let mut node_labels = self.node_labels.write();
1359        if let Some(label_set) = node_labels.get_mut(&node_id) {
1360            if !label_set.remove(&label_id) {
1361                return false; // Node doesn't have this label
1362            }
1363        } else {
1364            return false;
1365        }
1366        drop(node_labels);
1367
1368        // Remove from label_index
1369        let mut index = self.label_index.write();
1370        if (label_id as usize) < index.len() {
1371            index[label_id as usize].remove(&node_id);
1372        }
1373
1374        // Update label count in node record
1375        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1376            if let Some(record) = chain.latest_mut() {
1377                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1378                record.set_label_count(count as u16);
1379            }
1380        }
1381
1382        true
1383    }
1384
1385    /// Removes a label from a node.
1386    /// (Tiered storage version)
1387    #[cfg(feature = "tiered-storage")]
1388    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1389        let epoch = self.current_epoch();
1390
1391        // Check if node exists
1392        let versions = self.node_versions.read();
1393        if let Some(index) = versions.get(&node_id) {
1394            if let Some(vref) = index.visible_at(epoch) {
1395                if let Some(record) = self.read_node_record(&vref) {
1396                    if record.is_deleted() {
1397                        return false;
1398                    }
1399                } else {
1400                    return false;
1401                }
1402            } else {
1403                return false;
1404            }
1405        } else {
1406            return false;
1407        }
1408        drop(versions);
1409
1410        // Get label ID
1411        let label_id = {
1412            let label_ids = self.label_to_id.read();
1413            match label_ids.get(label) {
1414                Some(&id) => id,
1415                None => return false, // Label doesn't exist
1416            }
1417        };
1418
1419        // Remove from node_labels map
1420        let mut node_labels = self.node_labels.write();
1421        if let Some(label_set) = node_labels.get_mut(&node_id) {
1422            if !label_set.remove(&label_id) {
1423                return false; // Node doesn't have this label
1424            }
1425        } else {
1426            return false;
1427        }
1428        drop(node_labels);
1429
1430        // Remove from label_index
1431        let mut index = self.label_index.write();
1432        if (label_id as usize) < index.len() {
1433            index[label_id as usize].remove(&node_id);
1434        }
1435
1436        // Note: label_count in record is not updated for tiered storage.
1437
1438        true
1439    }
1440
1441    /// Returns the number of nodes (non-deleted at current epoch).
1442    #[must_use]
1443    #[cfg(not(feature = "tiered-storage"))]
1444    pub fn node_count(&self) -> usize {
1445        let epoch = self.current_epoch();
1446        self.nodes
1447            .read()
1448            .values()
1449            .filter_map(|chain| chain.visible_at(epoch))
1450            .filter(|r| !r.is_deleted())
1451            .count()
1452    }
1453
1454    /// Returns the number of nodes (non-deleted at current epoch).
1455    /// (Tiered storage version)
1456    #[must_use]
1457    #[cfg(feature = "tiered-storage")]
1458    pub fn node_count(&self) -> usize {
1459        let epoch = self.current_epoch();
1460        let versions = self.node_versions.read();
1461        versions
1462            .iter()
1463            .filter(|(_, index)| {
1464                index.visible_at(epoch).map_or(false, |vref| {
1465                    self.read_node_record(&vref)
1466                        .map_or(false, |r| !r.is_deleted())
1467                })
1468            })
1469            .count()
1470    }
1471
1472    /// Returns all node IDs in the store.
1473    ///
1474    /// This returns a snapshot of current node IDs. The returned vector
1475    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1476    /// iteration order.
1477    #[must_use]
1478    #[cfg(not(feature = "tiered-storage"))]
1479    pub fn node_ids(&self) -> Vec<NodeId> {
1480        let epoch = self.current_epoch();
1481        let mut ids: Vec<NodeId> = self
1482            .nodes
1483            .read()
1484            .iter()
1485            .filter_map(|(id, chain)| {
1486                chain
1487                    .visible_at(epoch)
1488                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1489            })
1490            .collect();
1491        ids.sort_unstable();
1492        ids
1493    }
1494
1495    /// Returns all node IDs in the store.
1496    /// (Tiered storage version)
1497    #[must_use]
1498    #[cfg(feature = "tiered-storage")]
1499    pub fn node_ids(&self) -> Vec<NodeId> {
1500        let epoch = self.current_epoch();
1501        let versions = self.node_versions.read();
1502        let mut ids: Vec<NodeId> = versions
1503            .iter()
1504            .filter_map(|(id, index)| {
1505                index.visible_at(epoch).and_then(|vref| {
1506                    self.read_node_record(&vref)
1507                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1508                })
1509            })
1510            .collect();
1511        ids.sort_unstable();
1512        ids
1513    }
1514
1515    // === Edge Operations ===
1516
1517    /// Creates a new edge.
1518    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1519        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1520    }
1521
1522    /// Creates a new edge within a transaction context.
1523    #[cfg(not(feature = "tiered-storage"))]
1524    pub fn create_edge_versioned(
1525        &self,
1526        src: NodeId,
1527        dst: NodeId,
1528        edge_type: &str,
1529        epoch: EpochId,
1530        tx_id: TxId,
1531    ) -> EdgeId {
1532        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1533        let type_id = self.get_or_create_edge_type_id(edge_type);
1534
1535        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1536        let chain = VersionChain::with_initial(record, epoch, tx_id);
1537        self.edges.write().insert(id, chain);
1538
1539        // Update adjacency
1540        self.forward_adj.add_edge(src, dst, id);
1541        if let Some(ref backward) = self.backward_adj {
1542            backward.add_edge(dst, src, id);
1543        }
1544
1545        id
1546    }
1547
1548    /// Creates a new edge within a transaction context.
1549    /// (Tiered storage version)
1550    #[cfg(feature = "tiered-storage")]
1551    pub fn create_edge_versioned(
1552        &self,
1553        src: NodeId,
1554        dst: NodeId,
1555        edge_type: &str,
1556        epoch: EpochId,
1557        tx_id: TxId,
1558    ) -> EdgeId {
1559        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1560        let type_id = self.get_or_create_edge_type_id(edge_type);
1561
1562        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1563
1564        // Allocate record in arena and get offset (create epoch if needed)
1565        let arena = self.arena_allocator.arena_or_create(epoch);
1566        let (offset, _stored) = arena.alloc_value_with_offset(record);
1567
1568        // Create HotVersionRef pointing to arena data
1569        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1570
1571        // Create or update version index
1572        let mut versions = self.edge_versions.write();
1573        if let Some(index) = versions.get_mut(&id) {
1574            index.add_hot(hot_ref);
1575        } else {
1576            versions.insert(id, VersionIndex::with_initial(hot_ref));
1577        }
1578
1579        // Update adjacency
1580        self.forward_adj.add_edge(src, dst, id);
1581        if let Some(ref backward) = self.backward_adj {
1582            backward.add_edge(dst, src, id);
1583        }
1584
1585        id
1586    }
1587
1588    /// Creates a new edge with properties.
1589    pub fn create_edge_with_props(
1590        &self,
1591        src: NodeId,
1592        dst: NodeId,
1593        edge_type: &str,
1594        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1595    ) -> EdgeId {
1596        let id = self.create_edge(src, dst, edge_type);
1597
1598        for (key, value) in properties {
1599            self.edge_properties.set(id, key.into(), value.into());
1600        }
1601
1602        id
1603    }
1604
1605    /// Gets an edge by ID (latest visible version).
1606    #[must_use]
1607    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1608        self.get_edge_at_epoch(id, self.current_epoch())
1609    }
1610
1611    /// Gets an edge by ID at a specific epoch.
1612    #[must_use]
1613    #[cfg(not(feature = "tiered-storage"))]
1614    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1615        let edges = self.edges.read();
1616        let chain = edges.get(&id)?;
1617        let record = chain.visible_at(epoch)?;
1618
1619        if record.is_deleted() {
1620            return None;
1621        }
1622
1623        let edge_type = {
1624            let id_to_type = self.id_to_edge_type.read();
1625            id_to_type.get(record.type_id as usize)?.clone()
1626        };
1627
1628        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1629
1630        // Get properties
1631        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1632
1633        Some(edge)
1634    }
1635
1636    /// Gets an edge by ID at a specific epoch.
1637    /// (Tiered storage version)
1638    #[must_use]
1639    #[cfg(feature = "tiered-storage")]
1640    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1641        let versions = self.edge_versions.read();
1642        let index = versions.get(&id)?;
1643        let version_ref = index.visible_at(epoch)?;
1644
1645        let record = self.read_edge_record(&version_ref)?;
1646
1647        if record.is_deleted() {
1648            return None;
1649        }
1650
1651        let edge_type = {
1652            let id_to_type = self.id_to_edge_type.read();
1653            id_to_type.get(record.type_id as usize)?.clone()
1654        };
1655
1656        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1657
1658        // Get properties
1659        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1660
1661        Some(edge)
1662    }
1663
1664    /// Gets an edge visible to a specific transaction.
1665    #[must_use]
1666    #[cfg(not(feature = "tiered-storage"))]
1667    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1668        let edges = self.edges.read();
1669        let chain = edges.get(&id)?;
1670        let record = chain.visible_to(epoch, tx_id)?;
1671
1672        if record.is_deleted() {
1673            return None;
1674        }
1675
1676        let edge_type = {
1677            let id_to_type = self.id_to_edge_type.read();
1678            id_to_type.get(record.type_id as usize)?.clone()
1679        };
1680
1681        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1682
1683        // Get properties
1684        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1685
1686        Some(edge)
1687    }
1688
1689    /// Gets an edge visible to a specific transaction.
1690    /// (Tiered storage version)
1691    #[must_use]
1692    #[cfg(feature = "tiered-storage")]
1693    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1694        let versions = self.edge_versions.read();
1695        let index = versions.get(&id)?;
1696        let version_ref = index.visible_to(epoch, tx_id)?;
1697
1698        let record = self.read_edge_record(&version_ref)?;
1699
1700        if record.is_deleted() {
1701            return None;
1702        }
1703
1704        let edge_type = {
1705            let id_to_type = self.id_to_edge_type.read();
1706            id_to_type.get(record.type_id as usize)?.clone()
1707        };
1708
1709        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1710
1711        // Get properties
1712        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1713
1714        Some(edge)
1715    }
1716
1717    /// Reads an EdgeRecord from arena using a VersionRef.
1718    #[cfg(feature = "tiered-storage")]
1719    #[allow(unsafe_code)]
1720    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1721        match version_ref {
1722            VersionRef::Hot(hot_ref) => {
1723                let arena = self.arena_allocator.arena(hot_ref.epoch);
1724                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1725                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1726                Some(record.clone())
1727            }
1728            VersionRef::Cold(cold_ref) => {
1729                // Read from compressed epoch store
1730                self.epoch_store
1731                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1732            }
1733        }
1734    }
1735
1736    /// Deletes an edge (using latest epoch).
1737    pub fn delete_edge(&self, id: EdgeId) -> bool {
1738        self.delete_edge_at_epoch(id, self.current_epoch())
1739    }
1740
1741    /// Deletes an edge at a specific epoch.
1742    #[cfg(not(feature = "tiered-storage"))]
1743    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1744        let mut edges = self.edges.write();
1745        if let Some(chain) = edges.get_mut(&id) {
1746            // Get the visible record to check if deleted and get src/dst
1747            let (src, dst) = {
1748                match chain.visible_at(epoch) {
1749                    Some(record) => {
1750                        if record.is_deleted() {
1751                            return false;
1752                        }
1753                        (record.src, record.dst)
1754                    }
1755                    None => return false, // Not visible at this epoch (already deleted)
1756                }
1757            };
1758
1759            // Mark the version chain as deleted
1760            chain.mark_deleted(epoch);
1761
1762            drop(edges); // Release lock
1763
1764            // Mark as deleted in adjacency (soft delete)
1765            self.forward_adj.mark_deleted(src, id);
1766            if let Some(ref backward) = self.backward_adj {
1767                backward.mark_deleted(dst, id);
1768            }
1769
1770            // Remove properties
1771            self.edge_properties.remove_all(id);
1772
1773            true
1774        } else {
1775            false
1776        }
1777    }
1778
1779    /// Deletes an edge at a specific epoch.
1780    /// (Tiered storage version)
1781    #[cfg(feature = "tiered-storage")]
1782    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1783        let mut versions = self.edge_versions.write();
1784        if let Some(index) = versions.get_mut(&id) {
1785            // Get the visible record to check if deleted and get src/dst
1786            let (src, dst) = {
1787                match index.visible_at(epoch) {
1788                    Some(version_ref) => {
1789                        if let Some(record) = self.read_edge_record(&version_ref) {
1790                            if record.is_deleted() {
1791                                return false;
1792                            }
1793                            (record.src, record.dst)
1794                        } else {
1795                            return false;
1796                        }
1797                    }
1798                    None => return false,
1799                }
1800            };
1801
1802            // Mark as deleted in version index
1803            index.mark_deleted(epoch);
1804
1805            drop(versions); // Release lock
1806
1807            // Mark as deleted in adjacency (soft delete)
1808            self.forward_adj.mark_deleted(src, id);
1809            if let Some(ref backward) = self.backward_adj {
1810                backward.mark_deleted(dst, id);
1811            }
1812
1813            // Remove properties
1814            self.edge_properties.remove_all(id);
1815
1816            true
1817        } else {
1818            false
1819        }
1820    }
1821
1822    /// Returns the number of edges (non-deleted at current epoch).
1823    #[must_use]
1824    #[cfg(not(feature = "tiered-storage"))]
1825    pub fn edge_count(&self) -> usize {
1826        let epoch = self.current_epoch();
1827        self.edges
1828            .read()
1829            .values()
1830            .filter_map(|chain| chain.visible_at(epoch))
1831            .filter(|r| !r.is_deleted())
1832            .count()
1833    }
1834
1835    /// Returns the number of edges (non-deleted at current epoch).
1836    /// (Tiered storage version)
1837    #[must_use]
1838    #[cfg(feature = "tiered-storage")]
1839    pub fn edge_count(&self) -> usize {
1840        let epoch = self.current_epoch();
1841        let versions = self.edge_versions.read();
1842        versions
1843            .iter()
1844            .filter(|(_, index)| {
1845                index.visible_at(epoch).map_or(false, |vref| {
1846                    self.read_edge_record(&vref)
1847                        .map_or(false, |r| !r.is_deleted())
1848                })
1849            })
1850            .count()
1851    }
1852
1853    /// Discards all uncommitted versions created by a transaction.
1854    ///
1855    /// This is called during transaction rollback to clean up uncommitted changes.
1856    /// The method removes version chain entries created by the specified transaction.
1857    #[cfg(not(feature = "tiered-storage"))]
1858    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1859        // Remove uncommitted node versions
1860        {
1861            let mut nodes = self.nodes.write();
1862            for chain in nodes.values_mut() {
1863                chain.remove_versions_by(tx_id);
1864            }
1865            // Remove completely empty chains (no versions left)
1866            nodes.retain(|_, chain| !chain.is_empty());
1867        }
1868
1869        // Remove uncommitted edge versions
1870        {
1871            let mut edges = self.edges.write();
1872            for chain in edges.values_mut() {
1873                chain.remove_versions_by(tx_id);
1874            }
1875            // Remove completely empty chains (no versions left)
1876            edges.retain(|_, chain| !chain.is_empty());
1877        }
1878    }
1879
1880    /// Discards all uncommitted versions created by a transaction.
1881    /// (Tiered storage version)
1882    #[cfg(feature = "tiered-storage")]
1883    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1884        // Remove uncommitted node versions
1885        {
1886            let mut versions = self.node_versions.write();
1887            for index in versions.values_mut() {
1888                index.remove_versions_by(tx_id);
1889            }
1890            // Remove completely empty indexes (no versions left)
1891            versions.retain(|_, index| !index.is_empty());
1892        }
1893
1894        // Remove uncommitted edge versions
1895        {
1896            let mut versions = self.edge_versions.write();
1897            for index in versions.values_mut() {
1898                index.remove_versions_by(tx_id);
1899            }
1900            // Remove completely empty indexes (no versions left)
1901            versions.retain(|_, index| !index.is_empty());
1902        }
1903    }
1904
1905    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
1906    ///
1907    /// This is called by the transaction manager when an epoch becomes eligible
1908    /// for freezing (no active transactions can see it). The freeze process:
1909    ///
1910    /// 1. Collects all hot version refs for the epoch
1911    /// 2. Reads the corresponding records from arena
1912    /// 3. Compresses them into a `CompressedEpochBlock`
1913    /// 4. Updates `VersionIndex` entries to point to cold storage
1914    /// 5. The arena can be deallocated after all epochs in it are frozen
1915    ///
1916    /// # Arguments
1917    ///
1918    /// * `epoch` - The epoch to freeze
1919    ///
1920    /// # Returns
1921    ///
1922    /// The number of records frozen (nodes + edges).
1923    #[cfg(feature = "tiered-storage")]
1924    #[allow(unsafe_code)]
1925    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
1926        // Collect node records to freeze
1927        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
1928        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
1929
1930        {
1931            let versions = self.node_versions.read();
1932            for (node_id, index) in versions.iter() {
1933                for hot_ref in index.hot_refs_for_epoch(epoch) {
1934                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1935                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
1936                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1937                    node_records.push((node_id.as_u64(), record.clone()));
1938                    node_hot_refs.push((*node_id, *hot_ref));
1939                }
1940            }
1941        }
1942
1943        // Collect edge records to freeze
1944        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
1945        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
1946
1947        {
1948            let versions = self.edge_versions.read();
1949            for (edge_id, index) in versions.iter() {
1950                for hot_ref in index.hot_refs_for_epoch(epoch) {
1951                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1952                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1953                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1954                    edge_records.push((edge_id.as_u64(), record.clone()));
1955                    edge_hot_refs.push((*edge_id, *hot_ref));
1956                }
1957            }
1958        }
1959
1960        let total_frozen = node_records.len() + edge_records.len();
1961
1962        if total_frozen == 0 {
1963            return 0;
1964        }
1965
1966        // Freeze to compressed storage
1967        let (node_entries, edge_entries) =
1968            self.epoch_store
1969                .freeze_epoch(epoch, node_records, edge_records);
1970
1971        // Build lookup maps for index entries
1972        let node_entry_map: FxHashMap<u64, _> = node_entries
1973            .iter()
1974            .map(|e| (e.entity_id, (e.offset, e.length)))
1975            .collect();
1976        let edge_entry_map: FxHashMap<u64, _> = edge_entries
1977            .iter()
1978            .map(|e| (e.entity_id, (e.offset, e.length)))
1979            .collect();
1980
1981        // Update version indexes to use cold refs
1982        {
1983            let mut versions = self.node_versions.write();
1984            for (node_id, hot_ref) in &node_hot_refs {
1985                if let Some(index) = versions.get_mut(node_id) {
1986                    if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
1987                        let cold_ref = ColdVersionRef {
1988                            epoch,
1989                            block_offset: offset,
1990                            length,
1991                            created_by: hot_ref.created_by,
1992                            deleted_epoch: hot_ref.deleted_epoch,
1993                        };
1994                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
1995                    }
1996                }
1997            }
1998        }
1999
2000        {
2001            let mut versions = self.edge_versions.write();
2002            for (edge_id, hot_ref) in &edge_hot_refs {
2003                if let Some(index) = versions.get_mut(edge_id) {
2004                    if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2005                        let cold_ref = ColdVersionRef {
2006                            epoch,
2007                            block_offset: offset,
2008                            length,
2009                            created_by: hot_ref.created_by,
2010                            deleted_epoch: hot_ref.deleted_epoch,
2011                        };
2012                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2013                    }
2014                }
2015            }
2016        }
2017
2018        total_frozen
2019    }
2020
2021    /// Returns the epoch store for cold storage statistics.
2022    #[cfg(feature = "tiered-storage")]
2023    #[must_use]
2024    pub fn epoch_store(&self) -> &EpochStore {
2025        &self.epoch_store
2026    }
2027
2028    /// Returns the number of distinct labels in the store.
2029    #[must_use]
2030    pub fn label_count(&self) -> usize {
2031        self.id_to_label.read().len()
2032    }
2033
2034    /// Returns the number of distinct property keys in the store.
2035    ///
2036    /// This counts unique property keys across both nodes and edges.
2037    #[must_use]
2038    pub fn property_key_count(&self) -> usize {
2039        let node_keys = self.node_properties.column_count();
2040        let edge_keys = self.edge_properties.column_count();
2041        // Note: This may count some keys twice if the same key is used
2042        // for both nodes and edges. A more precise count would require
2043        // tracking unique keys across both storages.
2044        node_keys + edge_keys
2045    }
2046
2047    /// Returns the number of distinct edge types in the store.
2048    #[must_use]
2049    pub fn edge_type_count(&self) -> usize {
2050        self.id_to_edge_type.read().len()
2051    }
2052
2053    // === Traversal ===
2054
2055    /// Iterates over neighbors of a node in the specified direction.
2056    ///
2057    /// This is the fast path for graph traversal - goes straight to the
2058    /// adjacency index without loading full node data.
2059    pub fn neighbors(
2060        &self,
2061        node: NodeId,
2062        direction: Direction,
2063    ) -> impl Iterator<Item = NodeId> + '_ {
2064        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2065            Direction::Outgoing | Direction::Both => {
2066                Box::new(self.forward_adj.neighbors(node).into_iter())
2067            }
2068            Direction::Incoming => Box::new(std::iter::empty()),
2069        };
2070
2071        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2072            Direction::Incoming | Direction::Both => {
2073                if let Some(ref adj) = self.backward_adj {
2074                    Box::new(adj.neighbors(node).into_iter())
2075                } else {
2076                    Box::new(std::iter::empty())
2077                }
2078            }
2079            Direction::Outgoing => Box::new(std::iter::empty()),
2080        };
2081
2082        forward.chain(backward)
2083    }
2084
2085    /// Returns edges from a node with their targets.
2086    ///
2087    /// Returns an iterator of (target_node, edge_id) pairs.
2088    pub fn edges_from(
2089        &self,
2090        node: NodeId,
2091        direction: Direction,
2092    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2093        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2094            Direction::Outgoing | Direction::Both => {
2095                Box::new(self.forward_adj.edges_from(node).into_iter())
2096            }
2097            Direction::Incoming => Box::new(std::iter::empty()),
2098        };
2099
2100        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2101            Direction::Incoming | Direction::Both => {
2102                if let Some(ref adj) = self.backward_adj {
2103                    Box::new(adj.edges_from(node).into_iter())
2104                } else {
2105                    Box::new(std::iter::empty())
2106                }
2107            }
2108            Direction::Outgoing => Box::new(std::iter::empty()),
2109        };
2110
2111        forward.chain(backward)
2112    }
2113
2114    /// Returns edges to a node (where the node is the destination).
2115    ///
2116    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2117    /// Uses the backward adjacency index for O(degree) lookup.
2118    ///
2119    /// # Example
2120    ///
2121    /// ```ignore
2122    /// // For edges: A->B, C->B
2123    /// let incoming = store.edges_to(B);
2124    /// // Returns: [(A, edge1), (C, edge2)]
2125    /// ```
2126    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2127        if let Some(ref backward) = self.backward_adj {
2128            backward.edges_from(node)
2129        } else {
2130            // Fallback: scan all edges (slow but correct)
2131            self.all_edges()
2132                .filter_map(|edge| {
2133                    if edge.dst == node {
2134                        Some((edge.src, edge.id))
2135                    } else {
2136                        None
2137                    }
2138                })
2139                .collect()
2140        }
2141    }
2142
2143    /// Returns the out-degree of a node (number of outgoing edges).
2144    ///
2145    /// Uses the forward adjacency index for O(1) lookup.
2146    #[must_use]
2147    pub fn out_degree(&self, node: NodeId) -> usize {
2148        self.forward_adj.out_degree(node)
2149    }
2150
2151    /// Returns the in-degree of a node (number of incoming edges).
2152    ///
2153    /// Uses the backward adjacency index for O(1) lookup if available,
2154    /// otherwise falls back to scanning edges.
2155    #[must_use]
2156    pub fn in_degree(&self, node: NodeId) -> usize {
2157        if let Some(ref backward) = self.backward_adj {
2158            backward.in_degree(node)
2159        } else {
2160            // Fallback: count edges (slow)
2161            self.all_edges().filter(|edge| edge.dst == node).count()
2162        }
2163    }
2164
2165    /// Gets the type of an edge by ID.
2166    #[must_use]
2167    #[cfg(not(feature = "tiered-storage"))]
2168    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2169        let edges = self.edges.read();
2170        let chain = edges.get(&id)?;
2171        let epoch = self.current_epoch();
2172        let record = chain.visible_at(epoch)?;
2173        let id_to_type = self.id_to_edge_type.read();
2174        id_to_type.get(record.type_id as usize).cloned()
2175    }
2176
2177    /// Gets the type of an edge by ID.
2178    /// (Tiered storage version)
2179    #[must_use]
2180    #[cfg(feature = "tiered-storage")]
2181    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2182        let versions = self.edge_versions.read();
2183        let index = versions.get(&id)?;
2184        let epoch = self.current_epoch();
2185        let vref = index.visible_at(epoch)?;
2186        let record = self.read_edge_record(&vref)?;
2187        let id_to_type = self.id_to_edge_type.read();
2188        id_to_type.get(record.type_id as usize).cloned()
2189    }
2190
2191    /// Returns all nodes with a specific label.
2192    ///
2193    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2194    /// concurrent modifications won't affect the returned vector. Results are
2195    /// sorted by NodeId for deterministic iteration order.
2196    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2197        let label_to_id = self.label_to_id.read();
2198        if let Some(&label_id) = label_to_id.get(label) {
2199            let index = self.label_index.read();
2200            if let Some(set) = index.get(label_id as usize) {
2201                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2202                ids.sort_unstable();
2203                return ids;
2204            }
2205        }
2206        Vec::new()
2207    }
2208
2209    // === Admin API: Iteration ===
2210
2211    /// Returns an iterator over all nodes in the database.
2212    ///
2213    /// This creates a snapshot of all visible nodes at the current epoch.
2214    /// Useful for dump/export operations.
2215    #[cfg(not(feature = "tiered-storage"))]
2216    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2217        let epoch = self.current_epoch();
2218        let node_ids: Vec<NodeId> = self
2219            .nodes
2220            .read()
2221            .iter()
2222            .filter_map(|(id, chain)| {
2223                chain
2224                    .visible_at(epoch)
2225                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2226            })
2227            .collect();
2228
2229        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2230    }
2231
2232    /// Returns an iterator over all nodes in the database.
2233    /// (Tiered storage version)
2234    #[cfg(feature = "tiered-storage")]
2235    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2236        let node_ids = self.node_ids();
2237        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2238    }
2239
2240    /// Returns an iterator over all edges in the database.
2241    ///
2242    /// This creates a snapshot of all visible edges at the current epoch.
2243    /// Useful for dump/export operations.
2244    #[cfg(not(feature = "tiered-storage"))]
2245    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2246        let epoch = self.current_epoch();
2247        let edge_ids: Vec<EdgeId> = self
2248            .edges
2249            .read()
2250            .iter()
2251            .filter_map(|(id, chain)| {
2252                chain
2253                    .visible_at(epoch)
2254                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2255            })
2256            .collect();
2257
2258        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2259    }
2260
2261    /// Returns an iterator over all edges in the database.
2262    /// (Tiered storage version)
2263    #[cfg(feature = "tiered-storage")]
2264    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2265        let epoch = self.current_epoch();
2266        let versions = self.edge_versions.read();
2267        let edge_ids: Vec<EdgeId> = versions
2268            .iter()
2269            .filter_map(|(id, index)| {
2270                index.visible_at(epoch).and_then(|vref| {
2271                    self.read_edge_record(&vref)
2272                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2273                })
2274            })
2275            .collect();
2276
2277        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2278    }
2279
2280    /// Returns all label names in the database.
2281    pub fn all_labels(&self) -> Vec<String> {
2282        self.id_to_label
2283            .read()
2284            .iter()
2285            .map(|s| s.to_string())
2286            .collect()
2287    }
2288
2289    /// Returns all edge type names in the database.
2290    pub fn all_edge_types(&self) -> Vec<String> {
2291        self.id_to_edge_type
2292            .read()
2293            .iter()
2294            .map(|s| s.to_string())
2295            .collect()
2296    }
2297
2298    /// Returns all property keys used in the database.
2299    pub fn all_property_keys(&self) -> Vec<String> {
2300        let mut keys = std::collections::HashSet::new();
2301        for key in self.node_properties.keys() {
2302            keys.insert(key.to_string());
2303        }
2304        for key in self.edge_properties.keys() {
2305            keys.insert(key.to_string());
2306        }
2307        keys.into_iter().collect()
2308    }
2309
2310    /// Returns an iterator over nodes with a specific label.
2311    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2312        let node_ids = self.nodes_by_label(label);
2313        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2314    }
2315
2316    /// Returns an iterator over edges with a specific type.
2317    #[cfg(not(feature = "tiered-storage"))]
2318    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2319        let epoch = self.current_epoch();
2320        let type_to_id = self.edge_type_to_id.read();
2321
2322        if let Some(&type_id) = type_to_id.get(edge_type) {
2323            let edge_ids: Vec<EdgeId> = self
2324                .edges
2325                .read()
2326                .iter()
2327                .filter_map(|(id, chain)| {
2328                    chain.visible_at(epoch).and_then(|r| {
2329                        if !r.is_deleted() && r.type_id == type_id {
2330                            Some(*id)
2331                        } else {
2332                            None
2333                        }
2334                    })
2335                })
2336                .collect();
2337
2338            // Return a boxed iterator for the found edges
2339            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2340                as Box<dyn Iterator<Item = Edge> + 'a>
2341        } else {
2342            // Return empty iterator
2343            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2344        }
2345    }
2346
2347    /// Returns an iterator over edges with a specific type.
2348    /// (Tiered storage version)
2349    #[cfg(feature = "tiered-storage")]
2350    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2351        let epoch = self.current_epoch();
2352        let type_to_id = self.edge_type_to_id.read();
2353
2354        if let Some(&type_id) = type_to_id.get(edge_type) {
2355            let versions = self.edge_versions.read();
2356            let edge_ids: Vec<EdgeId> = versions
2357                .iter()
2358                .filter_map(|(id, index)| {
2359                    index.visible_at(epoch).and_then(|vref| {
2360                        self.read_edge_record(&vref).and_then(|r| {
2361                            if !r.is_deleted() && r.type_id == type_id {
2362                                Some(*id)
2363                            } else {
2364                                None
2365                            }
2366                        })
2367                    })
2368                })
2369                .collect();
2370
2371            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2372                as Box<dyn Iterator<Item = Edge> + 'a>
2373        } else {
2374            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2375        }
2376    }
2377
2378    // === Zone Map Support ===
2379
2380    /// Checks if a node property predicate might match any nodes.
2381    ///
2382    /// Uses zone maps for early filtering. Returns `true` if there might be
2383    /// matching nodes, `false` if there definitely aren't.
2384    #[must_use]
2385    pub fn node_property_might_match(
2386        &self,
2387        property: &PropertyKey,
2388        op: CompareOp,
2389        value: &Value,
2390    ) -> bool {
2391        self.node_properties.might_match(property, op, value)
2392    }
2393
2394    /// Checks if an edge property predicate might match any edges.
2395    #[must_use]
2396    pub fn edge_property_might_match(
2397        &self,
2398        property: &PropertyKey,
2399        op: CompareOp,
2400        value: &Value,
2401    ) -> bool {
2402        self.edge_properties.might_match(property, op, value)
2403    }
2404
2405    /// Gets the zone map for a node property.
2406    #[must_use]
2407    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2408        self.node_properties.zone_map(property)
2409    }
2410
2411    /// Gets the zone map for an edge property.
2412    #[must_use]
2413    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2414        self.edge_properties.zone_map(property)
2415    }
2416
2417    /// Rebuilds zone maps for all properties.
2418    pub fn rebuild_zone_maps(&self) {
2419        self.node_properties.rebuild_zone_maps();
2420        self.edge_properties.rebuild_zone_maps();
2421    }
2422
2423    // === Statistics ===
2424
2425    /// Returns the current statistics.
2426    #[must_use]
2427    pub fn statistics(&self) -> Statistics {
2428        self.statistics.read().clone()
2429    }
2430
2431    /// Recomputes statistics from current data.
2432    ///
2433    /// Scans all labels and edge types to build cardinality estimates for the
2434    /// query optimizer. Call this periodically or after bulk data loads.
2435    #[cfg(not(feature = "tiered-storage"))]
2436    pub fn compute_statistics(&self) {
2437        let mut stats = Statistics::new();
2438
2439        // Compute total counts
2440        stats.total_nodes = self.node_count() as u64;
2441        stats.total_edges = self.edge_count() as u64;
2442
2443        // Compute per-label statistics
2444        let id_to_label = self.id_to_label.read();
2445        let label_index = self.label_index.read();
2446
2447        for (label_id, label_name) in id_to_label.iter().enumerate() {
2448            let node_count = label_index
2449                .get(label_id)
2450                .map(|set| set.len() as u64)
2451                .unwrap_or(0);
2452
2453            if node_count > 0 {
2454                // Estimate average degree
2455                let avg_out_degree = if stats.total_nodes > 0 {
2456                    stats.total_edges as f64 / stats.total_nodes as f64
2457                } else {
2458                    0.0
2459                };
2460
2461                let label_stats =
2462                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2463
2464                stats.update_label(label_name.as_ref(), label_stats);
2465            }
2466        }
2467
2468        // Compute per-edge-type statistics
2469        let id_to_edge_type = self.id_to_edge_type.read();
2470        let edges = self.edges.read();
2471        let epoch = self.current_epoch();
2472
2473        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2474        for chain in edges.values() {
2475            if let Some(record) = chain.visible_at(epoch) {
2476                if !record.is_deleted() {
2477                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2478                }
2479            }
2480        }
2481
2482        for (type_id, count) in edge_type_counts {
2483            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2484                let avg_degree = if stats.total_nodes > 0 {
2485                    count as f64 / stats.total_nodes as f64
2486                } else {
2487                    0.0
2488                };
2489
2490                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2491                stats.update_edge_type(type_name.as_ref(), edge_stats);
2492            }
2493        }
2494
2495        *self.statistics.write() = stats;
2496    }
2497
2498    /// Recomputes statistics from current data.
2499    /// (Tiered storage version)
2500    #[cfg(feature = "tiered-storage")]
2501    pub fn compute_statistics(&self) {
2502        let mut stats = Statistics::new();
2503
2504        // Compute total counts
2505        stats.total_nodes = self.node_count() as u64;
2506        stats.total_edges = self.edge_count() as u64;
2507
2508        // Compute per-label statistics
2509        let id_to_label = self.id_to_label.read();
2510        let label_index = self.label_index.read();
2511
2512        for (label_id, label_name) in id_to_label.iter().enumerate() {
2513            let node_count = label_index
2514                .get(label_id)
2515                .map(|set| set.len() as u64)
2516                .unwrap_or(0);
2517
2518            if node_count > 0 {
2519                let avg_out_degree = if stats.total_nodes > 0 {
2520                    stats.total_edges as f64 / stats.total_nodes as f64
2521                } else {
2522                    0.0
2523                };
2524
2525                let label_stats =
2526                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2527
2528                stats.update_label(label_name.as_ref(), label_stats);
2529            }
2530        }
2531
2532        // Compute per-edge-type statistics
2533        let id_to_edge_type = self.id_to_edge_type.read();
2534        let versions = self.edge_versions.read();
2535        let epoch = self.current_epoch();
2536
2537        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2538        for index in versions.values() {
2539            if let Some(vref) = index.visible_at(epoch) {
2540                if let Some(record) = self.read_edge_record(&vref) {
2541                    if !record.is_deleted() {
2542                        *edge_type_counts.entry(record.type_id).or_default() += 1;
2543                    }
2544                }
2545            }
2546        }
2547
2548        for (type_id, count) in edge_type_counts {
2549            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2550                let avg_degree = if stats.total_nodes > 0 {
2551                    count as f64 / stats.total_nodes as f64
2552                } else {
2553                    0.0
2554                };
2555
2556                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2557                stats.update_edge_type(type_name.as_ref(), edge_stats);
2558            }
2559        }
2560
2561        *self.statistics.write() = stats;
2562    }
2563
2564    /// Estimates cardinality for a label scan.
2565    #[must_use]
2566    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2567        self.statistics.read().estimate_label_cardinality(label)
2568    }
2569
2570    /// Estimates average degree for an edge type.
2571    #[must_use]
2572    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2573        self.statistics
2574            .read()
2575            .estimate_avg_degree(edge_type, outgoing)
2576    }
2577
2578    // === Internal Helpers ===
2579
2580    fn get_or_create_label_id(&self, label: &str) -> u32 {
2581        {
2582            let label_to_id = self.label_to_id.read();
2583            if let Some(&id) = label_to_id.get(label) {
2584                return id;
2585            }
2586        }
2587
2588        let mut label_to_id = self.label_to_id.write();
2589        let mut id_to_label = self.id_to_label.write();
2590
2591        // Double-check after acquiring write lock
2592        if let Some(&id) = label_to_id.get(label) {
2593            return id;
2594        }
2595
2596        let id = id_to_label.len() as u32;
2597
2598        let label: Arc<str> = label.into();
2599        label_to_id.insert(label.clone(), id);
2600        id_to_label.push(label);
2601
2602        id
2603    }
2604
2605    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2606        {
2607            let type_to_id = self.edge_type_to_id.read();
2608            if let Some(&id) = type_to_id.get(edge_type) {
2609                return id;
2610            }
2611        }
2612
2613        let mut type_to_id = self.edge_type_to_id.write();
2614        let mut id_to_type = self.id_to_edge_type.write();
2615
2616        // Double-check
2617        if let Some(&id) = type_to_id.get(edge_type) {
2618            return id;
2619        }
2620
2621        let id = id_to_type.len() as u32;
2622        let edge_type: Arc<str> = edge_type.into();
2623        type_to_id.insert(edge_type.clone(), id);
2624        id_to_type.push(edge_type);
2625
2626        id
2627    }
2628
2629    // === Recovery Support ===
2630
2631    /// Creates a node with a specific ID during recovery.
2632    ///
2633    /// This is used for WAL recovery to restore nodes with their original IDs.
2634    /// The caller must ensure IDs don't conflict with existing nodes.
2635    #[cfg(not(feature = "tiered-storage"))]
2636    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2637        let epoch = self.current_epoch();
2638        let mut record = NodeRecord::new(id, epoch);
2639        record.set_label_count(labels.len() as u16);
2640
2641        // Store labels in node_labels map and label_index
2642        let mut node_label_set = FxHashSet::default();
2643        for label in labels {
2644            let label_id = self.get_or_create_label_id(*label);
2645            node_label_set.insert(label_id);
2646
2647            // Update label index
2648            let mut index = self.label_index.write();
2649            while index.len() <= label_id as usize {
2650                index.push(FxHashMap::default());
2651            }
2652            index[label_id as usize].insert(id, ());
2653        }
2654
2655        // Store node's labels
2656        self.node_labels.write().insert(id, node_label_set);
2657
2658        // Create version chain with initial version (using SYSTEM tx for recovery)
2659        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2660        self.nodes.write().insert(id, chain);
2661
2662        // Update next_node_id if necessary to avoid future collisions
2663        let id_val = id.as_u64();
2664        let _ = self
2665            .next_node_id
2666            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2667                if id_val >= current {
2668                    Some(id_val + 1)
2669                } else {
2670                    None
2671                }
2672            });
2673    }
2674
2675    /// Creates a node with a specific ID during recovery.
2676    /// (Tiered storage version)
2677    #[cfg(feature = "tiered-storage")]
2678    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2679        let epoch = self.current_epoch();
2680        let mut record = NodeRecord::new(id, epoch);
2681        record.set_label_count(labels.len() as u16);
2682
2683        // Store labels in node_labels map and label_index
2684        let mut node_label_set = FxHashSet::default();
2685        for label in labels {
2686            let label_id = self.get_or_create_label_id(*label);
2687            node_label_set.insert(label_id);
2688
2689            // Update label index
2690            let mut index = self.label_index.write();
2691            while index.len() <= label_id as usize {
2692                index.push(FxHashMap::default());
2693            }
2694            index[label_id as usize].insert(id, ());
2695        }
2696
2697        // Store node's labels
2698        self.node_labels.write().insert(id, node_label_set);
2699
2700        // Allocate record in arena and get offset (create epoch if needed)
2701        let arena = self.arena_allocator.arena_or_create(epoch);
2702        let (offset, _stored) = arena.alloc_value_with_offset(record);
2703
2704        // Create HotVersionRef (using SYSTEM tx for recovery)
2705        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2706        let mut versions = self.node_versions.write();
2707        versions.insert(id, VersionIndex::with_initial(hot_ref));
2708
2709        // Update next_node_id if necessary to avoid future collisions
2710        let id_val = id.as_u64();
2711        let _ = self
2712            .next_node_id
2713            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2714                if id_val >= current {
2715                    Some(id_val + 1)
2716                } else {
2717                    None
2718                }
2719            });
2720    }
2721
2722    /// Creates an edge with a specific ID during recovery.
2723    ///
2724    /// This is used for WAL recovery to restore edges with their original IDs.
2725    #[cfg(not(feature = "tiered-storage"))]
2726    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2727        let epoch = self.current_epoch();
2728        let type_id = self.get_or_create_edge_type_id(edge_type);
2729
2730        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2731        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2732        self.edges.write().insert(id, chain);
2733
2734        // Update adjacency
2735        self.forward_adj.add_edge(src, dst, id);
2736        if let Some(ref backward) = self.backward_adj {
2737            backward.add_edge(dst, src, id);
2738        }
2739
2740        // Update next_edge_id if necessary
2741        let id_val = id.as_u64();
2742        let _ = self
2743            .next_edge_id
2744            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2745                if id_val >= current {
2746                    Some(id_val + 1)
2747                } else {
2748                    None
2749                }
2750            });
2751    }
2752
2753    /// Creates an edge with a specific ID during recovery.
2754    /// (Tiered storage version)
2755    #[cfg(feature = "tiered-storage")]
2756    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2757        let epoch = self.current_epoch();
2758        let type_id = self.get_or_create_edge_type_id(edge_type);
2759
2760        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2761
2762        // Allocate record in arena and get offset (create epoch if needed)
2763        let arena = self.arena_allocator.arena_or_create(epoch);
2764        let (offset, _stored) = arena.alloc_value_with_offset(record);
2765
2766        // Create HotVersionRef (using SYSTEM tx for recovery)
2767        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2768        let mut versions = self.edge_versions.write();
2769        versions.insert(id, VersionIndex::with_initial(hot_ref));
2770
2771        // Update adjacency
2772        self.forward_adj.add_edge(src, dst, id);
2773        if let Some(ref backward) = self.backward_adj {
2774            backward.add_edge(dst, src, id);
2775        }
2776
2777        // Update next_edge_id if necessary
2778        let id_val = id.as_u64();
2779        let _ = self
2780            .next_edge_id
2781            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2782                if id_val >= current {
2783                    Some(id_val + 1)
2784                } else {
2785                    None
2786                }
2787            });
2788    }
2789
2790    /// Sets the current epoch during recovery.
2791    pub fn set_epoch(&self, epoch: EpochId) {
2792        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2793    }
2794}
2795
2796impl Default for LpgStore {
2797    fn default() -> Self {
2798        Self::new()
2799    }
2800}
2801
2802#[cfg(test)]
2803mod tests {
2804    use super::*;
2805
2806    #[test]
2807    fn test_create_node() {
2808        let store = LpgStore::new();
2809
2810        let id = store.create_node(&["Person"]);
2811        assert!(id.is_valid());
2812
2813        let node = store.get_node(id).unwrap();
2814        assert!(node.has_label("Person"));
2815        assert!(!node.has_label("Animal"));
2816    }
2817
2818    #[test]
2819    fn test_create_node_with_props() {
2820        let store = LpgStore::new();
2821
2822        let id = store.create_node_with_props(
2823            &["Person"],
2824            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2825        );
2826
2827        let node = store.get_node(id).unwrap();
2828        assert_eq!(
2829            node.get_property("name").and_then(|v| v.as_str()),
2830            Some("Alice")
2831        );
2832        assert_eq!(
2833            node.get_property("age").and_then(|v| v.as_int64()),
2834            Some(30)
2835        );
2836    }
2837
2838    #[test]
2839    fn test_delete_node() {
2840        let store = LpgStore::new();
2841
2842        let id = store.create_node(&["Person"]);
2843        assert_eq!(store.node_count(), 1);
2844
2845        assert!(store.delete_node(id));
2846        assert_eq!(store.node_count(), 0);
2847        assert!(store.get_node(id).is_none());
2848
2849        // Double delete should return false
2850        assert!(!store.delete_node(id));
2851    }
2852
2853    #[test]
2854    fn test_create_edge() {
2855        let store = LpgStore::new();
2856
2857        let alice = store.create_node(&["Person"]);
2858        let bob = store.create_node(&["Person"]);
2859
2860        let edge_id = store.create_edge(alice, bob, "KNOWS");
2861        assert!(edge_id.is_valid());
2862
2863        let edge = store.get_edge(edge_id).unwrap();
2864        assert_eq!(edge.src, alice);
2865        assert_eq!(edge.dst, bob);
2866        assert_eq!(edge.edge_type.as_ref(), "KNOWS");
2867    }
2868
2869    #[test]
2870    fn test_neighbors() {
2871        let store = LpgStore::new();
2872
2873        let a = store.create_node(&["Person"]);
2874        let b = store.create_node(&["Person"]);
2875        let c = store.create_node(&["Person"]);
2876
2877        store.create_edge(a, b, "KNOWS");
2878        store.create_edge(a, c, "KNOWS");
2879
2880        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2881        assert_eq!(outgoing.len(), 2);
2882        assert!(outgoing.contains(&b));
2883        assert!(outgoing.contains(&c));
2884
2885        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2886        assert_eq!(incoming.len(), 1);
2887        assert!(incoming.contains(&a));
2888    }
2889
2890    #[test]
2891    fn test_nodes_by_label() {
2892        let store = LpgStore::new();
2893
2894        let p1 = store.create_node(&["Person"]);
2895        let p2 = store.create_node(&["Person"]);
2896        let _a = store.create_node(&["Animal"]);
2897
2898        let persons = store.nodes_by_label("Person");
2899        assert_eq!(persons.len(), 2);
2900        assert!(persons.contains(&p1));
2901        assert!(persons.contains(&p2));
2902
2903        let animals = store.nodes_by_label("Animal");
2904        assert_eq!(animals.len(), 1);
2905    }
2906
2907    #[test]
2908    fn test_delete_edge() {
2909        let store = LpgStore::new();
2910
2911        let a = store.create_node(&["Person"]);
2912        let b = store.create_node(&["Person"]);
2913        let edge_id = store.create_edge(a, b, "KNOWS");
2914
2915        assert_eq!(store.edge_count(), 1);
2916
2917        assert!(store.delete_edge(edge_id));
2918        assert_eq!(store.edge_count(), 0);
2919        assert!(store.get_edge(edge_id).is_none());
2920    }
2921
2922    // === New tests for improved coverage ===
2923
2924    #[test]
2925    fn test_lpg_store_config() {
2926        // Test with_config
2927        let config = LpgStoreConfig {
2928            backward_edges: false,
2929            initial_node_capacity: 100,
2930            initial_edge_capacity: 200,
2931        };
2932        let store = LpgStore::with_config(config);
2933
2934        // Store should work but without backward adjacency
2935        let a = store.create_node(&["Person"]);
2936        let b = store.create_node(&["Person"]);
2937        store.create_edge(a, b, "KNOWS");
2938
2939        // Outgoing should work
2940        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2941        assert_eq!(outgoing.len(), 1);
2942
2943        // Incoming should be empty (no backward adjacency)
2944        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2945        assert_eq!(incoming.len(), 0);
2946    }
2947
2948    #[test]
2949    fn test_epoch_management() {
2950        let store = LpgStore::new();
2951
2952        let epoch0 = store.current_epoch();
2953        assert_eq!(epoch0.as_u64(), 0);
2954
2955        let epoch1 = store.new_epoch();
2956        assert_eq!(epoch1.as_u64(), 1);
2957
2958        let current = store.current_epoch();
2959        assert_eq!(current.as_u64(), 1);
2960    }
2961
2962    #[test]
2963    fn test_node_properties() {
2964        let store = LpgStore::new();
2965        let id = store.create_node(&["Person"]);
2966
2967        // Set and get property
2968        store.set_node_property(id, "name", Value::from("Alice"));
2969        let name = store.get_node_property(id, &"name".into());
2970        assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Alice"));
2971
2972        // Update property
2973        store.set_node_property(id, "name", Value::from("Bob"));
2974        let name = store.get_node_property(id, &"name".into());
2975        assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Bob"));
2976
2977        // Remove property
2978        let old = store.remove_node_property(id, "name");
2979        assert!(matches!(old, Some(Value::String(s)) if s.as_ref() == "Bob"));
2980
2981        // Property should be gone
2982        let name = store.get_node_property(id, &"name".into());
2983        assert!(name.is_none());
2984
2985        // Remove non-existent property
2986        let none = store.remove_node_property(id, "nonexistent");
2987        assert!(none.is_none());
2988    }
2989
2990    #[test]
2991    fn test_edge_properties() {
2992        let store = LpgStore::new();
2993        let a = store.create_node(&["Person"]);
2994        let b = store.create_node(&["Person"]);
2995        let edge_id = store.create_edge(a, b, "KNOWS");
2996
2997        // Set and get property
2998        store.set_edge_property(edge_id, "since", Value::from(2020i64));
2999        let since = store.get_edge_property(edge_id, &"since".into());
3000        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3001
3002        // Remove property
3003        let old = store.remove_edge_property(edge_id, "since");
3004        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3005
3006        let since = store.get_edge_property(edge_id, &"since".into());
3007        assert!(since.is_none());
3008    }
3009
3010    #[test]
3011    fn test_add_remove_label() {
3012        let store = LpgStore::new();
3013        let id = store.create_node(&["Person"]);
3014
3015        // Add new label
3016        assert!(store.add_label(id, "Employee"));
3017
3018        let node = store.get_node(id).unwrap();
3019        assert!(node.has_label("Person"));
3020        assert!(node.has_label("Employee"));
3021
3022        // Adding same label again should fail
3023        assert!(!store.add_label(id, "Employee"));
3024
3025        // Remove label
3026        assert!(store.remove_label(id, "Employee"));
3027
3028        let node = store.get_node(id).unwrap();
3029        assert!(node.has_label("Person"));
3030        assert!(!node.has_label("Employee"));
3031
3032        // Removing non-existent label should fail
3033        assert!(!store.remove_label(id, "Employee"));
3034        assert!(!store.remove_label(id, "NonExistent"));
3035    }
3036
3037    #[test]
3038    fn test_add_label_to_nonexistent_node() {
3039        let store = LpgStore::new();
3040        let fake_id = NodeId::new(999);
3041        assert!(!store.add_label(fake_id, "Label"));
3042    }
3043
3044    #[test]
3045    fn test_remove_label_from_nonexistent_node() {
3046        let store = LpgStore::new();
3047        let fake_id = NodeId::new(999);
3048        assert!(!store.remove_label(fake_id, "Label"));
3049    }
3050
3051    #[test]
3052    fn test_node_ids() {
3053        let store = LpgStore::new();
3054
3055        let n1 = store.create_node(&["Person"]);
3056        let n2 = store.create_node(&["Person"]);
3057        let n3 = store.create_node(&["Person"]);
3058
3059        let ids = store.node_ids();
3060        assert_eq!(ids.len(), 3);
3061        assert!(ids.contains(&n1));
3062        assert!(ids.contains(&n2));
3063        assert!(ids.contains(&n3));
3064
3065        // Delete one
3066        store.delete_node(n2);
3067        let ids = store.node_ids();
3068        assert_eq!(ids.len(), 2);
3069        assert!(!ids.contains(&n2));
3070    }
3071
3072    #[test]
3073    fn test_delete_node_nonexistent() {
3074        let store = LpgStore::new();
3075        let fake_id = NodeId::new(999);
3076        assert!(!store.delete_node(fake_id));
3077    }
3078
3079    #[test]
3080    fn test_delete_edge_nonexistent() {
3081        let store = LpgStore::new();
3082        let fake_id = EdgeId::new(999);
3083        assert!(!store.delete_edge(fake_id));
3084    }
3085
3086    #[test]
3087    fn test_delete_edge_double() {
3088        let store = LpgStore::new();
3089        let a = store.create_node(&["Person"]);
3090        let b = store.create_node(&["Person"]);
3091        let edge_id = store.create_edge(a, b, "KNOWS");
3092
3093        assert!(store.delete_edge(edge_id));
3094        assert!(!store.delete_edge(edge_id)); // Double delete
3095    }
3096
3097    #[test]
3098    fn test_create_edge_with_props() {
3099        let store = LpgStore::new();
3100        let a = store.create_node(&["Person"]);
3101        let b = store.create_node(&["Person"]);
3102
3103        let edge_id = store.create_edge_with_props(
3104            a,
3105            b,
3106            "KNOWS",
3107            [
3108                ("since", Value::from(2020i64)),
3109                ("weight", Value::from(1.0)),
3110            ],
3111        );
3112
3113        let edge = store.get_edge(edge_id).unwrap();
3114        assert_eq!(
3115            edge.get_property("since").and_then(|v| v.as_int64()),
3116            Some(2020)
3117        );
3118        assert_eq!(
3119            edge.get_property("weight").and_then(|v| v.as_float64()),
3120            Some(1.0)
3121        );
3122    }
3123
3124    #[test]
3125    fn test_delete_node_edges() {
3126        let store = LpgStore::new();
3127
3128        let a = store.create_node(&["Person"]);
3129        let b = store.create_node(&["Person"]);
3130        let c = store.create_node(&["Person"]);
3131
3132        store.create_edge(a, b, "KNOWS"); // a -> b
3133        store.create_edge(c, a, "KNOWS"); // c -> a
3134
3135        assert_eq!(store.edge_count(), 2);
3136
3137        // Delete all edges connected to a
3138        store.delete_node_edges(a);
3139
3140        assert_eq!(store.edge_count(), 0);
3141    }
3142
3143    #[test]
3144    fn test_neighbors_both_directions() {
3145        let store = LpgStore::new();
3146
3147        let a = store.create_node(&["Person"]);
3148        let b = store.create_node(&["Person"]);
3149        let c = store.create_node(&["Person"]);
3150
3151        store.create_edge(a, b, "KNOWS"); // a -> b
3152        store.create_edge(c, a, "KNOWS"); // c -> a
3153
3154        // Direction::Both for node a
3155        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3156        assert_eq!(neighbors.len(), 2);
3157        assert!(neighbors.contains(&b)); // outgoing
3158        assert!(neighbors.contains(&c)); // incoming
3159    }
3160
3161    #[test]
3162    fn test_edges_from() {
3163        let store = LpgStore::new();
3164
3165        let a = store.create_node(&["Person"]);
3166        let b = store.create_node(&["Person"]);
3167        let c = store.create_node(&["Person"]);
3168
3169        let e1 = store.create_edge(a, b, "KNOWS");
3170        let e2 = store.create_edge(a, c, "KNOWS");
3171
3172        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3173        assert_eq!(edges.len(), 2);
3174        assert!(edges.iter().any(|(_, e)| *e == e1));
3175        assert!(edges.iter().any(|(_, e)| *e == e2));
3176
3177        // Incoming edges to b
3178        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3179        assert_eq!(incoming.len(), 1);
3180        assert_eq!(incoming[0].1, e1);
3181    }
3182
3183    #[test]
3184    fn test_edges_to() {
3185        let store = LpgStore::new();
3186
3187        let a = store.create_node(&["Person"]);
3188        let b = store.create_node(&["Person"]);
3189        let c = store.create_node(&["Person"]);
3190
3191        let e1 = store.create_edge(a, b, "KNOWS");
3192        let e2 = store.create_edge(c, b, "KNOWS");
3193
3194        // Edges pointing TO b
3195        let to_b = store.edges_to(b);
3196        assert_eq!(to_b.len(), 2);
3197        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3198        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3199    }
3200
3201    #[test]
3202    fn test_out_degree_in_degree() {
3203        let store = LpgStore::new();
3204
3205        let a = store.create_node(&["Person"]);
3206        let b = store.create_node(&["Person"]);
3207        let c = store.create_node(&["Person"]);
3208
3209        store.create_edge(a, b, "KNOWS");
3210        store.create_edge(a, c, "KNOWS");
3211        store.create_edge(c, b, "KNOWS");
3212
3213        assert_eq!(store.out_degree(a), 2);
3214        assert_eq!(store.out_degree(b), 0);
3215        assert_eq!(store.out_degree(c), 1);
3216
3217        assert_eq!(store.in_degree(a), 0);
3218        assert_eq!(store.in_degree(b), 2);
3219        assert_eq!(store.in_degree(c), 1);
3220    }
3221
3222    #[test]
3223    fn test_edge_type() {
3224        let store = LpgStore::new();
3225
3226        let a = store.create_node(&["Person"]);
3227        let b = store.create_node(&["Person"]);
3228        let edge_id = store.create_edge(a, b, "KNOWS");
3229
3230        let edge_type = store.edge_type(edge_id);
3231        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3232
3233        // Non-existent edge
3234        let fake_id = EdgeId::new(999);
3235        assert!(store.edge_type(fake_id).is_none());
3236    }
3237
3238    #[test]
3239    fn test_count_methods() {
3240        let store = LpgStore::new();
3241
3242        assert_eq!(store.label_count(), 0);
3243        assert_eq!(store.edge_type_count(), 0);
3244        assert_eq!(store.property_key_count(), 0);
3245
3246        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3247        let b = store.create_node(&["Company"]);
3248        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3249
3250        assert_eq!(store.label_count(), 2); // Person, Company
3251        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3252        assert_eq!(store.property_key_count(), 2); // age, since
3253    }
3254
3255    #[test]
3256    fn test_all_nodes_and_edges() {
3257        let store = LpgStore::new();
3258
3259        let a = store.create_node(&["Person"]);
3260        let b = store.create_node(&["Person"]);
3261        store.create_edge(a, b, "KNOWS");
3262
3263        let nodes: Vec<_> = store.all_nodes().collect();
3264        assert_eq!(nodes.len(), 2);
3265
3266        let edges: Vec<_> = store.all_edges().collect();
3267        assert_eq!(edges.len(), 1);
3268    }
3269
3270    #[test]
3271    fn test_all_labels_and_edge_types() {
3272        let store = LpgStore::new();
3273
3274        store.create_node(&["Person"]);
3275        store.create_node(&["Company"]);
3276        let a = store.create_node(&["Animal"]);
3277        let b = store.create_node(&["Animal"]);
3278        store.create_edge(a, b, "EATS");
3279
3280        let labels = store.all_labels();
3281        assert_eq!(labels.len(), 3);
3282        assert!(labels.contains(&"Person".to_string()));
3283        assert!(labels.contains(&"Company".to_string()));
3284        assert!(labels.contains(&"Animal".to_string()));
3285
3286        let edge_types = store.all_edge_types();
3287        assert_eq!(edge_types.len(), 1);
3288        assert!(edge_types.contains(&"EATS".to_string()));
3289    }
3290
3291    #[test]
3292    fn test_all_property_keys() {
3293        let store = LpgStore::new();
3294
3295        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3296        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3297        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3298
3299        let keys = store.all_property_keys();
3300        assert!(keys.contains(&"name".to_string()));
3301        assert!(keys.contains(&"age".to_string()));
3302        assert!(keys.contains(&"since".to_string()));
3303    }
3304
3305    #[test]
3306    fn test_nodes_with_label() {
3307        let store = LpgStore::new();
3308
3309        store.create_node(&["Person"]);
3310        store.create_node(&["Person"]);
3311        store.create_node(&["Company"]);
3312
3313        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3314        assert_eq!(persons.len(), 2);
3315
3316        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3317        assert_eq!(companies.len(), 1);
3318
3319        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3320        assert_eq!(none.len(), 0);
3321    }
3322
3323    #[test]
3324    fn test_edges_with_type() {
3325        let store = LpgStore::new();
3326
3327        let a = store.create_node(&["Person"]);
3328        let b = store.create_node(&["Person"]);
3329        let c = store.create_node(&["Company"]);
3330
3331        store.create_edge(a, b, "KNOWS");
3332        store.create_edge(a, c, "WORKS_AT");
3333
3334        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3335        assert_eq!(knows.len(), 1);
3336
3337        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3338        assert_eq!(works_at.len(), 1);
3339
3340        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3341        assert_eq!(none.len(), 0);
3342    }
3343
3344    #[test]
3345    fn test_nodes_by_label_nonexistent() {
3346        let store = LpgStore::new();
3347        store.create_node(&["Person"]);
3348
3349        let empty = store.nodes_by_label("NonExistent");
3350        assert!(empty.is_empty());
3351    }
3352
3353    #[test]
3354    fn test_statistics() {
3355        let store = LpgStore::new();
3356
3357        let a = store.create_node(&["Person"]);
3358        let b = store.create_node(&["Person"]);
3359        let c = store.create_node(&["Company"]);
3360
3361        store.create_edge(a, b, "KNOWS");
3362        store.create_edge(a, c, "WORKS_AT");
3363
3364        store.compute_statistics();
3365        let stats = store.statistics();
3366
3367        assert_eq!(stats.total_nodes, 3);
3368        assert_eq!(stats.total_edges, 2);
3369
3370        // Estimates
3371        let person_card = store.estimate_label_cardinality("Person");
3372        assert!(person_card > 0.0);
3373
3374        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3375        assert!(avg_degree >= 0.0);
3376    }
3377
3378    #[test]
3379    fn test_zone_maps() {
3380        let store = LpgStore::new();
3381
3382        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3383        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3384
3385        // Zone map should indicate possible matches (30 is within [25, 35] range)
3386        let might_match =
3387            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3388        // Zone maps return true conservatively when value is within min/max range
3389        assert!(might_match);
3390
3391        let zone = store.node_property_zone_map(&"age".into());
3392        assert!(zone.is_some());
3393
3394        // Non-existent property
3395        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3396        assert!(no_zone.is_none());
3397
3398        // Edge zone maps
3399        let a = store.create_node(&["A"]);
3400        let b = store.create_node(&["B"]);
3401        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3402
3403        let edge_zone = store.edge_property_zone_map(&"weight".into());
3404        assert!(edge_zone.is_some());
3405    }
3406
3407    #[test]
3408    fn test_rebuild_zone_maps() {
3409        let store = LpgStore::new();
3410        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3411
3412        // Should not panic
3413        store.rebuild_zone_maps();
3414    }
3415
3416    #[test]
3417    fn test_create_node_with_id() {
3418        let store = LpgStore::new();
3419
3420        let specific_id = NodeId::new(100);
3421        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3422
3423        let node = store.get_node(specific_id).unwrap();
3424        assert!(node.has_label("Person"));
3425        assert!(node.has_label("Employee"));
3426
3427        // Next auto-generated ID should be > 100
3428        let next = store.create_node(&["Other"]);
3429        assert!(next.as_u64() > 100);
3430    }
3431
3432    #[test]
3433    fn test_create_edge_with_id() {
3434        let store = LpgStore::new();
3435
3436        let a = store.create_node(&["A"]);
3437        let b = store.create_node(&["B"]);
3438
3439        let specific_id = EdgeId::new(500);
3440        store.create_edge_with_id(specific_id, a, b, "REL");
3441
3442        let edge = store.get_edge(specific_id).unwrap();
3443        assert_eq!(edge.src, a);
3444        assert_eq!(edge.dst, b);
3445        assert_eq!(edge.edge_type.as_ref(), "REL");
3446
3447        // Next auto-generated ID should be > 500
3448        let next = store.create_edge(a, b, "OTHER");
3449        assert!(next.as_u64() > 500);
3450    }
3451
3452    #[test]
3453    fn test_set_epoch() {
3454        let store = LpgStore::new();
3455
3456        assert_eq!(store.current_epoch().as_u64(), 0);
3457
3458        store.set_epoch(EpochId::new(42));
3459        assert_eq!(store.current_epoch().as_u64(), 42);
3460    }
3461
3462    #[test]
3463    fn test_get_node_nonexistent() {
3464        let store = LpgStore::new();
3465        let fake_id = NodeId::new(999);
3466        assert!(store.get_node(fake_id).is_none());
3467    }
3468
3469    #[test]
3470    fn test_get_edge_nonexistent() {
3471        let store = LpgStore::new();
3472        let fake_id = EdgeId::new(999);
3473        assert!(store.get_edge(fake_id).is_none());
3474    }
3475
3476    #[test]
3477    fn test_multiple_labels() {
3478        let store = LpgStore::new();
3479
3480        let id = store.create_node(&["Person", "Employee", "Manager"]);
3481        let node = store.get_node(id).unwrap();
3482
3483        assert!(node.has_label("Person"));
3484        assert!(node.has_label("Employee"));
3485        assert!(node.has_label("Manager"));
3486        assert!(!node.has_label("Other"));
3487    }
3488
3489    #[test]
3490    fn test_default_impl() {
3491        let store: LpgStore = Default::default();
3492        assert_eq!(store.node_count(), 0);
3493        assert_eq!(store.edge_count(), 0);
3494    }
3495
3496    #[test]
3497    fn test_edges_from_both_directions() {
3498        let store = LpgStore::new();
3499
3500        let a = store.create_node(&["A"]);
3501        let b = store.create_node(&["B"]);
3502        let c = store.create_node(&["C"]);
3503
3504        let e1 = store.create_edge(a, b, "R1"); // a -> b
3505        let e2 = store.create_edge(c, a, "R2"); // c -> a
3506
3507        // Both directions from a
3508        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3509        assert_eq!(edges.len(), 2);
3510        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3511        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3512    }
3513
3514    #[test]
3515    fn test_no_backward_adj_in_degree() {
3516        let config = LpgStoreConfig {
3517            backward_edges: false,
3518            initial_node_capacity: 10,
3519            initial_edge_capacity: 10,
3520        };
3521        let store = LpgStore::with_config(config);
3522
3523        let a = store.create_node(&["A"]);
3524        let b = store.create_node(&["B"]);
3525        store.create_edge(a, b, "R");
3526
3527        // in_degree should still work (falls back to scanning)
3528        let degree = store.in_degree(b);
3529        assert_eq!(degree, 1);
3530    }
3531
3532    #[test]
3533    fn test_no_backward_adj_edges_to() {
3534        let config = LpgStoreConfig {
3535            backward_edges: false,
3536            initial_node_capacity: 10,
3537            initial_edge_capacity: 10,
3538        };
3539        let store = LpgStore::with_config(config);
3540
3541        let a = store.create_node(&["A"]);
3542        let b = store.create_node(&["B"]);
3543        let e = store.create_edge(a, b, "R");
3544
3545        // edges_to should still work (falls back to scanning)
3546        let edges = store.edges_to(b);
3547        assert_eq!(edges.len(), 1);
3548        assert_eq!(edges[0].1, e);
3549    }
3550
3551    #[test]
3552    fn test_node_versioned_creation() {
3553        let store = LpgStore::new();
3554
3555        let epoch = store.new_epoch();
3556        let tx_id = TxId::new(1);
3557
3558        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3559        assert!(store.get_node(id).is_some());
3560    }
3561
3562    #[test]
3563    fn test_edge_versioned_creation() {
3564        let store = LpgStore::new();
3565
3566        let a = store.create_node(&["A"]);
3567        let b = store.create_node(&["B"]);
3568
3569        let epoch = store.new_epoch();
3570        let tx_id = TxId::new(1);
3571
3572        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3573        assert!(store.get_edge(edge_id).is_some());
3574    }
3575
3576    #[test]
3577    fn test_node_with_props_versioned() {
3578        let store = LpgStore::new();
3579
3580        let epoch = store.new_epoch();
3581        let tx_id = TxId::new(1);
3582
3583        let id = store.create_node_with_props_versioned(
3584            &["Person"],
3585            [("name", Value::from("Alice"))],
3586            epoch,
3587            tx_id,
3588        );
3589
3590        let node = store.get_node(id).unwrap();
3591        assert_eq!(
3592            node.get_property("name").and_then(|v| v.as_str()),
3593            Some("Alice")
3594        );
3595    }
3596
3597    #[test]
3598    fn test_discard_uncommitted_versions() {
3599        let store = LpgStore::new();
3600
3601        let epoch = store.new_epoch();
3602        let tx_id = TxId::new(42);
3603
3604        // Create node with specific tx
3605        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3606        assert!(store.get_node(node_id).is_some());
3607
3608        // Discard uncommitted versions for this tx
3609        store.discard_uncommitted_versions(tx_id);
3610
3611        // Node should be gone (version chain was removed)
3612        assert!(store.get_node(node_id).is_none());
3613    }
3614
3615    // === Property Index Tests ===
3616
3617    #[test]
3618    fn test_property_index_create_and_lookup() {
3619        let store = LpgStore::new();
3620
3621        // Create nodes with properties
3622        let alice = store.create_node(&["Person"]);
3623        let bob = store.create_node(&["Person"]);
3624        let charlie = store.create_node(&["Person"]);
3625
3626        store.set_node_property(alice, "city", Value::from("NYC"));
3627        store.set_node_property(bob, "city", Value::from("NYC"));
3628        store.set_node_property(charlie, "city", Value::from("LA"));
3629
3630        // Before indexing, lookup still works (via scan)
3631        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3632        assert_eq!(nyc_people.len(), 2);
3633
3634        // Create index
3635        store.create_property_index("city");
3636        assert!(store.has_property_index("city"));
3637
3638        // Indexed lookup should return same results
3639        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3640        assert_eq!(nyc_people.len(), 2);
3641        assert!(nyc_people.contains(&alice));
3642        assert!(nyc_people.contains(&bob));
3643
3644        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3645        assert_eq!(la_people.len(), 1);
3646        assert!(la_people.contains(&charlie));
3647    }
3648
3649    #[test]
3650    fn test_property_index_maintained_on_update() {
3651        let store = LpgStore::new();
3652
3653        // Create index first
3654        store.create_property_index("status");
3655
3656        let node = store.create_node(&["Task"]);
3657        store.set_node_property(node, "status", Value::from("pending"));
3658
3659        // Should find by initial value
3660        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3661        assert_eq!(pending.len(), 1);
3662        assert!(pending.contains(&node));
3663
3664        // Update the property
3665        store.set_node_property(node, "status", Value::from("done"));
3666
3667        // Old value should not find it
3668        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3669        assert!(pending.is_empty());
3670
3671        // New value should find it
3672        let done = store.find_nodes_by_property("status", &Value::from("done"));
3673        assert_eq!(done.len(), 1);
3674        assert!(done.contains(&node));
3675    }
3676
3677    #[test]
3678    fn test_property_index_maintained_on_remove() {
3679        let store = LpgStore::new();
3680
3681        store.create_property_index("tag");
3682
3683        let node = store.create_node(&["Item"]);
3684        store.set_node_property(node, "tag", Value::from("important"));
3685
3686        // Should find it
3687        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3688        assert_eq!(found.len(), 1);
3689
3690        // Remove the property
3691        store.remove_node_property(node, "tag");
3692
3693        // Should no longer find it
3694        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3695        assert!(found.is_empty());
3696    }
3697
3698    #[test]
3699    fn test_property_index_drop() {
3700        let store = LpgStore::new();
3701
3702        store.create_property_index("key");
3703        assert!(store.has_property_index("key"));
3704
3705        assert!(store.drop_property_index("key"));
3706        assert!(!store.has_property_index("key"));
3707
3708        // Dropping non-existent index returns false
3709        assert!(!store.drop_property_index("key"));
3710    }
3711
3712    #[test]
3713    fn test_property_index_multiple_values() {
3714        let store = LpgStore::new();
3715
3716        store.create_property_index("age");
3717
3718        // Create multiple nodes with same and different ages
3719        let n1 = store.create_node(&["Person"]);
3720        let n2 = store.create_node(&["Person"]);
3721        let n3 = store.create_node(&["Person"]);
3722        let n4 = store.create_node(&["Person"]);
3723
3724        store.set_node_property(n1, "age", Value::from(25i64));
3725        store.set_node_property(n2, "age", Value::from(25i64));
3726        store.set_node_property(n3, "age", Value::from(30i64));
3727        store.set_node_property(n4, "age", Value::from(25i64));
3728
3729        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3730        assert_eq!(age_25.len(), 3);
3731
3732        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3733        assert_eq!(age_30.len(), 1);
3734
3735        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3736        assert!(age_40.is_empty());
3737    }
3738
3739    #[test]
3740    fn test_property_index_builds_from_existing_data() {
3741        let store = LpgStore::new();
3742
3743        // Create nodes first
3744        let n1 = store.create_node(&["Person"]);
3745        let n2 = store.create_node(&["Person"]);
3746        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3747        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3748
3749        // Create index after data exists
3750        store.create_property_index("email");
3751
3752        // Index should include existing data
3753        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3754        assert_eq!(alice.len(), 1);
3755        assert!(alice.contains(&n1));
3756
3757        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3758        assert_eq!(bob.len(), 1);
3759        assert!(bob.contains(&n2));
3760    }
3761
3762    #[test]
3763    fn test_get_node_property_batch() {
3764        let store = LpgStore::new();
3765
3766        let n1 = store.create_node(&["Person"]);
3767        let n2 = store.create_node(&["Person"]);
3768        let n3 = store.create_node(&["Person"]);
3769
3770        store.set_node_property(n1, "age", Value::from(25i64));
3771        store.set_node_property(n2, "age", Value::from(30i64));
3772        // n3 has no age property
3773
3774        let age_key = PropertyKey::new("age");
3775        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3776
3777        assert_eq!(values.len(), 3);
3778        assert_eq!(values[0], Some(Value::from(25i64)));
3779        assert_eq!(values[1], Some(Value::from(30i64)));
3780        assert_eq!(values[2], None);
3781    }
3782
3783    #[test]
3784    fn test_get_node_property_batch_empty() {
3785        let store = LpgStore::new();
3786        let key = PropertyKey::new("any");
3787
3788        let values = store.get_node_property_batch(&[], &key);
3789        assert!(values.is_empty());
3790    }
3791
3792    #[test]
3793    fn test_get_nodes_properties_batch() {
3794        let store = LpgStore::new();
3795
3796        let n1 = store.create_node(&["Person"]);
3797        let n2 = store.create_node(&["Person"]);
3798        let n3 = store.create_node(&["Person"]);
3799
3800        store.set_node_property(n1, "name", Value::from("Alice"));
3801        store.set_node_property(n1, "age", Value::from(25i64));
3802        store.set_node_property(n2, "name", Value::from("Bob"));
3803        // n3 has no properties
3804
3805        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3806
3807        assert_eq!(all_props.len(), 3);
3808        assert_eq!(all_props[0].len(), 2); // name and age
3809        assert_eq!(all_props[1].len(), 1); // name only
3810        assert_eq!(all_props[2].len(), 0); // no properties
3811
3812        assert_eq!(
3813            all_props[0].get(&PropertyKey::new("name")),
3814            Some(&Value::from("Alice"))
3815        );
3816        assert_eq!(
3817            all_props[1].get(&PropertyKey::new("name")),
3818            Some(&Value::from("Bob"))
3819        );
3820    }
3821
3822    #[test]
3823    fn test_get_nodes_properties_batch_empty() {
3824        let store = LpgStore::new();
3825
3826        let all_props = store.get_nodes_properties_batch(&[]);
3827        assert!(all_props.is_empty());
3828    }
3829}