Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use dashmap::DashMap;
19#[cfg(not(feature = "tiered-storage"))]
20use grafeo_common::mvcc::VersionChain;
21use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
22use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
23use parking_lot::RwLock;
24use std::cmp::Ordering as CmpOrdering;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28/// Compares two values for ordering (used for range checks).
29fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
30    match (a, b) {
31        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
32        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
33        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
34        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
35        _ => None,
36    }
37}
38
39/// Checks if a value is within a range.
40fn value_in_range(
41    value: &Value,
42    min: Option<&Value>,
43    max: Option<&Value>,
44    min_inclusive: bool,
45    max_inclusive: bool,
46) -> bool {
47    // Check lower bound
48    if let Some(min_val) = min {
49        match compare_values_for_range(value, min_val) {
50            Some(CmpOrdering::Less) => return false,
51            Some(CmpOrdering::Equal) if !min_inclusive => return false,
52            None => return false, // Can't compare
53            _ => {}
54        }
55    }
56
57    // Check upper bound
58    if let Some(max_val) = max {
59        match compare_values_for_range(value, max_val) {
60            Some(CmpOrdering::Greater) => return false,
61            Some(CmpOrdering::Equal) if !max_inclusive => return false,
62            None => return false,
63            _ => {}
64        }
65    }
66
67    true
68}
69
70// Tiered storage imports
71#[cfg(feature = "tiered-storage")]
72use crate::storage::EpochStore;
73#[cfg(feature = "tiered-storage")]
74use grafeo_common::memory::arena::ArenaAllocator;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
77
/// Settings that shape an LPG store at construction time.
///
/// The defaults suit most workloads. Disable `backward_edges` when you only
/// ever traverse outgoing edges (saves memory), and raise the capacities when
/// the graph size is known upfront (avoids reallocations).
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Whether to keep a backward adjacency index for incoming-edge queries.
    /// Turning it off saves ~50% adjacency memory if you only traverse
    /// outgoing edges.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    pub initial_edge_capacity: usize,
}
93
94impl Default for LpgStoreConfig {
95    fn default() -> Self {
96        Self {
97            backward_edges: true,
98            initial_node_capacity: 1024,
99            initial_edge_capacity: 4096,
100        }
101    }
102}
103
104/// The core in-memory graph storage.
105///
106/// Everything lives here: nodes, edges, properties, adjacency indexes, and
107/// version chains for MVCC. Concurrent reads never block each other.
108///
109/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
110/// adds transaction management and query execution. Use `LpgStore` directly
111/// when you need raw performance for algorithm implementations.
112///
113/// # Example
114///
115/// ```
116/// use grafeo_core::graph::lpg::LpgStore;
117/// use grafeo_core::graph::Direction;
118///
119/// let store = LpgStore::new();
120///
121/// // Create a small social network
122/// let alice = store.create_node(&["Person"]);
123/// let bob = store.create_node(&["Person"]);
124/// store.create_edge(alice, bob, "KNOWS");
125///
126/// // Traverse outgoing edges
127/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
128///     println!("Alice knows node {:?}", neighbor);
129/// }
130/// ```
131pub struct LpgStore {
132    /// Configuration.
133    #[allow(dead_code)]
134    config: LpgStoreConfig,
135
136    /// Node records indexed by NodeId, with version chains for MVCC.
137    /// Used when `tiered-storage` feature is disabled.
138    #[cfg(not(feature = "tiered-storage"))]
139    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
140
141    /// Edge records indexed by EdgeId, with version chains for MVCC.
142    /// Used when `tiered-storage` feature is disabled.
143    #[cfg(not(feature = "tiered-storage"))]
144    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
145
146    // === Tiered Storage Fields (feature-gated) ===
147    /// Arena allocator for hot data storage.
148    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
149    #[cfg(feature = "tiered-storage")]
150    arena_allocator: Arc<ArenaAllocator>,
151
152    /// Node version indexes - store metadata and arena offsets.
153    /// The actual NodeRecord data is stored in the arena.
154    #[cfg(feature = "tiered-storage")]
155    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
156
157    /// Edge version indexes - store metadata and arena offsets.
158    /// The actual EdgeRecord data is stored in the arena.
159    #[cfg(feature = "tiered-storage")]
160    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
161
162    /// Cold storage for frozen epochs.
163    /// Contains compressed epoch blocks for historical data.
164    #[cfg(feature = "tiered-storage")]
165    epoch_store: Arc<EpochStore>,
166
167    /// Property storage for nodes.
168    node_properties: PropertyStorage<NodeId>,
169
170    /// Property storage for edges.
171    edge_properties: PropertyStorage<EdgeId>,
172
173    /// Label name to ID mapping.
174    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
175
176    /// Label ID to name mapping.
177    id_to_label: RwLock<Vec<Arc<str>>>,
178
179    /// Edge type name to ID mapping.
180    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
181
182    /// Edge type ID to name mapping.
183    id_to_edge_type: RwLock<Vec<Arc<str>>>,
184
185    /// Forward adjacency lists (outgoing edges).
186    forward_adj: ChunkedAdjacency,
187
188    /// Backward adjacency lists (incoming edges).
189    /// Only populated if config.backward_edges is true.
190    backward_adj: Option<ChunkedAdjacency>,
191
192    /// Label index: label_id -> set of node IDs.
193    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
194
195    /// Node labels: node_id -> set of label IDs.
196    /// Reverse mapping to efficiently get labels for a node.
197    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
198
199    /// Property indexes: property_key -> (value -> set of node IDs).
200    ///
201    /// When a property is indexed, lookups by value are O(1) instead of O(n).
202    /// Use [`create_property_index`] to enable indexing for a property.
203    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
204
205    /// Next node ID.
206    next_node_id: AtomicU64,
207
208    /// Next edge ID.
209    next_edge_id: AtomicU64,
210
211    /// Current epoch.
212    current_epoch: AtomicU64,
213
214    /// Statistics for cost-based optimization.
215    statistics: RwLock<Statistics>,
216}
217
218impl LpgStore {
219    /// Creates a new LPG store with default configuration.
220    #[must_use]
221    pub fn new() -> Self {
222        Self::with_config(LpgStoreConfig::default())
223    }
224
225    /// Creates a new LPG store with custom configuration.
226    #[must_use]
227    pub fn with_config(config: LpgStoreConfig) -> Self {
228        let backward_adj = if config.backward_edges {
229            Some(ChunkedAdjacency::new())
230        } else {
231            None
232        };
233
234        Self {
235            #[cfg(not(feature = "tiered-storage"))]
236            nodes: RwLock::new(FxHashMap::default()),
237            #[cfg(not(feature = "tiered-storage"))]
238            edges: RwLock::new(FxHashMap::default()),
239            #[cfg(feature = "tiered-storage")]
240            arena_allocator: Arc::new(ArenaAllocator::new()),
241            #[cfg(feature = "tiered-storage")]
242            node_versions: RwLock::new(FxHashMap::default()),
243            #[cfg(feature = "tiered-storage")]
244            edge_versions: RwLock::new(FxHashMap::default()),
245            #[cfg(feature = "tiered-storage")]
246            epoch_store: Arc::new(EpochStore::new()),
247            node_properties: PropertyStorage::new(),
248            edge_properties: PropertyStorage::new(),
249            label_to_id: RwLock::new(FxHashMap::default()),
250            id_to_label: RwLock::new(Vec::new()),
251            edge_type_to_id: RwLock::new(FxHashMap::default()),
252            id_to_edge_type: RwLock::new(Vec::new()),
253            forward_adj: ChunkedAdjacency::new(),
254            backward_adj,
255            label_index: RwLock::new(Vec::new()),
256            node_labels: RwLock::new(FxHashMap::default()),
257            property_indexes: RwLock::new(FxHashMap::default()),
258            next_node_id: AtomicU64::new(0),
259            next_edge_id: AtomicU64::new(0),
260            current_epoch: AtomicU64::new(0),
261            statistics: RwLock::new(Statistics::new()),
262            config,
263        }
264    }
265
266    /// Returns the current epoch.
267    #[must_use]
268    pub fn current_epoch(&self) -> EpochId {
269        EpochId::new(self.current_epoch.load(Ordering::Acquire))
270    }
271
272    /// Creates a new epoch.
273    pub fn new_epoch(&self) -> EpochId {
274        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
275        EpochId::new(id)
276    }
277
278    // === Node Operations ===
279
280    /// Creates a new node with the given labels.
281    ///
282    /// Uses the system transaction for non-transactional operations.
283    pub fn create_node(&self, labels: &[&str]) -> NodeId {
284        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
285    }
286
287    /// Creates a new node with the given labels within a transaction context.
288    #[cfg(not(feature = "tiered-storage"))]
289    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
290        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
291
292        let mut record = NodeRecord::new(id, epoch);
293        record.set_label_count(labels.len() as u16);
294
295        // Store labels in node_labels map and label_index
296        let mut node_label_set = FxHashSet::default();
297        for label in labels {
298            let label_id = self.get_or_create_label_id(*label);
299            node_label_set.insert(label_id);
300
301            // Update label index
302            let mut index = self.label_index.write();
303            while index.len() <= label_id as usize {
304                index.push(FxHashMap::default());
305            }
306            index[label_id as usize].insert(id, ());
307        }
308
309        // Store node's labels
310        self.node_labels.write().insert(id, node_label_set);
311
312        // Create version chain with initial version
313        let chain = VersionChain::with_initial(record, epoch, tx_id);
314        self.nodes.write().insert(id, chain);
315        id
316    }
317
318    /// Creates a new node with the given labels within a transaction context.
319    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
320    #[cfg(feature = "tiered-storage")]
321    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
322        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
323
324        let mut record = NodeRecord::new(id, epoch);
325        record.set_label_count(labels.len() as u16);
326
327        // Store labels in node_labels map and label_index
328        let mut node_label_set = FxHashSet::default();
329        for label in labels {
330            let label_id = self.get_or_create_label_id(*label);
331            node_label_set.insert(label_id);
332
333            // Update label index
334            let mut index = self.label_index.write();
335            while index.len() <= label_id as usize {
336                index.push(FxHashMap::default());
337            }
338            index[label_id as usize].insert(id, ());
339        }
340
341        // Store node's labels
342        self.node_labels.write().insert(id, node_label_set);
343
344        // Allocate record in arena and get offset (create epoch if needed)
345        let arena = self.arena_allocator.arena_or_create(epoch);
346        let (offset, _stored) = arena.alloc_value_with_offset(record);
347
348        // Create HotVersionRef pointing to arena data
349        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
350
351        // Create or update version index
352        let mut versions = self.node_versions.write();
353        if let Some(index) = versions.get_mut(&id) {
354            index.add_hot(hot_ref);
355        } else {
356            versions.insert(id, VersionIndex::with_initial(hot_ref));
357        }
358
359        id
360    }
361
362    /// Creates a new node with labels and properties.
363    pub fn create_node_with_props(
364        &self,
365        labels: &[&str],
366        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
367    ) -> NodeId {
368        self.create_node_with_props_versioned(
369            labels,
370            properties,
371            self.current_epoch(),
372            TxId::SYSTEM,
373        )
374    }
375
376    /// Creates a new node with labels and properties within a transaction context.
377    #[cfg(not(feature = "tiered-storage"))]
378    pub fn create_node_with_props_versioned(
379        &self,
380        labels: &[&str],
381        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
382        epoch: EpochId,
383        tx_id: TxId,
384    ) -> NodeId {
385        let id = self.create_node_versioned(labels, epoch, tx_id);
386
387        for (key, value) in properties {
388            self.node_properties.set(id, key.into(), value.into());
389        }
390
391        // Update props_count in record
392        let count = self.node_properties.get_all(id).len() as u16;
393        if let Some(chain) = self.nodes.write().get_mut(&id) {
394            if let Some(record) = chain.latest_mut() {
395                record.props_count = count;
396            }
397        }
398
399        id
400    }
401
402    /// Creates a new node with labels and properties within a transaction context.
403    /// (Tiered storage version)
404    #[cfg(feature = "tiered-storage")]
405    pub fn create_node_with_props_versioned(
406        &self,
407        labels: &[&str],
408        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
409        epoch: EpochId,
410        tx_id: TxId,
411    ) -> NodeId {
412        let id = self.create_node_versioned(labels, epoch, tx_id);
413
414        for (key, value) in properties {
415            self.node_properties.set(id, key.into(), value.into());
416        }
417
418        // Note: props_count in record is not updated for tiered storage.
419        // The record is immutable once allocated in the arena.
420
421        id
422    }
423
424    /// Gets a node by ID (latest visible version).
425    #[must_use]
426    pub fn get_node(&self, id: NodeId) -> Option<Node> {
427        self.get_node_at_epoch(id, self.current_epoch())
428    }
429
430    /// Gets a node by ID at a specific epoch.
431    #[must_use]
432    #[cfg(not(feature = "tiered-storage"))]
433    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
434        let nodes = self.nodes.read();
435        let chain = nodes.get(&id)?;
436        let record = chain.visible_at(epoch)?;
437
438        if record.is_deleted() {
439            return None;
440        }
441
442        let mut node = Node::new(id);
443
444        // Get labels from node_labels map
445        let id_to_label = self.id_to_label.read();
446        let node_labels = self.node_labels.read();
447        if let Some(label_ids) = node_labels.get(&id) {
448            for &label_id in label_ids {
449                if let Some(label) = id_to_label.get(label_id as usize) {
450                    node.labels.push(label.clone());
451                }
452            }
453        }
454
455        // Get properties
456        node.properties = self.node_properties.get_all(id).into_iter().collect();
457
458        Some(node)
459    }
460
461    /// Gets a node by ID at a specific epoch.
462    /// (Tiered storage version: reads from arena via VersionIndex)
463    #[must_use]
464    #[cfg(feature = "tiered-storage")]
465    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
466        let versions = self.node_versions.read();
467        let index = versions.get(&id)?;
468        let version_ref = index.visible_at(epoch)?;
469
470        // Read the record from arena
471        let record = self.read_node_record(&version_ref)?;
472
473        if record.is_deleted() {
474            return None;
475        }
476
477        let mut node = Node::new(id);
478
479        // Get labels from node_labels map
480        let id_to_label = self.id_to_label.read();
481        let node_labels = self.node_labels.read();
482        if let Some(label_ids) = node_labels.get(&id) {
483            for &label_id in label_ids {
484                if let Some(label) = id_to_label.get(label_id as usize) {
485                    node.labels.push(label.clone());
486                }
487            }
488        }
489
490        // Get properties
491        node.properties = self.node_properties.get_all(id).into_iter().collect();
492
493        Some(node)
494    }
495
496    /// Gets a node visible to a specific transaction.
497    #[must_use]
498    #[cfg(not(feature = "tiered-storage"))]
499    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
500        let nodes = self.nodes.read();
501        let chain = nodes.get(&id)?;
502        let record = chain.visible_to(epoch, tx_id)?;
503
504        if record.is_deleted() {
505            return None;
506        }
507
508        let mut node = Node::new(id);
509
510        // Get labels from node_labels map
511        let id_to_label = self.id_to_label.read();
512        let node_labels = self.node_labels.read();
513        if let Some(label_ids) = node_labels.get(&id) {
514            for &label_id in label_ids {
515                if let Some(label) = id_to_label.get(label_id as usize) {
516                    node.labels.push(label.clone());
517                }
518            }
519        }
520
521        // Get properties
522        node.properties = self.node_properties.get_all(id).into_iter().collect();
523
524        Some(node)
525    }
526
527    /// Gets a node visible to a specific transaction.
528    /// (Tiered storage version: reads from arena via VersionIndex)
529    #[must_use]
530    #[cfg(feature = "tiered-storage")]
531    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
532        let versions = self.node_versions.read();
533        let index = versions.get(&id)?;
534        let version_ref = index.visible_to(epoch, tx_id)?;
535
536        // Read the record from arena
537        let record = self.read_node_record(&version_ref)?;
538
539        if record.is_deleted() {
540            return None;
541        }
542
543        let mut node = Node::new(id);
544
545        // Get labels from node_labels map
546        let id_to_label = self.id_to_label.read();
547        let node_labels = self.node_labels.read();
548        if let Some(label_ids) = node_labels.get(&id) {
549            for &label_id in label_ids {
550                if let Some(label) = id_to_label.get(label_id as usize) {
551                    node.labels.push(label.clone());
552                }
553            }
554        }
555
556        // Get properties
557        node.properties = self.node_properties.get_all(id).into_iter().collect();
558
559        Some(node)
560    }
561
562    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
563    #[cfg(feature = "tiered-storage")]
564    #[allow(unsafe_code)]
565    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
566        match version_ref {
567            VersionRef::Hot(hot_ref) => {
568                let arena = self.arena_allocator.arena(hot_ref.epoch);
569                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
570                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
571                Some(record.clone())
572            }
573            VersionRef::Cold(cold_ref) => {
574                // Read from compressed epoch store
575                self.epoch_store
576                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
577            }
578        }
579    }
580
581    /// Deletes a node and all its edges (using latest epoch).
582    pub fn delete_node(&self, id: NodeId) -> bool {
583        self.delete_node_at_epoch(id, self.current_epoch())
584    }
585
586    /// Deletes a node at a specific epoch.
587    #[cfg(not(feature = "tiered-storage"))]
588    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
589        let mut nodes = self.nodes.write();
590        if let Some(chain) = nodes.get_mut(&id) {
591            // Check if visible at this epoch (not already deleted)
592            if let Some(record) = chain.visible_at(epoch) {
593                if record.is_deleted() {
594                    return false;
595                }
596            } else {
597                // Not visible at this epoch (already deleted or doesn't exist)
598                return false;
599            }
600
601            // Mark the version chain as deleted at this epoch
602            chain.mark_deleted(epoch);
603
604            // Remove from label index using node_labels map
605            let mut index = self.label_index.write();
606            let mut node_labels = self.node_labels.write();
607            if let Some(label_ids) = node_labels.remove(&id) {
608                for label_id in label_ids {
609                    if let Some(set) = index.get_mut(label_id as usize) {
610                        set.remove(&id);
611                    }
612                }
613            }
614
615            // Remove properties
616            drop(nodes); // Release lock before removing properties
617            drop(index);
618            drop(node_labels);
619            self.node_properties.remove_all(id);
620
621            // Note: Caller should use delete_node_edges() first if detach is needed
622
623            true
624        } else {
625            false
626        }
627    }
628
629    /// Deletes a node at a specific epoch.
630    /// (Tiered storage version)
631    #[cfg(feature = "tiered-storage")]
632    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
633        let mut versions = self.node_versions.write();
634        if let Some(index) = versions.get_mut(&id) {
635            // Check if visible at this epoch
636            if let Some(version_ref) = index.visible_at(epoch) {
637                if let Some(record) = self.read_node_record(&version_ref) {
638                    if record.is_deleted() {
639                        return false;
640                    }
641                } else {
642                    return false;
643                }
644            } else {
645                return false;
646            }
647
648            // Mark as deleted in version index
649            index.mark_deleted(epoch);
650
651            // Remove from label index using node_labels map
652            let mut label_index = self.label_index.write();
653            let mut node_labels = self.node_labels.write();
654            if let Some(label_ids) = node_labels.remove(&id) {
655                for label_id in label_ids {
656                    if let Some(set) = label_index.get_mut(label_id as usize) {
657                        set.remove(&id);
658                    }
659                }
660            }
661
662            // Remove properties
663            drop(versions);
664            drop(label_index);
665            drop(node_labels);
666            self.node_properties.remove_all(id);
667
668            true
669        } else {
670            false
671        }
672    }
673
674    /// Deletes all edges connected to a node (implements DETACH DELETE).
675    ///
676    /// Call this before `delete_node()` if you want to remove a node that
677    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
678    #[cfg(not(feature = "tiered-storage"))]
679    pub fn delete_node_edges(&self, node_id: NodeId) {
680        // Get outgoing edges
681        let outgoing: Vec<EdgeId> = self
682            .forward_adj
683            .edges_from(node_id)
684            .into_iter()
685            .map(|(_, edge_id)| edge_id)
686            .collect();
687
688        // Get incoming edges
689        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
690            backward
691                .edges_from(node_id)
692                .into_iter()
693                .map(|(_, edge_id)| edge_id)
694                .collect()
695        } else {
696            // No backward adjacency - scan all edges
697            let epoch = self.current_epoch();
698            self.edges
699                .read()
700                .iter()
701                .filter_map(|(id, chain)| {
702                    chain.visible_at(epoch).and_then(|r| {
703                        if !r.is_deleted() && r.dst == node_id {
704                            Some(*id)
705                        } else {
706                            None
707                        }
708                    })
709                })
710                .collect()
711        };
712
713        // Delete all edges
714        for edge_id in outgoing.into_iter().chain(incoming) {
715            self.delete_edge(edge_id);
716        }
717    }
718
719    /// Deletes all edges connected to a node (implements DETACH DELETE).
720    /// (Tiered storage version)
721    #[cfg(feature = "tiered-storage")]
722    pub fn delete_node_edges(&self, node_id: NodeId) {
723        // Get outgoing edges
724        let outgoing: Vec<EdgeId> = self
725            .forward_adj
726            .edges_from(node_id)
727            .into_iter()
728            .map(|(_, edge_id)| edge_id)
729            .collect();
730
731        // Get incoming edges
732        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
733            backward
734                .edges_from(node_id)
735                .into_iter()
736                .map(|(_, edge_id)| edge_id)
737                .collect()
738        } else {
739            // No backward adjacency - scan all edges
740            let epoch = self.current_epoch();
741            let versions = self.edge_versions.read();
742            versions
743                .iter()
744                .filter_map(|(id, index)| {
745                    index.visible_at(epoch).and_then(|vref| {
746                        self.read_edge_record(&vref).and_then(|r| {
747                            if !r.is_deleted() && r.dst == node_id {
748                                Some(*id)
749                            } else {
750                                None
751                            }
752                        })
753                    })
754                })
755                .collect()
756        };
757
758        // Delete all edges
759        for edge_id in outgoing.into_iter().chain(incoming) {
760            self.delete_edge(edge_id);
761        }
762    }
763
764    /// Sets a property on a node.
765    #[cfg(not(feature = "tiered-storage"))]
766    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
767        let prop_key: PropertyKey = key.into();
768
769        // Update property index before setting the property (needs to read old value)
770        self.update_property_index_on_set(id, &prop_key, &value);
771
772        self.node_properties.set(id, prop_key, value);
773
774        // Update props_count in record
775        let count = self.node_properties.get_all(id).len() as u16;
776        if let Some(chain) = self.nodes.write().get_mut(&id) {
777            if let Some(record) = chain.latest_mut() {
778                record.props_count = count;
779            }
780        }
781    }
782
783    /// Sets a property on a node.
784    /// (Tiered storage version: properties stored separately, record is immutable)
785    #[cfg(feature = "tiered-storage")]
786    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
787        let prop_key: PropertyKey = key.into();
788
789        // Update property index before setting the property (needs to read old value)
790        self.update_property_index_on_set(id, &prop_key, &value);
791
792        self.node_properties.set(id, prop_key, value);
793        // Note: props_count in record is not updated for tiered storage.
794        // The record is immutable once allocated in the arena.
795        // Property count can be derived from PropertyStorage if needed.
796    }
797
798    /// Sets a property on an edge.
799    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
800        self.edge_properties.set(id, key.into(), value);
801    }
802
803    /// Removes a property from a node.
804    ///
805    /// Returns the previous value if it existed, or None if the property didn't exist.
806    #[cfg(not(feature = "tiered-storage"))]
807    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
808        let prop_key: PropertyKey = key.into();
809
810        // Update property index before removing (needs to read old value)
811        self.update_property_index_on_remove(id, &prop_key);
812
813        let result = self.node_properties.remove(id, &prop_key);
814
815        // Update props_count in record
816        let count = self.node_properties.get_all(id).len() as u16;
817        if let Some(chain) = self.nodes.write().get_mut(&id) {
818            if let Some(record) = chain.latest_mut() {
819                record.props_count = count;
820            }
821        }
822
823        result
824    }
825
826    /// Removes a property from a node.
827    /// (Tiered storage version)
828    #[cfg(feature = "tiered-storage")]
829    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
830        let prop_key: PropertyKey = key.into();
831
832        // Update property index before removing (needs to read old value)
833        self.update_property_index_on_remove(id, &prop_key);
834
835        self.node_properties.remove(id, &prop_key)
836        // Note: props_count in record is not updated for tiered storage.
837    }
838
839    /// Removes a property from an edge.
840    ///
841    /// Returns the previous value if it existed, or None if the property didn't exist.
842    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
843        self.edge_properties.remove(id, &key.into())
844    }
845
846    /// Gets a single property from a node without loading all properties.
847    ///
848    /// This is O(1) vs O(properties) for `get_node().get_property()`.
849    /// Use this for filter predicates where you only need one property value.
850    ///
851    /// # Example
852    ///
853    /// ```ignore
854    /// // Fast: Direct single-property lookup
855    /// let age = store.get_node_property(node_id, "age");
856    ///
857    /// // Slow: Loads all properties, then extracts one
858    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
859    /// ```
860    #[must_use]
861    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
862        self.node_properties.get(id, key)
863    }
864
865    /// Gets a single property from an edge without loading all properties.
866    ///
867    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
868    #[must_use]
869    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
870        self.edge_properties.get(id, key)
871    }
872
873    // === Batch Property Operations ===
874
875    /// Gets a property for multiple nodes in a single batch operation.
876    ///
877    /// More efficient than calling [`Self::get_node_property`] in a loop because it
878    /// reduces lock overhead and enables better cache utilization.
879    ///
880    /// # Example
881    ///
882    /// ```
883    /// use grafeo_core::graph::lpg::LpgStore;
884    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
885    ///
886    /// let store = LpgStore::new();
887    /// let n1 = store.create_node(&["Person"]);
888    /// let n2 = store.create_node(&["Person"]);
889    /// store.set_node_property(n1, "age", Value::from(25i64));
890    /// store.set_node_property(n2, "age", Value::from(30i64));
891    ///
892    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
893    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
894    /// ```
895    #[must_use]
896    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
897        ids.iter()
898            .map(|&id| self.node_properties.get(id, key))
899            .collect()
900    }
901
902    /// Gets all properties for multiple nodes in a single batch operation.
903    ///
904    /// Returns a vector of property maps, one per node ID (empty map if no properties).
905    /// More efficient than calling [`Self::get_node`] in a loop.
906    #[must_use]
907    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
908        ids.iter()
909            .map(|&id| self.node_properties.get_all(id))
910            .collect()
911    }
912
913    /// Finds nodes where a property value is in a range.
914    ///
915    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
916    /// Uses zone maps to skip scanning when the range definitely doesn't match.
917    ///
918    /// # Arguments
919    ///
920    /// * `property` - The property to check
921    /// * `min` - Optional lower bound (None for unbounded)
922    /// * `max` - Optional upper bound (None for unbounded)
923    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
924    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
925    ///
926    /// # Example
927    ///
928    /// ```
929    /// use grafeo_core::graph::lpg::LpgStore;
930    /// use grafeo_common::types::Value;
931    ///
932    /// let store = LpgStore::new();
933    /// let n1 = store.create_node(&["Person"]);
934    /// let n2 = store.create_node(&["Person"]);
935    /// store.set_node_property(n1, "age", Value::from(25i64));
936    /// store.set_node_property(n2, "age", Value::from(35i64));
937    ///
938    /// // Find nodes where age > 30
939    /// let result = store.find_nodes_in_range(
940    ///     "age",
941    ///     Some(&Value::from(30i64)),
942    ///     None,
943    ///     false, // exclusive lower bound
944    ///     true,  // inclusive upper bound (doesn't matter since None)
945    /// );
946    /// assert_eq!(result.len(), 1); // Only n2 matches
947    /// ```
948    #[must_use]
949    pub fn find_nodes_in_range(
950        &self,
951        property: &str,
952        min: Option<&Value>,
953        max: Option<&Value>,
954        min_inclusive: bool,
955        max_inclusive: bool,
956    ) -> Vec<NodeId> {
957        let key = PropertyKey::new(property);
958
959        // Check zone map first - if no values could match, return empty
960        if !self
961            .node_properties
962            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
963        {
964            return Vec::new();
965        }
966
967        // Scan all nodes and filter by range
968        self.node_ids()
969            .into_iter()
970            .filter(|&node_id| {
971                self.node_properties
972                    .get(node_id, &key)
973                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
974            })
975            .collect()
976    }
977
978    /// Finds nodes matching multiple property equality conditions.
979    ///
980    /// This is more efficient than intersecting multiple single-property lookups
981    /// because it can use indexes when available and short-circuits on the first
982    /// miss.
983    ///
984    /// # Example
985    ///
986    /// ```
987    /// use grafeo_core::graph::lpg::LpgStore;
988    /// use grafeo_common::types::Value;
989    ///
990    /// let store = LpgStore::new();
991    /// let alice = store.create_node(&["Person"]);
992    /// store.set_node_property(alice, "name", Value::from("Alice"));
993    /// store.set_node_property(alice, "city", Value::from("NYC"));
994    ///
995    /// // Find nodes where name = "Alice" AND city = "NYC"
996    /// let matches = store.find_nodes_by_properties(&[
997    ///     ("name", Value::from("Alice")),
998    ///     ("city", Value::from("NYC")),
999    /// ]);
1000    /// assert!(matches.contains(&alice));
1001    /// ```
1002    #[must_use]
1003    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1004        if conditions.is_empty() {
1005            return self.node_ids();
1006        }
1007
1008        // Find the most selective condition (smallest result set) to start
1009        // If any condition has an index, use that first
1010        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1011        let indexes = self.property_indexes.read();
1012
1013        for (i, (prop, value)) in conditions.iter().enumerate() {
1014            let key = PropertyKey::new(*prop);
1015            let hv = HashableValue::new(value.clone());
1016
1017            if let Some(index) = indexes.get(&key) {
1018                let matches: Vec<NodeId> = index
1019                    .get(&hv)
1020                    .map(|nodes| nodes.iter().copied().collect())
1021                    .unwrap_or_default();
1022
1023                // Short-circuit if any indexed condition has no matches
1024                if matches.is_empty() {
1025                    return Vec::new();
1026                }
1027
1028                // Use smallest indexed result as starting point
1029                if best_start
1030                    .as_ref()
1031                    .is_none_or(|(_, best)| matches.len() < best.len())
1032                {
1033                    best_start = Some((i, matches));
1034                }
1035            }
1036        }
1037        drop(indexes);
1038
1039        // Start from best indexed result or fall back to full node scan
1040        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1041            // No indexes available, start with first condition via full scan
1042            let (prop, value) = &conditions[0];
1043            (0, self.find_nodes_by_property(prop, value))
1044        });
1045
1046        // Filter candidates through remaining conditions
1047        for (i, (prop, value)) in conditions.iter().enumerate() {
1048            if i == start_idx {
1049                continue;
1050            }
1051
1052            let key = PropertyKey::new(*prop);
1053            candidates.retain(|&node_id| {
1054                self.node_properties
1055                    .get(node_id, &key)
1056                    .is_some_and(|v| v == *value)
1057            });
1058
1059            // Short-circuit if no candidates remain
1060            if candidates.is_empty() {
1061                return Vec::new();
1062            }
1063        }
1064
1065        candidates
1066    }
1067
1068    // === Property Index Operations ===
1069
1070    /// Creates an index on a node property for O(1) lookups by value.
1071    ///
1072    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1073    /// O(1) instead of O(n) for this property. The index is automatically
1074    /// maintained when properties are set or removed.
1075    ///
1076    /// # Example
1077    ///
1078    /// ```
1079    /// use grafeo_core::graph::lpg::LpgStore;
1080    /// use grafeo_common::types::Value;
1081    ///
1082    /// let store = LpgStore::new();
1083    ///
1084    /// // Create nodes with an 'id' property
1085    /// let alice = store.create_node(&["Person"]);
1086    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1087    ///
1088    /// // Create an index on the 'id' property
1089    /// store.create_property_index("id");
1090    ///
1091    /// // Now lookups by 'id' are O(1)
1092    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1093    /// assert!(found.contains(&alice));
1094    /// ```
1095    pub fn create_property_index(&self, property: &str) {
1096        let key = PropertyKey::new(property);
1097
1098        let mut indexes = self.property_indexes.write();
1099        if indexes.contains_key(&key) {
1100            return; // Already indexed
1101        }
1102
1103        // Create the index and populate it with existing data
1104        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1105
1106        // Scan all nodes to build the index
1107        for node_id in self.node_ids() {
1108            if let Some(value) = self.node_properties.get(node_id, &key) {
1109                let hv = HashableValue::new(value);
1110                index
1111                    .entry(hv)
1112                    .or_insert_with(FxHashSet::default)
1113                    .insert(node_id);
1114            }
1115        }
1116
1117        indexes.insert(key, index);
1118    }
1119
1120    /// Drops an index on a node property.
1121    ///
1122    /// Returns `true` if the index existed and was removed.
1123    pub fn drop_property_index(&self, property: &str) -> bool {
1124        let key = PropertyKey::new(property);
1125        self.property_indexes.write().remove(&key).is_some()
1126    }
1127
1128    /// Returns `true` if the property has an index.
1129    #[must_use]
1130    pub fn has_property_index(&self, property: &str) -> bool {
1131        let key = PropertyKey::new(property);
1132        self.property_indexes.read().contains_key(&key)
1133    }
1134
1135    /// Finds all nodes that have a specific property value.
1136    ///
1137    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1138    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1139    ///
1140    /// # Example
1141    ///
1142    /// ```
1143    /// use grafeo_core::graph::lpg::LpgStore;
1144    /// use grafeo_common::types::Value;
1145    ///
1146    /// let store = LpgStore::new();
1147    /// store.create_property_index("city"); // Optional but makes lookups fast
1148    ///
1149    /// let alice = store.create_node(&["Person"]);
1150    /// let bob = store.create_node(&["Person"]);
1151    /// store.set_node_property(alice, "city", Value::from("NYC"));
1152    /// store.set_node_property(bob, "city", Value::from("NYC"));
1153    ///
1154    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1155    /// assert_eq!(nyc_people.len(), 2);
1156    /// ```
1157    #[must_use]
1158    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1159        let key = PropertyKey::new(property);
1160        let hv = HashableValue::new(value.clone());
1161
1162        // Try indexed lookup first
1163        let indexes = self.property_indexes.read();
1164        if let Some(index) = indexes.get(&key) {
1165            if let Some(nodes) = index.get(&hv) {
1166                return nodes.iter().copied().collect();
1167            }
1168            return Vec::new();
1169        }
1170        drop(indexes);
1171
1172        // Fall back to full scan
1173        self.node_ids()
1174            .into_iter()
1175            .filter(|&node_id| {
1176                self.node_properties
1177                    .get(node_id, &key)
1178                    .is_some_and(|v| v == *value)
1179            })
1180            .collect()
1181    }
1182
1183    /// Updates property indexes when a property is set.
1184    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1185        let indexes = self.property_indexes.read();
1186        if let Some(index) = indexes.get(key) {
1187            // Get old value to remove from index
1188            if let Some(old_value) = self.node_properties.get(node_id, key) {
1189                let old_hv = HashableValue::new(old_value);
1190                if let Some(mut nodes) = index.get_mut(&old_hv) {
1191                    nodes.remove(&node_id);
1192                    if nodes.is_empty() {
1193                        drop(nodes);
1194                        index.remove(&old_hv);
1195                    }
1196                }
1197            }
1198
1199            // Add new value to index
1200            let new_hv = HashableValue::new(new_value.clone());
1201            index
1202                .entry(new_hv)
1203                .or_insert_with(FxHashSet::default)
1204                .insert(node_id);
1205        }
1206    }
1207
1208    /// Updates property indexes when a property is removed.
1209    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1210        let indexes = self.property_indexes.read();
1211        if let Some(index) = indexes.get(key) {
1212            // Get old value to remove from index
1213            if let Some(old_value) = self.node_properties.get(node_id, key) {
1214                let old_hv = HashableValue::new(old_value);
1215                if let Some(mut nodes) = index.get_mut(&old_hv) {
1216                    nodes.remove(&node_id);
1217                    if nodes.is_empty() {
1218                        drop(nodes);
1219                        index.remove(&old_hv);
1220                    }
1221                }
1222            }
1223        }
1224    }
1225
1226    /// Adds a label to a node.
1227    ///
1228    /// Returns true if the label was added, false if the node doesn't exist
1229    /// or already has the label.
1230    #[cfg(not(feature = "tiered-storage"))]
1231    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1232        let epoch = self.current_epoch();
1233
1234        // Check if node exists
1235        let nodes = self.nodes.read();
1236        if let Some(chain) = nodes.get(&node_id) {
1237            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1238                return false;
1239            }
1240        } else {
1241            return false;
1242        }
1243        drop(nodes);
1244
1245        // Get or create label ID
1246        let label_id = self.get_or_create_label_id(label);
1247
1248        // Add to node_labels map
1249        let mut node_labels = self.node_labels.write();
1250        let label_set = node_labels
1251            .entry(node_id)
1252            .or_insert_with(FxHashSet::default);
1253
1254        if label_set.contains(&label_id) {
1255            return false; // Already has this label
1256        }
1257
1258        label_set.insert(label_id);
1259        drop(node_labels);
1260
1261        // Add to label_index
1262        let mut index = self.label_index.write();
1263        if (label_id as usize) >= index.len() {
1264            index.resize(label_id as usize + 1, FxHashMap::default());
1265        }
1266        index[label_id as usize].insert(node_id, ());
1267
1268        // Update label count in node record
1269        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1270            if let Some(record) = chain.latest_mut() {
1271                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1272                record.set_label_count(count as u16);
1273            }
1274        }
1275
1276        true
1277    }
1278
1279    /// Adds a label to a node.
1280    /// (Tiered storage version)
1281    #[cfg(feature = "tiered-storage")]
1282    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1283        let epoch = self.current_epoch();
1284
1285        // Check if node exists
1286        let versions = self.node_versions.read();
1287        if let Some(index) = versions.get(&node_id) {
1288            if let Some(vref) = index.visible_at(epoch) {
1289                if let Some(record) = self.read_node_record(&vref) {
1290                    if record.is_deleted() {
1291                        return false;
1292                    }
1293                } else {
1294                    return false;
1295                }
1296            } else {
1297                return false;
1298            }
1299        } else {
1300            return false;
1301        }
1302        drop(versions);
1303
1304        // Get or create label ID
1305        let label_id = self.get_or_create_label_id(label);
1306
1307        // Add to node_labels map
1308        let mut node_labels = self.node_labels.write();
1309        let label_set = node_labels
1310            .entry(node_id)
1311            .or_insert_with(FxHashSet::default);
1312
1313        if label_set.contains(&label_id) {
1314            return false; // Already has this label
1315        }
1316
1317        label_set.insert(label_id);
1318        drop(node_labels);
1319
1320        // Add to label_index
1321        let mut index = self.label_index.write();
1322        if (label_id as usize) >= index.len() {
1323            index.resize(label_id as usize + 1, FxHashMap::default());
1324        }
1325        index[label_id as usize].insert(node_id, ());
1326
1327        // Note: label_count in record is not updated for tiered storage.
1328        // The record is immutable once allocated in the arena.
1329
1330        true
1331    }
1332
1333    /// Removes a label from a node.
1334    ///
1335    /// Returns true if the label was removed, false if the node doesn't exist
1336    /// or doesn't have the label.
1337    #[cfg(not(feature = "tiered-storage"))]
1338    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1339        let epoch = self.current_epoch();
1340
1341        // Check if node exists
1342        let nodes = self.nodes.read();
1343        if let Some(chain) = nodes.get(&node_id) {
1344            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1345                return false;
1346            }
1347        } else {
1348            return false;
1349        }
1350        drop(nodes);
1351
1352        // Get label ID
1353        let label_id = {
1354            let label_ids = self.label_to_id.read();
1355            match label_ids.get(label) {
1356                Some(&id) => id,
1357                None => return false, // Label doesn't exist
1358            }
1359        };
1360
1361        // Remove from node_labels map
1362        let mut node_labels = self.node_labels.write();
1363        if let Some(label_set) = node_labels.get_mut(&node_id) {
1364            if !label_set.remove(&label_id) {
1365                return false; // Node doesn't have this label
1366            }
1367        } else {
1368            return false;
1369        }
1370        drop(node_labels);
1371
1372        // Remove from label_index
1373        let mut index = self.label_index.write();
1374        if (label_id as usize) < index.len() {
1375            index[label_id as usize].remove(&node_id);
1376        }
1377
1378        // Update label count in node record
1379        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1380            if let Some(record) = chain.latest_mut() {
1381                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1382                record.set_label_count(count as u16);
1383            }
1384        }
1385
1386        true
1387    }
1388
1389    /// Removes a label from a node.
1390    /// (Tiered storage version)
1391    #[cfg(feature = "tiered-storage")]
1392    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1393        let epoch = self.current_epoch();
1394
1395        // Check if node exists
1396        let versions = self.node_versions.read();
1397        if let Some(index) = versions.get(&node_id) {
1398            if let Some(vref) = index.visible_at(epoch) {
1399                if let Some(record) = self.read_node_record(&vref) {
1400                    if record.is_deleted() {
1401                        return false;
1402                    }
1403                } else {
1404                    return false;
1405                }
1406            } else {
1407                return false;
1408            }
1409        } else {
1410            return false;
1411        }
1412        drop(versions);
1413
1414        // Get label ID
1415        let label_id = {
1416            let label_ids = self.label_to_id.read();
1417            match label_ids.get(label) {
1418                Some(&id) => id,
1419                None => return false, // Label doesn't exist
1420            }
1421        };
1422
1423        // Remove from node_labels map
1424        let mut node_labels = self.node_labels.write();
1425        if let Some(label_set) = node_labels.get_mut(&node_id) {
1426            if !label_set.remove(&label_id) {
1427                return false; // Node doesn't have this label
1428            }
1429        } else {
1430            return false;
1431        }
1432        drop(node_labels);
1433
1434        // Remove from label_index
1435        let mut index = self.label_index.write();
1436        if (label_id as usize) < index.len() {
1437            index[label_id as usize].remove(&node_id);
1438        }
1439
1440        // Note: label_count in record is not updated for tiered storage.
1441
1442        true
1443    }
1444
1445    /// Returns the number of nodes (non-deleted at current epoch).
1446    #[must_use]
1447    #[cfg(not(feature = "tiered-storage"))]
1448    pub fn node_count(&self) -> usize {
1449        let epoch = self.current_epoch();
1450        self.nodes
1451            .read()
1452            .values()
1453            .filter_map(|chain| chain.visible_at(epoch))
1454            .filter(|r| !r.is_deleted())
1455            .count()
1456    }
1457
1458    /// Returns the number of nodes (non-deleted at current epoch).
1459    /// (Tiered storage version)
1460    #[must_use]
1461    #[cfg(feature = "tiered-storage")]
1462    pub fn node_count(&self) -> usize {
1463        let epoch = self.current_epoch();
1464        let versions = self.node_versions.read();
1465        versions
1466            .iter()
1467            .filter(|(_, index)| {
1468                index.visible_at(epoch).map_or(false, |vref| {
1469                    self.read_node_record(&vref)
1470                        .map_or(false, |r| !r.is_deleted())
1471                })
1472            })
1473            .count()
1474    }
1475
1476    /// Returns all node IDs in the store.
1477    ///
1478    /// This returns a snapshot of current node IDs. The returned vector
1479    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1480    /// iteration order.
1481    #[must_use]
1482    #[cfg(not(feature = "tiered-storage"))]
1483    pub fn node_ids(&self) -> Vec<NodeId> {
1484        let epoch = self.current_epoch();
1485        let mut ids: Vec<NodeId> = self
1486            .nodes
1487            .read()
1488            .iter()
1489            .filter_map(|(id, chain)| {
1490                chain
1491                    .visible_at(epoch)
1492                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1493            })
1494            .collect();
1495        ids.sort_unstable();
1496        ids
1497    }
1498
1499    /// Returns all node IDs in the store.
1500    /// (Tiered storage version)
1501    #[must_use]
1502    #[cfg(feature = "tiered-storage")]
1503    pub fn node_ids(&self) -> Vec<NodeId> {
1504        let epoch = self.current_epoch();
1505        let versions = self.node_versions.read();
1506        let mut ids: Vec<NodeId> = versions
1507            .iter()
1508            .filter_map(|(id, index)| {
1509                index.visible_at(epoch).and_then(|vref| {
1510                    self.read_node_record(&vref)
1511                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1512                })
1513            })
1514            .collect();
1515        ids.sort_unstable();
1516        ids
1517    }
1518
1519    // === Edge Operations ===
1520
1521    /// Creates a new edge.
1522    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1523        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1524    }
1525
1526    /// Creates a new edge within a transaction context.
1527    #[cfg(not(feature = "tiered-storage"))]
1528    pub fn create_edge_versioned(
1529        &self,
1530        src: NodeId,
1531        dst: NodeId,
1532        edge_type: &str,
1533        epoch: EpochId,
1534        tx_id: TxId,
1535    ) -> EdgeId {
1536        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1537        let type_id = self.get_or_create_edge_type_id(edge_type);
1538
1539        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1540        let chain = VersionChain::with_initial(record, epoch, tx_id);
1541        self.edges.write().insert(id, chain);
1542
1543        // Update adjacency
1544        self.forward_adj.add_edge(src, dst, id);
1545        if let Some(ref backward) = self.backward_adj {
1546            backward.add_edge(dst, src, id);
1547        }
1548
1549        id
1550    }
1551
1552    /// Creates a new edge within a transaction context.
1553    /// (Tiered storage version)
1554    #[cfg(feature = "tiered-storage")]
1555    pub fn create_edge_versioned(
1556        &self,
1557        src: NodeId,
1558        dst: NodeId,
1559        edge_type: &str,
1560        epoch: EpochId,
1561        tx_id: TxId,
1562    ) -> EdgeId {
1563        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1564        let type_id = self.get_or_create_edge_type_id(edge_type);
1565
1566        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1567
1568        // Allocate record in arena and get offset (create epoch if needed)
1569        let arena = self.arena_allocator.arena_or_create(epoch);
1570        let (offset, _stored) = arena.alloc_value_with_offset(record);
1571
1572        // Create HotVersionRef pointing to arena data
1573        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1574
1575        // Create or update version index
1576        let mut versions = self.edge_versions.write();
1577        if let Some(index) = versions.get_mut(&id) {
1578            index.add_hot(hot_ref);
1579        } else {
1580            versions.insert(id, VersionIndex::with_initial(hot_ref));
1581        }
1582
1583        // Update adjacency
1584        self.forward_adj.add_edge(src, dst, id);
1585        if let Some(ref backward) = self.backward_adj {
1586            backward.add_edge(dst, src, id);
1587        }
1588
1589        id
1590    }
1591
1592    /// Creates a new edge with properties.
1593    pub fn create_edge_with_props(
1594        &self,
1595        src: NodeId,
1596        dst: NodeId,
1597        edge_type: &str,
1598        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1599    ) -> EdgeId {
1600        let id = self.create_edge(src, dst, edge_type);
1601
1602        for (key, value) in properties {
1603            self.edge_properties.set(id, key.into(), value.into());
1604        }
1605
1606        id
1607    }
1608
1609    /// Gets an edge by ID (latest visible version).
1610    #[must_use]
1611    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1612        self.get_edge_at_epoch(id, self.current_epoch())
1613    }
1614
1615    /// Gets an edge by ID at a specific epoch.
1616    #[must_use]
1617    #[cfg(not(feature = "tiered-storage"))]
1618    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1619        let edges = self.edges.read();
1620        let chain = edges.get(&id)?;
1621        let record = chain.visible_at(epoch)?;
1622
1623        if record.is_deleted() {
1624            return None;
1625        }
1626
1627        let edge_type = {
1628            let id_to_type = self.id_to_edge_type.read();
1629            id_to_type.get(record.type_id as usize)?.clone()
1630        };
1631
1632        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1633
1634        // Get properties
1635        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1636
1637        Some(edge)
1638    }
1639
1640    /// Gets an edge by ID at a specific epoch.
1641    /// (Tiered storage version)
1642    #[must_use]
1643    #[cfg(feature = "tiered-storage")]
1644    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1645        let versions = self.edge_versions.read();
1646        let index = versions.get(&id)?;
1647        let version_ref = index.visible_at(epoch)?;
1648
1649        let record = self.read_edge_record(&version_ref)?;
1650
1651        if record.is_deleted() {
1652            return None;
1653        }
1654
1655        let edge_type = {
1656            let id_to_type = self.id_to_edge_type.read();
1657            id_to_type.get(record.type_id as usize)?.clone()
1658        };
1659
1660        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1661
1662        // Get properties
1663        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1664
1665        Some(edge)
1666    }
1667
1668    /// Gets an edge visible to a specific transaction.
1669    #[must_use]
1670    #[cfg(not(feature = "tiered-storage"))]
1671    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1672        let edges = self.edges.read();
1673        let chain = edges.get(&id)?;
1674        let record = chain.visible_to(epoch, tx_id)?;
1675
1676        if record.is_deleted() {
1677            return None;
1678        }
1679
1680        let edge_type = {
1681            let id_to_type = self.id_to_edge_type.read();
1682            id_to_type.get(record.type_id as usize)?.clone()
1683        };
1684
1685        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1686
1687        // Get properties
1688        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1689
1690        Some(edge)
1691    }
1692
1693    /// Gets an edge visible to a specific transaction.
1694    /// (Tiered storage version)
1695    #[must_use]
1696    #[cfg(feature = "tiered-storage")]
1697    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1698        let versions = self.edge_versions.read();
1699        let index = versions.get(&id)?;
1700        let version_ref = index.visible_to(epoch, tx_id)?;
1701
1702        let record = self.read_edge_record(&version_ref)?;
1703
1704        if record.is_deleted() {
1705            return None;
1706        }
1707
1708        let edge_type = {
1709            let id_to_type = self.id_to_edge_type.read();
1710            id_to_type.get(record.type_id as usize)?.clone()
1711        };
1712
1713        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1714
1715        // Get properties
1716        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1717
1718        Some(edge)
1719    }
1720
1721    /// Reads an EdgeRecord from arena using a VersionRef.
1722    #[cfg(feature = "tiered-storage")]
1723    #[allow(unsafe_code)]
1724    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1725        match version_ref {
1726            VersionRef::Hot(hot_ref) => {
1727                let arena = self.arena_allocator.arena(hot_ref.epoch);
1728                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1729                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1730                Some(record.clone())
1731            }
1732            VersionRef::Cold(cold_ref) => {
1733                // Read from compressed epoch store
1734                self.epoch_store
1735                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1736            }
1737        }
1738    }
1739
1740    /// Deletes an edge (using latest epoch).
1741    pub fn delete_edge(&self, id: EdgeId) -> bool {
1742        self.delete_edge_at_epoch(id, self.current_epoch())
1743    }
1744
1745    /// Deletes an edge at a specific epoch.
1746    #[cfg(not(feature = "tiered-storage"))]
1747    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1748        let mut edges = self.edges.write();
1749        if let Some(chain) = edges.get_mut(&id) {
1750            // Get the visible record to check if deleted and get src/dst
1751            let (src, dst) = {
1752                match chain.visible_at(epoch) {
1753                    Some(record) => {
1754                        if record.is_deleted() {
1755                            return false;
1756                        }
1757                        (record.src, record.dst)
1758                    }
1759                    None => return false, // Not visible at this epoch (already deleted)
1760                }
1761            };
1762
1763            // Mark the version chain as deleted
1764            chain.mark_deleted(epoch);
1765
1766            drop(edges); // Release lock
1767
1768            // Mark as deleted in adjacency (soft delete)
1769            self.forward_adj.mark_deleted(src, id);
1770            if let Some(ref backward) = self.backward_adj {
1771                backward.mark_deleted(dst, id);
1772            }
1773
1774            // Remove properties
1775            self.edge_properties.remove_all(id);
1776
1777            true
1778        } else {
1779            false
1780        }
1781    }
1782
1783    /// Deletes an edge at a specific epoch.
1784    /// (Tiered storage version)
1785    #[cfg(feature = "tiered-storage")]
1786    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1787        let mut versions = self.edge_versions.write();
1788        if let Some(index) = versions.get_mut(&id) {
1789            // Get the visible record to check if deleted and get src/dst
1790            let (src, dst) = {
1791                match index.visible_at(epoch) {
1792                    Some(version_ref) => {
1793                        if let Some(record) = self.read_edge_record(&version_ref) {
1794                            if record.is_deleted() {
1795                                return false;
1796                            }
1797                            (record.src, record.dst)
1798                        } else {
1799                            return false;
1800                        }
1801                    }
1802                    None => return false,
1803                }
1804            };
1805
1806            // Mark as deleted in version index
1807            index.mark_deleted(epoch);
1808
1809            drop(versions); // Release lock
1810
1811            // Mark as deleted in adjacency (soft delete)
1812            self.forward_adj.mark_deleted(src, id);
1813            if let Some(ref backward) = self.backward_adj {
1814                backward.mark_deleted(dst, id);
1815            }
1816
1817            // Remove properties
1818            self.edge_properties.remove_all(id);
1819
1820            true
1821        } else {
1822            false
1823        }
1824    }
1825
1826    /// Returns the number of edges (non-deleted at current epoch).
1827    #[must_use]
1828    #[cfg(not(feature = "tiered-storage"))]
1829    pub fn edge_count(&self) -> usize {
1830        let epoch = self.current_epoch();
1831        self.edges
1832            .read()
1833            .values()
1834            .filter_map(|chain| chain.visible_at(epoch))
1835            .filter(|r| !r.is_deleted())
1836            .count()
1837    }
1838
1839    /// Returns the number of edges (non-deleted at current epoch).
1840    /// (Tiered storage version)
1841    #[must_use]
1842    #[cfg(feature = "tiered-storage")]
1843    pub fn edge_count(&self) -> usize {
1844        let epoch = self.current_epoch();
1845        let versions = self.edge_versions.read();
1846        versions
1847            .iter()
1848            .filter(|(_, index)| {
1849                index.visible_at(epoch).map_or(false, |vref| {
1850                    self.read_edge_record(&vref)
1851                        .map_or(false, |r| !r.is_deleted())
1852                })
1853            })
1854            .count()
1855    }
1856
1857    /// Discards all uncommitted versions created by a transaction.
1858    ///
1859    /// This is called during transaction rollback to clean up uncommitted changes.
1860    /// The method removes version chain entries created by the specified transaction.
1861    #[cfg(not(feature = "tiered-storage"))]
1862    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1863        // Remove uncommitted node versions
1864        {
1865            let mut nodes = self.nodes.write();
1866            for chain in nodes.values_mut() {
1867                chain.remove_versions_by(tx_id);
1868            }
1869            // Remove completely empty chains (no versions left)
1870            nodes.retain(|_, chain| !chain.is_empty());
1871        }
1872
1873        // Remove uncommitted edge versions
1874        {
1875            let mut edges = self.edges.write();
1876            for chain in edges.values_mut() {
1877                chain.remove_versions_by(tx_id);
1878            }
1879            // Remove completely empty chains (no versions left)
1880            edges.retain(|_, chain| !chain.is_empty());
1881        }
1882    }
1883
1884    /// Discards all uncommitted versions created by a transaction.
1885    /// (Tiered storage version)
1886    #[cfg(feature = "tiered-storage")]
1887    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1888        // Remove uncommitted node versions
1889        {
1890            let mut versions = self.node_versions.write();
1891            for index in versions.values_mut() {
1892                index.remove_versions_by(tx_id);
1893            }
1894            // Remove completely empty indexes (no versions left)
1895            versions.retain(|_, index| !index.is_empty());
1896        }
1897
1898        // Remove uncommitted edge versions
1899        {
1900            let mut versions = self.edge_versions.write();
1901            for index in versions.values_mut() {
1902                index.remove_versions_by(tx_id);
1903            }
1904            // Remove completely empty indexes (no versions left)
1905            versions.retain(|_, index| !index.is_empty());
1906        }
1907    }
1908
1909    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
1910    ///
1911    /// This is called by the transaction manager when an epoch becomes eligible
1912    /// for freezing (no active transactions can see it). The freeze process:
1913    ///
1914    /// 1. Collects all hot version refs for the epoch
1915    /// 2. Reads the corresponding records from arena
1916    /// 3. Compresses them into a `CompressedEpochBlock`
1917    /// 4. Updates `VersionIndex` entries to point to cold storage
1918    /// 5. The arena can be deallocated after all epochs in it are frozen
1919    ///
1920    /// # Arguments
1921    ///
1922    /// * `epoch` - The epoch to freeze
1923    ///
1924    /// # Returns
1925    ///
1926    /// The number of records frozen (nodes + edges).
1927    #[cfg(feature = "tiered-storage")]
1928    #[allow(unsafe_code)]
1929    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
1930        // Collect node records to freeze
1931        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
1932        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
1933
1934        {
1935            let versions = self.node_versions.read();
1936            for (node_id, index) in versions.iter() {
1937                for hot_ref in index.hot_refs_for_epoch(epoch) {
1938                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1939                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
1940                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1941                    node_records.push((node_id.as_u64(), record.clone()));
1942                    node_hot_refs.push((*node_id, *hot_ref));
1943                }
1944            }
1945        }
1946
1947        // Collect edge records to freeze
1948        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
1949        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
1950
1951        {
1952            let versions = self.edge_versions.read();
1953            for (edge_id, index) in versions.iter() {
1954                for hot_ref in index.hot_refs_for_epoch(epoch) {
1955                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1956                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1957                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1958                    edge_records.push((edge_id.as_u64(), record.clone()));
1959                    edge_hot_refs.push((*edge_id, *hot_ref));
1960                }
1961            }
1962        }
1963
1964        let total_frozen = node_records.len() + edge_records.len();
1965
1966        if total_frozen == 0 {
1967            return 0;
1968        }
1969
1970        // Freeze to compressed storage
1971        let (node_entries, edge_entries) =
1972            self.epoch_store
1973                .freeze_epoch(epoch, node_records, edge_records);
1974
1975        // Build lookup maps for index entries
1976        let node_entry_map: FxHashMap<u64, _> = node_entries
1977            .iter()
1978            .map(|e| (e.entity_id, (e.offset, e.length)))
1979            .collect();
1980        let edge_entry_map: FxHashMap<u64, _> = edge_entries
1981            .iter()
1982            .map(|e| (e.entity_id, (e.offset, e.length)))
1983            .collect();
1984
1985        // Update version indexes to use cold refs
1986        {
1987            let mut versions = self.node_versions.write();
1988            for (node_id, hot_ref) in &node_hot_refs {
1989                if let Some(index) = versions.get_mut(node_id) {
1990                    if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
1991                        let cold_ref = ColdVersionRef {
1992                            epoch,
1993                            block_offset: offset,
1994                            length,
1995                            created_by: hot_ref.created_by,
1996                            deleted_epoch: hot_ref.deleted_epoch,
1997                        };
1998                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
1999                    }
2000                }
2001            }
2002        }
2003
2004        {
2005            let mut versions = self.edge_versions.write();
2006            for (edge_id, hot_ref) in &edge_hot_refs {
2007                if let Some(index) = versions.get_mut(edge_id) {
2008                    if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2009                        let cold_ref = ColdVersionRef {
2010                            epoch,
2011                            block_offset: offset,
2012                            length,
2013                            created_by: hot_ref.created_by,
2014                            deleted_epoch: hot_ref.deleted_epoch,
2015                        };
2016                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2017                    }
2018                }
2019            }
2020        }
2021
2022        total_frozen
2023    }
2024
2025    /// Returns the epoch store for cold storage statistics.
2026    #[cfg(feature = "tiered-storage")]
2027    #[must_use]
2028    pub fn epoch_store(&self) -> &EpochStore {
2029        &self.epoch_store
2030    }
2031
2032    /// Returns the number of distinct labels in the store.
2033    #[must_use]
2034    pub fn label_count(&self) -> usize {
2035        self.id_to_label.read().len()
2036    }
2037
2038    /// Returns the number of distinct property keys in the store.
2039    ///
2040    /// This counts unique property keys across both nodes and edges.
2041    #[must_use]
2042    pub fn property_key_count(&self) -> usize {
2043        let node_keys = self.node_properties.column_count();
2044        let edge_keys = self.edge_properties.column_count();
2045        // Note: This may count some keys twice if the same key is used
2046        // for both nodes and edges. A more precise count would require
2047        // tracking unique keys across both storages.
2048        node_keys + edge_keys
2049    }
2050
2051    /// Returns the number of distinct edge types in the store.
2052    #[must_use]
2053    pub fn edge_type_count(&self) -> usize {
2054        self.id_to_edge_type.read().len()
2055    }
2056
2057    // === Traversal ===
2058
2059    /// Iterates over neighbors of a node in the specified direction.
2060    ///
2061    /// This is the fast path for graph traversal - goes straight to the
2062    /// adjacency index without loading full node data.
2063    pub fn neighbors(
2064        &self,
2065        node: NodeId,
2066        direction: Direction,
2067    ) -> impl Iterator<Item = NodeId> + '_ {
2068        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2069            Direction::Outgoing | Direction::Both => {
2070                Box::new(self.forward_adj.neighbors(node).into_iter())
2071            }
2072            Direction::Incoming => Box::new(std::iter::empty()),
2073        };
2074
2075        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2076            Direction::Incoming | Direction::Both => {
2077                if let Some(ref adj) = self.backward_adj {
2078                    Box::new(adj.neighbors(node).into_iter())
2079                } else {
2080                    Box::new(std::iter::empty())
2081                }
2082            }
2083            Direction::Outgoing => Box::new(std::iter::empty()),
2084        };
2085
2086        forward.chain(backward)
2087    }
2088
2089    /// Returns edges from a node with their targets.
2090    ///
2091    /// Returns an iterator of (target_node, edge_id) pairs.
2092    pub fn edges_from(
2093        &self,
2094        node: NodeId,
2095        direction: Direction,
2096    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2097        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2098            Direction::Outgoing | Direction::Both => {
2099                Box::new(self.forward_adj.edges_from(node).into_iter())
2100            }
2101            Direction::Incoming => Box::new(std::iter::empty()),
2102        };
2103
2104        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2105            Direction::Incoming | Direction::Both => {
2106                if let Some(ref adj) = self.backward_adj {
2107                    Box::new(adj.edges_from(node).into_iter())
2108                } else {
2109                    Box::new(std::iter::empty())
2110                }
2111            }
2112            Direction::Outgoing => Box::new(std::iter::empty()),
2113        };
2114
2115        forward.chain(backward)
2116    }
2117
2118    /// Returns edges to a node (where the node is the destination).
2119    ///
2120    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2121    /// Uses the backward adjacency index for O(degree) lookup.
2122    ///
2123    /// # Example
2124    ///
2125    /// ```ignore
2126    /// // For edges: A->B, C->B
2127    /// let incoming = store.edges_to(B);
2128    /// // Returns: [(A, edge1), (C, edge2)]
2129    /// ```
2130    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2131        if let Some(ref backward) = self.backward_adj {
2132            backward.edges_from(node)
2133        } else {
2134            // Fallback: scan all edges (slow but correct)
2135            self.all_edges()
2136                .filter_map(|edge| {
2137                    if edge.dst == node {
2138                        Some((edge.src, edge.id))
2139                    } else {
2140                        None
2141                    }
2142                })
2143                .collect()
2144        }
2145    }
2146
2147    /// Returns the out-degree of a node (number of outgoing edges).
2148    ///
2149    /// Uses the forward adjacency index for O(1) lookup.
2150    #[must_use]
2151    pub fn out_degree(&self, node: NodeId) -> usize {
2152        self.forward_adj.out_degree(node)
2153    }
2154
2155    /// Returns the in-degree of a node (number of incoming edges).
2156    ///
2157    /// Uses the backward adjacency index for O(1) lookup if available,
2158    /// otherwise falls back to scanning edges.
2159    #[must_use]
2160    pub fn in_degree(&self, node: NodeId) -> usize {
2161        if let Some(ref backward) = self.backward_adj {
2162            backward.in_degree(node)
2163        } else {
2164            // Fallback: count edges (slow)
2165            self.all_edges().filter(|edge| edge.dst == node).count()
2166        }
2167    }
2168
2169    /// Gets the type of an edge by ID.
2170    #[must_use]
2171    #[cfg(not(feature = "tiered-storage"))]
2172    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2173        let edges = self.edges.read();
2174        let chain = edges.get(&id)?;
2175        let epoch = self.current_epoch();
2176        let record = chain.visible_at(epoch)?;
2177        let id_to_type = self.id_to_edge_type.read();
2178        id_to_type.get(record.type_id as usize).cloned()
2179    }
2180
2181    /// Gets the type of an edge by ID.
2182    /// (Tiered storage version)
2183    #[must_use]
2184    #[cfg(feature = "tiered-storage")]
2185    pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2186        let versions = self.edge_versions.read();
2187        let index = versions.get(&id)?;
2188        let epoch = self.current_epoch();
2189        let vref = index.visible_at(epoch)?;
2190        let record = self.read_edge_record(&vref)?;
2191        let id_to_type = self.id_to_edge_type.read();
2192        id_to_type.get(record.type_id as usize).cloned()
2193    }
2194
2195    /// Returns all nodes with a specific label.
2196    ///
2197    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2198    /// concurrent modifications won't affect the returned vector. Results are
2199    /// sorted by NodeId for deterministic iteration order.
2200    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2201        let label_to_id = self.label_to_id.read();
2202        if let Some(&label_id) = label_to_id.get(label) {
2203            let index = self.label_index.read();
2204            if let Some(set) = index.get(label_id as usize) {
2205                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2206                ids.sort_unstable();
2207                return ids;
2208            }
2209        }
2210        Vec::new()
2211    }
2212
2213    // === Admin API: Iteration ===
2214
2215    /// Returns an iterator over all nodes in the database.
2216    ///
2217    /// This creates a snapshot of all visible nodes at the current epoch.
2218    /// Useful for dump/export operations.
2219    #[cfg(not(feature = "tiered-storage"))]
2220    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2221        let epoch = self.current_epoch();
2222        let node_ids: Vec<NodeId> = self
2223            .nodes
2224            .read()
2225            .iter()
2226            .filter_map(|(id, chain)| {
2227                chain
2228                    .visible_at(epoch)
2229                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2230            })
2231            .collect();
2232
2233        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2234    }
2235
2236    /// Returns an iterator over all nodes in the database.
2237    /// (Tiered storage version)
2238    #[cfg(feature = "tiered-storage")]
2239    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2240        let node_ids = self.node_ids();
2241        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2242    }
2243
2244    /// Returns an iterator over all edges in the database.
2245    ///
2246    /// This creates a snapshot of all visible edges at the current epoch.
2247    /// Useful for dump/export operations.
2248    #[cfg(not(feature = "tiered-storage"))]
2249    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2250        let epoch = self.current_epoch();
2251        let edge_ids: Vec<EdgeId> = self
2252            .edges
2253            .read()
2254            .iter()
2255            .filter_map(|(id, chain)| {
2256                chain
2257                    .visible_at(epoch)
2258                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2259            })
2260            .collect();
2261
2262        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2263    }
2264
2265    /// Returns an iterator over all edges in the database.
2266    /// (Tiered storage version)
2267    #[cfg(feature = "tiered-storage")]
2268    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2269        let epoch = self.current_epoch();
2270        let versions = self.edge_versions.read();
2271        let edge_ids: Vec<EdgeId> = versions
2272            .iter()
2273            .filter_map(|(id, index)| {
2274                index.visible_at(epoch).and_then(|vref| {
2275                    self.read_edge_record(&vref)
2276                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2277                })
2278            })
2279            .collect();
2280
2281        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2282    }
2283
2284    /// Returns all label names in the database.
2285    pub fn all_labels(&self) -> Vec<String> {
2286        self.id_to_label
2287            .read()
2288            .iter()
2289            .map(|s| s.to_string())
2290            .collect()
2291    }
2292
2293    /// Returns all edge type names in the database.
2294    pub fn all_edge_types(&self) -> Vec<String> {
2295        self.id_to_edge_type
2296            .read()
2297            .iter()
2298            .map(|s| s.to_string())
2299            .collect()
2300    }
2301
2302    /// Returns all property keys used in the database.
2303    pub fn all_property_keys(&self) -> Vec<String> {
2304        let mut keys = std::collections::HashSet::new();
2305        for key in self.node_properties.keys() {
2306            keys.insert(key.to_string());
2307        }
2308        for key in self.edge_properties.keys() {
2309            keys.insert(key.to_string());
2310        }
2311        keys.into_iter().collect()
2312    }
2313
2314    /// Returns an iterator over nodes with a specific label.
2315    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2316        let node_ids = self.nodes_by_label(label);
2317        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2318    }
2319
2320    /// Returns an iterator over edges with a specific type.
2321    #[cfg(not(feature = "tiered-storage"))]
2322    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2323        let epoch = self.current_epoch();
2324        let type_to_id = self.edge_type_to_id.read();
2325
2326        if let Some(&type_id) = type_to_id.get(edge_type) {
2327            let edge_ids: Vec<EdgeId> = self
2328                .edges
2329                .read()
2330                .iter()
2331                .filter_map(|(id, chain)| {
2332                    chain.visible_at(epoch).and_then(|r| {
2333                        if !r.is_deleted() && r.type_id == type_id {
2334                            Some(*id)
2335                        } else {
2336                            None
2337                        }
2338                    })
2339                })
2340                .collect();
2341
2342            // Return a boxed iterator for the found edges
2343            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2344                as Box<dyn Iterator<Item = Edge> + 'a>
2345        } else {
2346            // Return empty iterator
2347            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2348        }
2349    }
2350
2351    /// Returns an iterator over edges with a specific type.
2352    /// (Tiered storage version)
2353    #[cfg(feature = "tiered-storage")]
2354    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2355        let epoch = self.current_epoch();
2356        let type_to_id = self.edge_type_to_id.read();
2357
2358        if let Some(&type_id) = type_to_id.get(edge_type) {
2359            let versions = self.edge_versions.read();
2360            let edge_ids: Vec<EdgeId> = versions
2361                .iter()
2362                .filter_map(|(id, index)| {
2363                    index.visible_at(epoch).and_then(|vref| {
2364                        self.read_edge_record(&vref).and_then(|r| {
2365                            if !r.is_deleted() && r.type_id == type_id {
2366                                Some(*id)
2367                            } else {
2368                                None
2369                            }
2370                        })
2371                    })
2372                })
2373                .collect();
2374
2375            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2376                as Box<dyn Iterator<Item = Edge> + 'a>
2377        } else {
2378            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2379        }
2380    }
2381
2382    // === Zone Map Support ===
2383
2384    /// Checks if a node property predicate might match any nodes.
2385    ///
2386    /// Uses zone maps for early filtering. Returns `true` if there might be
2387    /// matching nodes, `false` if there definitely aren't.
2388    #[must_use]
2389    pub fn node_property_might_match(
2390        &self,
2391        property: &PropertyKey,
2392        op: CompareOp,
2393        value: &Value,
2394    ) -> bool {
2395        self.node_properties.might_match(property, op, value)
2396    }
2397
2398    /// Checks if an edge property predicate might match any edges.
2399    #[must_use]
2400    pub fn edge_property_might_match(
2401        &self,
2402        property: &PropertyKey,
2403        op: CompareOp,
2404        value: &Value,
2405    ) -> bool {
2406        self.edge_properties.might_match(property, op, value)
2407    }
2408
2409    /// Gets the zone map for a node property.
2410    #[must_use]
2411    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2412        self.node_properties.zone_map(property)
2413    }
2414
2415    /// Gets the zone map for an edge property.
2416    #[must_use]
2417    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2418        self.edge_properties.zone_map(property)
2419    }
2420
2421    /// Rebuilds zone maps for all properties.
2422    pub fn rebuild_zone_maps(&self) {
2423        self.node_properties.rebuild_zone_maps();
2424        self.edge_properties.rebuild_zone_maps();
2425    }
2426
2427    // === Statistics ===
2428
2429    /// Returns the current statistics.
2430    #[must_use]
2431    pub fn statistics(&self) -> Statistics {
2432        self.statistics.read().clone()
2433    }
2434
2435    /// Recomputes statistics from current data.
2436    ///
2437    /// Scans all labels and edge types to build cardinality estimates for the
2438    /// query optimizer. Call this periodically or after bulk data loads.
2439    #[cfg(not(feature = "tiered-storage"))]
2440    pub fn compute_statistics(&self) {
2441        let mut stats = Statistics::new();
2442
2443        // Compute total counts
2444        stats.total_nodes = self.node_count() as u64;
2445        stats.total_edges = self.edge_count() as u64;
2446
2447        // Compute per-label statistics
2448        let id_to_label = self.id_to_label.read();
2449        let label_index = self.label_index.read();
2450
2451        for (label_id, label_name) in id_to_label.iter().enumerate() {
2452            let node_count = label_index
2453                .get(label_id)
2454                .map(|set| set.len() as u64)
2455                .unwrap_or(0);
2456
2457            if node_count > 0 {
2458                // Estimate average degree
2459                let avg_out_degree = if stats.total_nodes > 0 {
2460                    stats.total_edges as f64 / stats.total_nodes as f64
2461                } else {
2462                    0.0
2463                };
2464
2465                let label_stats =
2466                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2467
2468                stats.update_label(label_name.as_ref(), label_stats);
2469            }
2470        }
2471
2472        // Compute per-edge-type statistics
2473        let id_to_edge_type = self.id_to_edge_type.read();
2474        let edges = self.edges.read();
2475        let epoch = self.current_epoch();
2476
2477        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2478        for chain in edges.values() {
2479            if let Some(record) = chain.visible_at(epoch) {
2480                if !record.is_deleted() {
2481                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2482                }
2483            }
2484        }
2485
2486        for (type_id, count) in edge_type_counts {
2487            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2488                let avg_degree = if stats.total_nodes > 0 {
2489                    count as f64 / stats.total_nodes as f64
2490                } else {
2491                    0.0
2492                };
2493
2494                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2495                stats.update_edge_type(type_name.as_ref(), edge_stats);
2496            }
2497        }
2498
2499        *self.statistics.write() = stats;
2500    }
2501
2502    /// Recomputes statistics from current data.
2503    /// (Tiered storage version)
2504    #[cfg(feature = "tiered-storage")]
2505    pub fn compute_statistics(&self) {
2506        let mut stats = Statistics::new();
2507
2508        // Compute total counts
2509        stats.total_nodes = self.node_count() as u64;
2510        stats.total_edges = self.edge_count() as u64;
2511
2512        // Compute per-label statistics
2513        let id_to_label = self.id_to_label.read();
2514        let label_index = self.label_index.read();
2515
2516        for (label_id, label_name) in id_to_label.iter().enumerate() {
2517            let node_count = label_index
2518                .get(label_id)
2519                .map(|set| set.len() as u64)
2520                .unwrap_or(0);
2521
2522            if node_count > 0 {
2523                let avg_out_degree = if stats.total_nodes > 0 {
2524                    stats.total_edges as f64 / stats.total_nodes as f64
2525                } else {
2526                    0.0
2527                };
2528
2529                let label_stats =
2530                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2531
2532                stats.update_label(label_name.as_ref(), label_stats);
2533            }
2534        }
2535
2536        // Compute per-edge-type statistics
2537        let id_to_edge_type = self.id_to_edge_type.read();
2538        let versions = self.edge_versions.read();
2539        let epoch = self.current_epoch();
2540
2541        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2542        for index in versions.values() {
2543            if let Some(vref) = index.visible_at(epoch) {
2544                if let Some(record) = self.read_edge_record(&vref) {
2545                    if !record.is_deleted() {
2546                        *edge_type_counts.entry(record.type_id).or_default() += 1;
2547                    }
2548                }
2549            }
2550        }
2551
2552        for (type_id, count) in edge_type_counts {
2553            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2554                let avg_degree = if stats.total_nodes > 0 {
2555                    count as f64 / stats.total_nodes as f64
2556                } else {
2557                    0.0
2558                };
2559
2560                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2561                stats.update_edge_type(type_name.as_ref(), edge_stats);
2562            }
2563        }
2564
2565        *self.statistics.write() = stats;
2566    }
2567
2568    /// Estimates cardinality for a label scan.
2569    #[must_use]
2570    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2571        self.statistics.read().estimate_label_cardinality(label)
2572    }
2573
2574    /// Estimates average degree for an edge type.
2575    #[must_use]
2576    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2577        self.statistics
2578            .read()
2579            .estimate_avg_degree(edge_type, outgoing)
2580    }
2581
2582    // === Internal Helpers ===
2583
2584    fn get_or_create_label_id(&self, label: &str) -> u32 {
2585        {
2586            let label_to_id = self.label_to_id.read();
2587            if let Some(&id) = label_to_id.get(label) {
2588                return id;
2589            }
2590        }
2591
2592        let mut label_to_id = self.label_to_id.write();
2593        let mut id_to_label = self.id_to_label.write();
2594
2595        // Double-check after acquiring write lock
2596        if let Some(&id) = label_to_id.get(label) {
2597            return id;
2598        }
2599
2600        let id = id_to_label.len() as u32;
2601
2602        let label: Arc<str> = label.into();
2603        label_to_id.insert(label.clone(), id);
2604        id_to_label.push(label);
2605
2606        id
2607    }
2608
2609    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2610        {
2611            let type_to_id = self.edge_type_to_id.read();
2612            if let Some(&id) = type_to_id.get(edge_type) {
2613                return id;
2614            }
2615        }
2616
2617        let mut type_to_id = self.edge_type_to_id.write();
2618        let mut id_to_type = self.id_to_edge_type.write();
2619
2620        // Double-check
2621        if let Some(&id) = type_to_id.get(edge_type) {
2622            return id;
2623        }
2624
2625        let id = id_to_type.len() as u32;
2626        let edge_type: Arc<str> = edge_type.into();
2627        type_to_id.insert(edge_type.clone(), id);
2628        id_to_type.push(edge_type);
2629
2630        id
2631    }
2632
2633    // === Recovery Support ===
2634
2635    /// Creates a node with a specific ID during recovery.
2636    ///
2637    /// This is used for WAL recovery to restore nodes with their original IDs.
2638    /// The caller must ensure IDs don't conflict with existing nodes.
2639    #[cfg(not(feature = "tiered-storage"))]
2640    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2641        let epoch = self.current_epoch();
2642        let mut record = NodeRecord::new(id, epoch);
2643        record.set_label_count(labels.len() as u16);
2644
2645        // Store labels in node_labels map and label_index
2646        let mut node_label_set = FxHashSet::default();
2647        for label in labels {
2648            let label_id = self.get_or_create_label_id(*label);
2649            node_label_set.insert(label_id);
2650
2651            // Update label index
2652            let mut index = self.label_index.write();
2653            while index.len() <= label_id as usize {
2654                index.push(FxHashMap::default());
2655            }
2656            index[label_id as usize].insert(id, ());
2657        }
2658
2659        // Store node's labels
2660        self.node_labels.write().insert(id, node_label_set);
2661
2662        // Create version chain with initial version (using SYSTEM tx for recovery)
2663        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2664        self.nodes.write().insert(id, chain);
2665
2666        // Update next_node_id if necessary to avoid future collisions
2667        let id_val = id.as_u64();
2668        let _ = self
2669            .next_node_id
2670            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2671                if id_val >= current {
2672                    Some(id_val + 1)
2673                } else {
2674                    None
2675                }
2676            });
2677    }
2678
2679    /// Creates a node with a specific ID during recovery.
2680    /// (Tiered storage version)
2681    #[cfg(feature = "tiered-storage")]
2682    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2683        let epoch = self.current_epoch();
2684        let mut record = NodeRecord::new(id, epoch);
2685        record.set_label_count(labels.len() as u16);
2686
2687        // Store labels in node_labels map and label_index
2688        let mut node_label_set = FxHashSet::default();
2689        for label in labels {
2690            let label_id = self.get_or_create_label_id(*label);
2691            node_label_set.insert(label_id);
2692
2693            // Update label index
2694            let mut index = self.label_index.write();
2695            while index.len() <= label_id as usize {
2696                index.push(FxHashMap::default());
2697            }
2698            index[label_id as usize].insert(id, ());
2699        }
2700
2701        // Store node's labels
2702        self.node_labels.write().insert(id, node_label_set);
2703
2704        // Allocate record in arena and get offset (create epoch if needed)
2705        let arena = self.arena_allocator.arena_or_create(epoch);
2706        let (offset, _stored) = arena.alloc_value_with_offset(record);
2707
2708        // Create HotVersionRef (using SYSTEM tx for recovery)
2709        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2710        let mut versions = self.node_versions.write();
2711        versions.insert(id, VersionIndex::with_initial(hot_ref));
2712
2713        // Update next_node_id if necessary to avoid future collisions
2714        let id_val = id.as_u64();
2715        let _ = self
2716            .next_node_id
2717            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2718                if id_val >= current {
2719                    Some(id_val + 1)
2720                } else {
2721                    None
2722                }
2723            });
2724    }
2725
2726    /// Creates an edge with a specific ID during recovery.
2727    ///
2728    /// This is used for WAL recovery to restore edges with their original IDs.
2729    #[cfg(not(feature = "tiered-storage"))]
2730    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2731        let epoch = self.current_epoch();
2732        let type_id = self.get_or_create_edge_type_id(edge_type);
2733
2734        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2735        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2736        self.edges.write().insert(id, chain);
2737
2738        // Update adjacency
2739        self.forward_adj.add_edge(src, dst, id);
2740        if let Some(ref backward) = self.backward_adj {
2741            backward.add_edge(dst, src, id);
2742        }
2743
2744        // Update next_edge_id if necessary
2745        let id_val = id.as_u64();
2746        let _ = self
2747            .next_edge_id
2748            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2749                if id_val >= current {
2750                    Some(id_val + 1)
2751                } else {
2752                    None
2753                }
2754            });
2755    }
2756
2757    /// Creates an edge with a specific ID during recovery.
2758    /// (Tiered storage version)
2759    #[cfg(feature = "tiered-storage")]
2760    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2761        let epoch = self.current_epoch();
2762        let type_id = self.get_or_create_edge_type_id(edge_type);
2763
2764        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2765
2766        // Allocate record in arena and get offset (create epoch if needed)
2767        let arena = self.arena_allocator.arena_or_create(epoch);
2768        let (offset, _stored) = arena.alloc_value_with_offset(record);
2769
2770        // Create HotVersionRef (using SYSTEM tx for recovery)
2771        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2772        let mut versions = self.edge_versions.write();
2773        versions.insert(id, VersionIndex::with_initial(hot_ref));
2774
2775        // Update adjacency
2776        self.forward_adj.add_edge(src, dst, id);
2777        if let Some(ref backward) = self.backward_adj {
2778            backward.add_edge(dst, src, id);
2779        }
2780
2781        // Update next_edge_id if necessary
2782        let id_val = id.as_u64();
2783        let _ = self
2784            .next_edge_id
2785            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2786                if id_val >= current {
2787                    Some(id_val + 1)
2788                } else {
2789                    None
2790                }
2791            });
2792    }
2793
2794    /// Sets the current epoch during recovery.
2795    pub fn set_epoch(&self, epoch: EpochId) {
2796        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2797    }
2798}
2799
2800impl Default for LpgStore {
2801    fn default() -> Self {
2802        Self::new()
2803    }
2804}
2805
2806#[cfg(test)]
2807mod tests {
2808    use super::*;
2809
2810    #[test]
2811    fn test_create_node() {
2812        let store = LpgStore::new();
2813
2814        let id = store.create_node(&["Person"]);
2815        assert!(id.is_valid());
2816
2817        let node = store.get_node(id).unwrap();
2818        assert!(node.has_label("Person"));
2819        assert!(!node.has_label("Animal"));
2820    }
2821
2822    #[test]
2823    fn test_create_node_with_props() {
2824        let store = LpgStore::new();
2825
2826        let id = store.create_node_with_props(
2827            &["Person"],
2828            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2829        );
2830
2831        let node = store.get_node(id).unwrap();
2832        assert_eq!(
2833            node.get_property("name").and_then(|v| v.as_str()),
2834            Some("Alice")
2835        );
2836        assert_eq!(
2837            node.get_property("age").and_then(|v| v.as_int64()),
2838            Some(30)
2839        );
2840    }
2841
2842    #[test]
2843    fn test_delete_node() {
2844        let store = LpgStore::new();
2845
2846        let id = store.create_node(&["Person"]);
2847        assert_eq!(store.node_count(), 1);
2848
2849        assert!(store.delete_node(id));
2850        assert_eq!(store.node_count(), 0);
2851        assert!(store.get_node(id).is_none());
2852
2853        // Double delete should return false
2854        assert!(!store.delete_node(id));
2855    }
2856
2857    #[test]
2858    fn test_create_edge() {
2859        let store = LpgStore::new();
2860
2861        let alice = store.create_node(&["Person"]);
2862        let bob = store.create_node(&["Person"]);
2863
2864        let edge_id = store.create_edge(alice, bob, "KNOWS");
2865        assert!(edge_id.is_valid());
2866
2867        let edge = store.get_edge(edge_id).unwrap();
2868        assert_eq!(edge.src, alice);
2869        assert_eq!(edge.dst, bob);
2870        assert_eq!(edge.edge_type.as_ref(), "KNOWS");
2871    }
2872
2873    #[test]
2874    fn test_neighbors() {
2875        let store = LpgStore::new();
2876
2877        let a = store.create_node(&["Person"]);
2878        let b = store.create_node(&["Person"]);
2879        let c = store.create_node(&["Person"]);
2880
2881        store.create_edge(a, b, "KNOWS");
2882        store.create_edge(a, c, "KNOWS");
2883
2884        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2885        assert_eq!(outgoing.len(), 2);
2886        assert!(outgoing.contains(&b));
2887        assert!(outgoing.contains(&c));
2888
2889        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2890        assert_eq!(incoming.len(), 1);
2891        assert!(incoming.contains(&a));
2892    }
2893
2894    #[test]
2895    fn test_nodes_by_label() {
2896        let store = LpgStore::new();
2897
2898        let p1 = store.create_node(&["Person"]);
2899        let p2 = store.create_node(&["Person"]);
2900        let _a = store.create_node(&["Animal"]);
2901
2902        let persons = store.nodes_by_label("Person");
2903        assert_eq!(persons.len(), 2);
2904        assert!(persons.contains(&p1));
2905        assert!(persons.contains(&p2));
2906
2907        let animals = store.nodes_by_label("Animal");
2908        assert_eq!(animals.len(), 1);
2909    }
2910
2911    #[test]
2912    fn test_delete_edge() {
2913        let store = LpgStore::new();
2914
2915        let a = store.create_node(&["Person"]);
2916        let b = store.create_node(&["Person"]);
2917        let edge_id = store.create_edge(a, b, "KNOWS");
2918
2919        assert_eq!(store.edge_count(), 1);
2920
2921        assert!(store.delete_edge(edge_id));
2922        assert_eq!(store.edge_count(), 0);
2923        assert!(store.get_edge(edge_id).is_none());
2924    }
2925
2926    // === New tests for improved coverage ===
2927
2928    #[test]
2929    fn test_lpg_store_config() {
2930        // Test with_config
2931        let config = LpgStoreConfig {
2932            backward_edges: false,
2933            initial_node_capacity: 100,
2934            initial_edge_capacity: 200,
2935        };
2936        let store = LpgStore::with_config(config);
2937
2938        // Store should work but without backward adjacency
2939        let a = store.create_node(&["Person"]);
2940        let b = store.create_node(&["Person"]);
2941        store.create_edge(a, b, "KNOWS");
2942
2943        // Outgoing should work
2944        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2945        assert_eq!(outgoing.len(), 1);
2946
2947        // Incoming should be empty (no backward adjacency)
2948        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2949        assert_eq!(incoming.len(), 0);
2950    }
2951
2952    #[test]
2953    fn test_epoch_management() {
2954        let store = LpgStore::new();
2955
2956        let epoch0 = store.current_epoch();
2957        assert_eq!(epoch0.as_u64(), 0);
2958
2959        let epoch1 = store.new_epoch();
2960        assert_eq!(epoch1.as_u64(), 1);
2961
2962        let current = store.current_epoch();
2963        assert_eq!(current.as_u64(), 1);
2964    }
2965
2966    #[test]
2967    fn test_node_properties() {
2968        let store = LpgStore::new();
2969        let id = store.create_node(&["Person"]);
2970
2971        // Set and get property
2972        store.set_node_property(id, "name", Value::from("Alice"));
2973        let name = store.get_node_property(id, &"name".into());
2974        assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Alice"));
2975
2976        // Update property
2977        store.set_node_property(id, "name", Value::from("Bob"));
2978        let name = store.get_node_property(id, &"name".into());
2979        assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Bob"));
2980
2981        // Remove property
2982        let old = store.remove_node_property(id, "name");
2983        assert!(matches!(old, Some(Value::String(s)) if s.as_ref() == "Bob"));
2984
2985        // Property should be gone
2986        let name = store.get_node_property(id, &"name".into());
2987        assert!(name.is_none());
2988
2989        // Remove non-existent property
2990        let none = store.remove_node_property(id, "nonexistent");
2991        assert!(none.is_none());
2992    }
2993
2994    #[test]
2995    fn test_edge_properties() {
2996        let store = LpgStore::new();
2997        let a = store.create_node(&["Person"]);
2998        let b = store.create_node(&["Person"]);
2999        let edge_id = store.create_edge(a, b, "KNOWS");
3000
3001        // Set and get property
3002        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3003        let since = store.get_edge_property(edge_id, &"since".into());
3004        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3005
3006        // Remove property
3007        let old = store.remove_edge_property(edge_id, "since");
3008        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3009
3010        let since = store.get_edge_property(edge_id, &"since".into());
3011        assert!(since.is_none());
3012    }
3013
3014    #[test]
3015    fn test_add_remove_label() {
3016        let store = LpgStore::new();
3017        let id = store.create_node(&["Person"]);
3018
3019        // Add new label
3020        assert!(store.add_label(id, "Employee"));
3021
3022        let node = store.get_node(id).unwrap();
3023        assert!(node.has_label("Person"));
3024        assert!(node.has_label("Employee"));
3025
3026        // Adding same label again should fail
3027        assert!(!store.add_label(id, "Employee"));
3028
3029        // Remove label
3030        assert!(store.remove_label(id, "Employee"));
3031
3032        let node = store.get_node(id).unwrap();
3033        assert!(node.has_label("Person"));
3034        assert!(!node.has_label("Employee"));
3035
3036        // Removing non-existent label should fail
3037        assert!(!store.remove_label(id, "Employee"));
3038        assert!(!store.remove_label(id, "NonExistent"));
3039    }
3040
3041    #[test]
3042    fn test_add_label_to_nonexistent_node() {
3043        let store = LpgStore::new();
3044        let fake_id = NodeId::new(999);
3045        assert!(!store.add_label(fake_id, "Label"));
3046    }
3047
3048    #[test]
3049    fn test_remove_label_from_nonexistent_node() {
3050        let store = LpgStore::new();
3051        let fake_id = NodeId::new(999);
3052        assert!(!store.remove_label(fake_id, "Label"));
3053    }
3054
3055    #[test]
3056    fn test_node_ids() {
3057        let store = LpgStore::new();
3058
3059        let n1 = store.create_node(&["Person"]);
3060        let n2 = store.create_node(&["Person"]);
3061        let n3 = store.create_node(&["Person"]);
3062
3063        let ids = store.node_ids();
3064        assert_eq!(ids.len(), 3);
3065        assert!(ids.contains(&n1));
3066        assert!(ids.contains(&n2));
3067        assert!(ids.contains(&n3));
3068
3069        // Delete one
3070        store.delete_node(n2);
3071        let ids = store.node_ids();
3072        assert_eq!(ids.len(), 2);
3073        assert!(!ids.contains(&n2));
3074    }
3075
3076    #[test]
3077    fn test_delete_node_nonexistent() {
3078        let store = LpgStore::new();
3079        let fake_id = NodeId::new(999);
3080        assert!(!store.delete_node(fake_id));
3081    }
3082
3083    #[test]
3084    fn test_delete_edge_nonexistent() {
3085        let store = LpgStore::new();
3086        let fake_id = EdgeId::new(999);
3087        assert!(!store.delete_edge(fake_id));
3088    }
3089
3090    #[test]
3091    fn test_delete_edge_double() {
3092        let store = LpgStore::new();
3093        let a = store.create_node(&["Person"]);
3094        let b = store.create_node(&["Person"]);
3095        let edge_id = store.create_edge(a, b, "KNOWS");
3096
3097        assert!(store.delete_edge(edge_id));
3098        assert!(!store.delete_edge(edge_id)); // Double delete
3099    }
3100
3101    #[test]
3102    fn test_create_edge_with_props() {
3103        let store = LpgStore::new();
3104        let a = store.create_node(&["Person"]);
3105        let b = store.create_node(&["Person"]);
3106
3107        let edge_id = store.create_edge_with_props(
3108            a,
3109            b,
3110            "KNOWS",
3111            [
3112                ("since", Value::from(2020i64)),
3113                ("weight", Value::from(1.0)),
3114            ],
3115        );
3116
3117        let edge = store.get_edge(edge_id).unwrap();
3118        assert_eq!(
3119            edge.get_property("since").and_then(|v| v.as_int64()),
3120            Some(2020)
3121        );
3122        assert_eq!(
3123            edge.get_property("weight").and_then(|v| v.as_float64()),
3124            Some(1.0)
3125        );
3126    }
3127
3128    #[test]
3129    fn test_delete_node_edges() {
3130        let store = LpgStore::new();
3131
3132        let a = store.create_node(&["Person"]);
3133        let b = store.create_node(&["Person"]);
3134        let c = store.create_node(&["Person"]);
3135
3136        store.create_edge(a, b, "KNOWS"); // a -> b
3137        store.create_edge(c, a, "KNOWS"); // c -> a
3138
3139        assert_eq!(store.edge_count(), 2);
3140
3141        // Delete all edges connected to a
3142        store.delete_node_edges(a);
3143
3144        assert_eq!(store.edge_count(), 0);
3145    }
3146
3147    #[test]
3148    fn test_neighbors_both_directions() {
3149        let store = LpgStore::new();
3150
3151        let a = store.create_node(&["Person"]);
3152        let b = store.create_node(&["Person"]);
3153        let c = store.create_node(&["Person"]);
3154
3155        store.create_edge(a, b, "KNOWS"); // a -> b
3156        store.create_edge(c, a, "KNOWS"); // c -> a
3157
3158        // Direction::Both for node a
3159        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3160        assert_eq!(neighbors.len(), 2);
3161        assert!(neighbors.contains(&b)); // outgoing
3162        assert!(neighbors.contains(&c)); // incoming
3163    }
3164
3165    #[test]
3166    fn test_edges_from() {
3167        let store = LpgStore::new();
3168
3169        let a = store.create_node(&["Person"]);
3170        let b = store.create_node(&["Person"]);
3171        let c = store.create_node(&["Person"]);
3172
3173        let e1 = store.create_edge(a, b, "KNOWS");
3174        let e2 = store.create_edge(a, c, "KNOWS");
3175
3176        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3177        assert_eq!(edges.len(), 2);
3178        assert!(edges.iter().any(|(_, e)| *e == e1));
3179        assert!(edges.iter().any(|(_, e)| *e == e2));
3180
3181        // Incoming edges to b
3182        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3183        assert_eq!(incoming.len(), 1);
3184        assert_eq!(incoming[0].1, e1);
3185    }
3186
3187    #[test]
3188    fn test_edges_to() {
3189        let store = LpgStore::new();
3190
3191        let a = store.create_node(&["Person"]);
3192        let b = store.create_node(&["Person"]);
3193        let c = store.create_node(&["Person"]);
3194
3195        let e1 = store.create_edge(a, b, "KNOWS");
3196        let e2 = store.create_edge(c, b, "KNOWS");
3197
3198        // Edges pointing TO b
3199        let to_b = store.edges_to(b);
3200        assert_eq!(to_b.len(), 2);
3201        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3202        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3203    }
3204
3205    #[test]
3206    fn test_out_degree_in_degree() {
3207        let store = LpgStore::new();
3208
3209        let a = store.create_node(&["Person"]);
3210        let b = store.create_node(&["Person"]);
3211        let c = store.create_node(&["Person"]);
3212
3213        store.create_edge(a, b, "KNOWS");
3214        store.create_edge(a, c, "KNOWS");
3215        store.create_edge(c, b, "KNOWS");
3216
3217        assert_eq!(store.out_degree(a), 2);
3218        assert_eq!(store.out_degree(b), 0);
3219        assert_eq!(store.out_degree(c), 1);
3220
3221        assert_eq!(store.in_degree(a), 0);
3222        assert_eq!(store.in_degree(b), 2);
3223        assert_eq!(store.in_degree(c), 1);
3224    }
3225
3226    #[test]
3227    fn test_edge_type() {
3228        let store = LpgStore::new();
3229
3230        let a = store.create_node(&["Person"]);
3231        let b = store.create_node(&["Person"]);
3232        let edge_id = store.create_edge(a, b, "KNOWS");
3233
3234        let edge_type = store.edge_type(edge_id);
3235        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3236
3237        // Non-existent edge
3238        let fake_id = EdgeId::new(999);
3239        assert!(store.edge_type(fake_id).is_none());
3240    }
3241
3242    #[test]
3243    fn test_count_methods() {
3244        let store = LpgStore::new();
3245
3246        assert_eq!(store.label_count(), 0);
3247        assert_eq!(store.edge_type_count(), 0);
3248        assert_eq!(store.property_key_count(), 0);
3249
3250        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3251        let b = store.create_node(&["Company"]);
3252        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3253
3254        assert_eq!(store.label_count(), 2); // Person, Company
3255        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3256        assert_eq!(store.property_key_count(), 2); // age, since
3257    }
3258
3259    #[test]
3260    fn test_all_nodes_and_edges() {
3261        let store = LpgStore::new();
3262
3263        let a = store.create_node(&["Person"]);
3264        let b = store.create_node(&["Person"]);
3265        store.create_edge(a, b, "KNOWS");
3266
3267        let nodes: Vec<_> = store.all_nodes().collect();
3268        assert_eq!(nodes.len(), 2);
3269
3270        let edges: Vec<_> = store.all_edges().collect();
3271        assert_eq!(edges.len(), 1);
3272    }
3273
3274    #[test]
3275    fn test_all_labels_and_edge_types() {
3276        let store = LpgStore::new();
3277
3278        store.create_node(&["Person"]);
3279        store.create_node(&["Company"]);
3280        let a = store.create_node(&["Animal"]);
3281        let b = store.create_node(&["Animal"]);
3282        store.create_edge(a, b, "EATS");
3283
3284        let labels = store.all_labels();
3285        assert_eq!(labels.len(), 3);
3286        assert!(labels.contains(&"Person".to_string()));
3287        assert!(labels.contains(&"Company".to_string()));
3288        assert!(labels.contains(&"Animal".to_string()));
3289
3290        let edge_types = store.all_edge_types();
3291        assert_eq!(edge_types.len(), 1);
3292        assert!(edge_types.contains(&"EATS".to_string()));
3293    }
3294
3295    #[test]
3296    fn test_all_property_keys() {
3297        let store = LpgStore::new();
3298
3299        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3300        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3301        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3302
3303        let keys = store.all_property_keys();
3304        assert!(keys.contains(&"name".to_string()));
3305        assert!(keys.contains(&"age".to_string()));
3306        assert!(keys.contains(&"since".to_string()));
3307    }
3308
3309    #[test]
3310    fn test_nodes_with_label() {
3311        let store = LpgStore::new();
3312
3313        store.create_node(&["Person"]);
3314        store.create_node(&["Person"]);
3315        store.create_node(&["Company"]);
3316
3317        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3318        assert_eq!(persons.len(), 2);
3319
3320        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3321        assert_eq!(companies.len(), 1);
3322
3323        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3324        assert_eq!(none.len(), 0);
3325    }
3326
3327    #[test]
3328    fn test_edges_with_type() {
3329        let store = LpgStore::new();
3330
3331        let a = store.create_node(&["Person"]);
3332        let b = store.create_node(&["Person"]);
3333        let c = store.create_node(&["Company"]);
3334
3335        store.create_edge(a, b, "KNOWS");
3336        store.create_edge(a, c, "WORKS_AT");
3337
3338        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3339        assert_eq!(knows.len(), 1);
3340
3341        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3342        assert_eq!(works_at.len(), 1);
3343
3344        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3345        assert_eq!(none.len(), 0);
3346    }
3347
3348    #[test]
3349    fn test_nodes_by_label_nonexistent() {
3350        let store = LpgStore::new();
3351        store.create_node(&["Person"]);
3352
3353        let empty = store.nodes_by_label("NonExistent");
3354        assert!(empty.is_empty());
3355    }
3356
3357    #[test]
3358    fn test_statistics() {
3359        let store = LpgStore::new();
3360
3361        let a = store.create_node(&["Person"]);
3362        let b = store.create_node(&["Person"]);
3363        let c = store.create_node(&["Company"]);
3364
3365        store.create_edge(a, b, "KNOWS");
3366        store.create_edge(a, c, "WORKS_AT");
3367
3368        store.compute_statistics();
3369        let stats = store.statistics();
3370
3371        assert_eq!(stats.total_nodes, 3);
3372        assert_eq!(stats.total_edges, 2);
3373
3374        // Estimates
3375        let person_card = store.estimate_label_cardinality("Person");
3376        assert!(person_card > 0.0);
3377
3378        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3379        assert!(avg_degree >= 0.0);
3380    }
3381
3382    #[test]
3383    fn test_zone_maps() {
3384        let store = LpgStore::new();
3385
3386        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3387        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3388
3389        // Zone map should indicate possible matches (30 is within [25, 35] range)
3390        let might_match =
3391            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3392        // Zone maps return true conservatively when value is within min/max range
3393        assert!(might_match);
3394
3395        let zone = store.node_property_zone_map(&"age".into());
3396        assert!(zone.is_some());
3397
3398        // Non-existent property
3399        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3400        assert!(no_zone.is_none());
3401
3402        // Edge zone maps
3403        let a = store.create_node(&["A"]);
3404        let b = store.create_node(&["B"]);
3405        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3406
3407        let edge_zone = store.edge_property_zone_map(&"weight".into());
3408        assert!(edge_zone.is_some());
3409    }
3410
3411    #[test]
3412    fn test_rebuild_zone_maps() {
3413        let store = LpgStore::new();
3414        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3415
3416        // Should not panic
3417        store.rebuild_zone_maps();
3418    }
3419
3420    #[test]
3421    fn test_create_node_with_id() {
3422        let store = LpgStore::new();
3423
3424        let specific_id = NodeId::new(100);
3425        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3426
3427        let node = store.get_node(specific_id).unwrap();
3428        assert!(node.has_label("Person"));
3429        assert!(node.has_label("Employee"));
3430
3431        // Next auto-generated ID should be > 100
3432        let next = store.create_node(&["Other"]);
3433        assert!(next.as_u64() > 100);
3434    }
3435
3436    #[test]
3437    fn test_create_edge_with_id() {
3438        let store = LpgStore::new();
3439
3440        let a = store.create_node(&["A"]);
3441        let b = store.create_node(&["B"]);
3442
3443        let specific_id = EdgeId::new(500);
3444        store.create_edge_with_id(specific_id, a, b, "REL");
3445
3446        let edge = store.get_edge(specific_id).unwrap();
3447        assert_eq!(edge.src, a);
3448        assert_eq!(edge.dst, b);
3449        assert_eq!(edge.edge_type.as_ref(), "REL");
3450
3451        // Next auto-generated ID should be > 500
3452        let next = store.create_edge(a, b, "OTHER");
3453        assert!(next.as_u64() > 500);
3454    }
3455
3456    #[test]
3457    fn test_set_epoch() {
3458        let store = LpgStore::new();
3459
3460        assert_eq!(store.current_epoch().as_u64(), 0);
3461
3462        store.set_epoch(EpochId::new(42));
3463        assert_eq!(store.current_epoch().as_u64(), 42);
3464    }
3465
3466    #[test]
3467    fn test_get_node_nonexistent() {
3468        let store = LpgStore::new();
3469        let fake_id = NodeId::new(999);
3470        assert!(store.get_node(fake_id).is_none());
3471    }
3472
3473    #[test]
3474    fn test_get_edge_nonexistent() {
3475        let store = LpgStore::new();
3476        let fake_id = EdgeId::new(999);
3477        assert!(store.get_edge(fake_id).is_none());
3478    }
3479
3480    #[test]
3481    fn test_multiple_labels() {
3482        let store = LpgStore::new();
3483
3484        let id = store.create_node(&["Person", "Employee", "Manager"]);
3485        let node = store.get_node(id).unwrap();
3486
3487        assert!(node.has_label("Person"));
3488        assert!(node.has_label("Employee"));
3489        assert!(node.has_label("Manager"));
3490        assert!(!node.has_label("Other"));
3491    }
3492
3493    #[test]
3494    fn test_default_impl() {
3495        let store: LpgStore = Default::default();
3496        assert_eq!(store.node_count(), 0);
3497        assert_eq!(store.edge_count(), 0);
3498    }
3499
3500    #[test]
3501    fn test_edges_from_both_directions() {
3502        let store = LpgStore::new();
3503
3504        let a = store.create_node(&["A"]);
3505        let b = store.create_node(&["B"]);
3506        let c = store.create_node(&["C"]);
3507
3508        let e1 = store.create_edge(a, b, "R1"); // a -> b
3509        let e2 = store.create_edge(c, a, "R2"); // c -> a
3510
3511        // Both directions from a
3512        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3513        assert_eq!(edges.len(), 2);
3514        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3515        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3516    }
3517
3518    #[test]
3519    fn test_no_backward_adj_in_degree() {
3520        let config = LpgStoreConfig {
3521            backward_edges: false,
3522            initial_node_capacity: 10,
3523            initial_edge_capacity: 10,
3524        };
3525        let store = LpgStore::with_config(config);
3526
3527        let a = store.create_node(&["A"]);
3528        let b = store.create_node(&["B"]);
3529        store.create_edge(a, b, "R");
3530
3531        // in_degree should still work (falls back to scanning)
3532        let degree = store.in_degree(b);
3533        assert_eq!(degree, 1);
3534    }
3535
3536    #[test]
3537    fn test_no_backward_adj_edges_to() {
3538        let config = LpgStoreConfig {
3539            backward_edges: false,
3540            initial_node_capacity: 10,
3541            initial_edge_capacity: 10,
3542        };
3543        let store = LpgStore::with_config(config);
3544
3545        let a = store.create_node(&["A"]);
3546        let b = store.create_node(&["B"]);
3547        let e = store.create_edge(a, b, "R");
3548
3549        // edges_to should still work (falls back to scanning)
3550        let edges = store.edges_to(b);
3551        assert_eq!(edges.len(), 1);
3552        assert_eq!(edges[0].1, e);
3553    }
3554
3555    #[test]
3556    fn test_node_versioned_creation() {
3557        let store = LpgStore::new();
3558
3559        let epoch = store.new_epoch();
3560        let tx_id = TxId::new(1);
3561
3562        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3563        assert!(store.get_node(id).is_some());
3564    }
3565
3566    #[test]
3567    fn test_edge_versioned_creation() {
3568        let store = LpgStore::new();
3569
3570        let a = store.create_node(&["A"]);
3571        let b = store.create_node(&["B"]);
3572
3573        let epoch = store.new_epoch();
3574        let tx_id = TxId::new(1);
3575
3576        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3577        assert!(store.get_edge(edge_id).is_some());
3578    }
3579
3580    #[test]
3581    fn test_node_with_props_versioned() {
3582        let store = LpgStore::new();
3583
3584        let epoch = store.new_epoch();
3585        let tx_id = TxId::new(1);
3586
3587        let id = store.create_node_with_props_versioned(
3588            &["Person"],
3589            [("name", Value::from("Alice"))],
3590            epoch,
3591            tx_id,
3592        );
3593
3594        let node = store.get_node(id).unwrap();
3595        assert_eq!(
3596            node.get_property("name").and_then(|v| v.as_str()),
3597            Some("Alice")
3598        );
3599    }
3600
3601    #[test]
3602    fn test_discard_uncommitted_versions() {
3603        let store = LpgStore::new();
3604
3605        let epoch = store.new_epoch();
3606        let tx_id = TxId::new(42);
3607
3608        // Create node with specific tx
3609        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3610        assert!(store.get_node(node_id).is_some());
3611
3612        // Discard uncommitted versions for this tx
3613        store.discard_uncommitted_versions(tx_id);
3614
3615        // Node should be gone (version chain was removed)
3616        assert!(store.get_node(node_id).is_none());
3617    }
3618
3619    // === Property Index Tests ===
3620
3621    #[test]
3622    fn test_property_index_create_and_lookup() {
3623        let store = LpgStore::new();
3624
3625        // Create nodes with properties
3626        let alice = store.create_node(&["Person"]);
3627        let bob = store.create_node(&["Person"]);
3628        let charlie = store.create_node(&["Person"]);
3629
3630        store.set_node_property(alice, "city", Value::from("NYC"));
3631        store.set_node_property(bob, "city", Value::from("NYC"));
3632        store.set_node_property(charlie, "city", Value::from("LA"));
3633
3634        // Before indexing, lookup still works (via scan)
3635        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3636        assert_eq!(nyc_people.len(), 2);
3637
3638        // Create index
3639        store.create_property_index("city");
3640        assert!(store.has_property_index("city"));
3641
3642        // Indexed lookup should return same results
3643        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3644        assert_eq!(nyc_people.len(), 2);
3645        assert!(nyc_people.contains(&alice));
3646        assert!(nyc_people.contains(&bob));
3647
3648        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3649        assert_eq!(la_people.len(), 1);
3650        assert!(la_people.contains(&charlie));
3651    }
3652
3653    #[test]
3654    fn test_property_index_maintained_on_update() {
3655        let store = LpgStore::new();
3656
3657        // Create index first
3658        store.create_property_index("status");
3659
3660        let node = store.create_node(&["Task"]);
3661        store.set_node_property(node, "status", Value::from("pending"));
3662
3663        // Should find by initial value
3664        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3665        assert_eq!(pending.len(), 1);
3666        assert!(pending.contains(&node));
3667
3668        // Update the property
3669        store.set_node_property(node, "status", Value::from("done"));
3670
3671        // Old value should not find it
3672        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3673        assert!(pending.is_empty());
3674
3675        // New value should find it
3676        let done = store.find_nodes_by_property("status", &Value::from("done"));
3677        assert_eq!(done.len(), 1);
3678        assert!(done.contains(&node));
3679    }
3680
3681    #[test]
3682    fn test_property_index_maintained_on_remove() {
3683        let store = LpgStore::new();
3684
3685        store.create_property_index("tag");
3686
3687        let node = store.create_node(&["Item"]);
3688        store.set_node_property(node, "tag", Value::from("important"));
3689
3690        // Should find it
3691        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3692        assert_eq!(found.len(), 1);
3693
3694        // Remove the property
3695        store.remove_node_property(node, "tag");
3696
3697        // Should no longer find it
3698        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3699        assert!(found.is_empty());
3700    }
3701
3702    #[test]
3703    fn test_property_index_drop() {
3704        let store = LpgStore::new();
3705
3706        store.create_property_index("key");
3707        assert!(store.has_property_index("key"));
3708
3709        assert!(store.drop_property_index("key"));
3710        assert!(!store.has_property_index("key"));
3711
3712        // Dropping non-existent index returns false
3713        assert!(!store.drop_property_index("key"));
3714    }
3715
3716    #[test]
3717    fn test_property_index_multiple_values() {
3718        let store = LpgStore::new();
3719
3720        store.create_property_index("age");
3721
3722        // Create multiple nodes with same and different ages
3723        let n1 = store.create_node(&["Person"]);
3724        let n2 = store.create_node(&["Person"]);
3725        let n3 = store.create_node(&["Person"]);
3726        let n4 = store.create_node(&["Person"]);
3727
3728        store.set_node_property(n1, "age", Value::from(25i64));
3729        store.set_node_property(n2, "age", Value::from(25i64));
3730        store.set_node_property(n3, "age", Value::from(30i64));
3731        store.set_node_property(n4, "age", Value::from(25i64));
3732
3733        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3734        assert_eq!(age_25.len(), 3);
3735
3736        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3737        assert_eq!(age_30.len(), 1);
3738
3739        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3740        assert!(age_40.is_empty());
3741    }
3742
3743    #[test]
3744    fn test_property_index_builds_from_existing_data() {
3745        let store = LpgStore::new();
3746
3747        // Create nodes first
3748        let n1 = store.create_node(&["Person"]);
3749        let n2 = store.create_node(&["Person"]);
3750        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3751        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3752
3753        // Create index after data exists
3754        store.create_property_index("email");
3755
3756        // Index should include existing data
3757        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3758        assert_eq!(alice.len(), 1);
3759        assert!(alice.contains(&n1));
3760
3761        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3762        assert_eq!(bob.len(), 1);
3763        assert!(bob.contains(&n2));
3764    }
3765}