Skip to main content

grafeo_core/graph/lpg/
store.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(feature = "tiered-storage")]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30/// Compares two values for ordering (used for range checks).
31fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
32    match (a, b) {
33        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
34        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
35        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
36        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
37        _ => None,
38    }
39}
40
41/// Checks if a value is within a range.
42fn value_in_range(
43    value: &Value,
44    min: Option<&Value>,
45    max: Option<&Value>,
46    min_inclusive: bool,
47    max_inclusive: bool,
48) -> bool {
49    // Check lower bound
50    if let Some(min_val) = min {
51        match compare_values_for_range(value, min_val) {
52            Some(CmpOrdering::Less) => return false,
53            Some(CmpOrdering::Equal) if !min_inclusive => return false,
54            None => return false, // Can't compare
55            _ => {}
56        }
57    }
58
59    // Check upper bound
60    if let Some(max_val) = max {
61        match compare_values_for_range(value, max_val) {
62            Some(CmpOrdering::Greater) => return false,
63            Some(CmpOrdering::Equal) if !max_inclusive => return false,
64            None => return false,
65            _ => {}
66        }
67    }
68
69    true
70}
71
72// Tiered storage imports
73#[cfg(feature = "tiered-storage")]
74use crate::storage::EpochStore;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::memory::arena::ArenaAllocator;
77#[cfg(feature = "tiered-storage")]
78use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
79
/// Configuration for the LPG store.
///
/// The defaults work well for most cases. Tune `backward_edges` if you only
/// traverse in one direction (saves memory), or adjust capacities if you know
/// your graph size upfront (avoids reallocations).
///
/// Pass an instance to `LpgStore::with_config`; `LpgStore::new` uses
/// [`LpgStoreConfig::default`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Maintain backward adjacency for incoming edge queries. Turn off if
    /// you only traverse outgoing edges - saves ~50% adjacency memory.
    ///
    /// When `false`, the store is built without a backward adjacency index.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    ///
    /// NOTE(review): the constructor visible in this file does not appear to
    /// read this value - confirm where it is consumed.
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    ///
    /// NOTE(review): the constructor visible in this file does not appear to
    /// read this value - confirm where it is consumed.
    pub initial_edge_capacity: usize,
}
95
96impl Default for LpgStoreConfig {
97    fn default() -> Self {
98        Self {
99            backward_edges: true,
100            initial_node_capacity: 1024,
101            initial_edge_capacity: 4096,
102        }
103    }
104}
105
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
///
/// # Lock Ordering
///
/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
/// consistent order to prevent deadlocks. Always acquire locks in this order:
///
/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
/// 1. `nodes` / `node_versions`
/// 2. `edges` / `edge_versions`
///
/// ## Level 2 - Catalogs (acquire as pairs when writing)
/// 3. `label_to_id` + `id_to_label`
/// 4. `edge_type_to_id` + `id_to_edge_type`
///
/// ## Level 3 - Indexes
/// 5. `label_index`
/// 6. `node_labels`
/// 7. `property_indexes`
///
/// ## Level 4 - Statistics
/// 8. `statistics`
///
/// ## Level 5 - Nested Locks (internal to other structs)
/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
///
/// ## Rules
/// - Catalog pairs must be acquired together when writing.
/// - Never hold entity locks while acquiring catalog locks in a different scope.
/// - Statistics lock is always last.
/// - Read locks are generally safe, but avoid read-to-write upgrades.
pub struct LpgStore {
    /// Configuration the store was constructed with.
    ///
    /// `backward_edges` is consulted at construction time to decide whether
    /// `backward_adj` exists (presumably why `dead_code` is allowed here -
    /// confirm).
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 1
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 2
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    /// Lock order: 1
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    /// Lock order: 2
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    /// Lock order: 3 (acquire with id_to_label)
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label ID to name mapping; the label ID is the index into this vector.
    /// Lock order: 3 (acquire with label_to_id)
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to ID mapping.
    /// Lock order: 4 (acquire with id_to_edge_type)
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type ID to name mapping.
    /// Lock order: 4 (acquire with edge_type_to_id)
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Forward adjacency lists (outgoing edges).
    forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true.
    backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    ///
    /// The vector is indexed by label ID; each entry is a map with unit
    /// values that is used as a set of node IDs.
    /// Lock order: 5
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    /// Lock order: 6
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property indexes: property_key -> (value -> set of node IDs).
    ///
    /// When a property is indexed, lookups by value are O(1) instead of O(n).
    /// Use [`create_property_index`] to enable indexing for a property.
    /// Lock order: 7
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Next node ID to hand out; incremented each time a node is created.
    next_node_id: AtomicU64,

    /// Next edge ID.
    next_edge_id: AtomicU64,

    /// Current epoch; advanced by `new_epoch`.
    current_epoch: AtomicU64,

    /// Statistics for cost-based optimization.
    /// Lock order: 8 (always last)
    statistics: RwLock<Statistics>,
}
262
263impl LpgStore {
264    /// Creates a new LPG store with default configuration.
265    #[must_use]
266    pub fn new() -> Self {
267        Self::with_config(LpgStoreConfig::default())
268    }
269
270    /// Creates a new LPG store with custom configuration.
271    #[must_use]
272    pub fn with_config(config: LpgStoreConfig) -> Self {
273        let backward_adj = if config.backward_edges {
274            Some(ChunkedAdjacency::new())
275        } else {
276            None
277        };
278
279        Self {
280            #[cfg(not(feature = "tiered-storage"))]
281            nodes: RwLock::new(FxHashMap::default()),
282            #[cfg(not(feature = "tiered-storage"))]
283            edges: RwLock::new(FxHashMap::default()),
284            #[cfg(feature = "tiered-storage")]
285            arena_allocator: Arc::new(ArenaAllocator::new()),
286            #[cfg(feature = "tiered-storage")]
287            node_versions: RwLock::new(FxHashMap::default()),
288            #[cfg(feature = "tiered-storage")]
289            edge_versions: RwLock::new(FxHashMap::default()),
290            #[cfg(feature = "tiered-storage")]
291            epoch_store: Arc::new(EpochStore::new()),
292            node_properties: PropertyStorage::new(),
293            edge_properties: PropertyStorage::new(),
294            label_to_id: RwLock::new(FxHashMap::default()),
295            id_to_label: RwLock::new(Vec::new()),
296            edge_type_to_id: RwLock::new(FxHashMap::default()),
297            id_to_edge_type: RwLock::new(Vec::new()),
298            forward_adj: ChunkedAdjacency::new(),
299            backward_adj,
300            label_index: RwLock::new(Vec::new()),
301            node_labels: RwLock::new(FxHashMap::default()),
302            property_indexes: RwLock::new(FxHashMap::default()),
303            next_node_id: AtomicU64::new(0),
304            next_edge_id: AtomicU64::new(0),
305            current_epoch: AtomicU64::new(0),
306            statistics: RwLock::new(Statistics::new()),
307            config,
308        }
309    }
310
311    /// Returns the current epoch.
312    #[must_use]
313    pub fn current_epoch(&self) -> EpochId {
314        EpochId::new(self.current_epoch.load(Ordering::Acquire))
315    }
316
317    /// Creates a new epoch.
318    pub fn new_epoch(&self) -> EpochId {
319        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
320        EpochId::new(id)
321    }
322
323    // === Node Operations ===
324
325    /// Creates a new node with the given labels.
326    ///
327    /// Uses the system transaction for non-transactional operations.
328    pub fn create_node(&self, labels: &[&str]) -> NodeId {
329        self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
330    }
331
332    /// Creates a new node with the given labels within a transaction context.
333    #[cfg(not(feature = "tiered-storage"))]
334    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
335        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
336
337        let mut record = NodeRecord::new(id, epoch);
338        record.set_label_count(labels.len() as u16);
339
340        // Store labels in node_labels map and label_index
341        let mut node_label_set = FxHashSet::default();
342        for label in labels {
343            let label_id = self.get_or_create_label_id(*label);
344            node_label_set.insert(label_id);
345
346            // Update label index
347            let mut index = self.label_index.write();
348            while index.len() <= label_id as usize {
349                index.push(FxHashMap::default());
350            }
351            index[label_id as usize].insert(id, ());
352        }
353
354        // Store node's labels
355        self.node_labels.write().insert(id, node_label_set);
356
357        // Create version chain with initial version
358        let chain = VersionChain::with_initial(record, epoch, tx_id);
359        self.nodes.write().insert(id, chain);
360        id
361    }
362
363    /// Creates a new node with the given labels within a transaction context.
364    /// (Tiered storage version: stores data in arena, metadata in VersionIndex)
365    #[cfg(feature = "tiered-storage")]
366    pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
367        let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
368
369        let mut record = NodeRecord::new(id, epoch);
370        record.set_label_count(labels.len() as u16);
371
372        // Store labels in node_labels map and label_index
373        let mut node_label_set = FxHashSet::default();
374        for label in labels {
375            let label_id = self.get_or_create_label_id(*label);
376            node_label_set.insert(label_id);
377
378            // Update label index
379            let mut index = self.label_index.write();
380            while index.len() <= label_id as usize {
381                index.push(FxHashMap::default());
382            }
383            index[label_id as usize].insert(id, ());
384        }
385
386        // Store node's labels
387        self.node_labels.write().insert(id, node_label_set);
388
389        // Allocate record in arena and get offset (create epoch if needed)
390        let arena = self.arena_allocator.arena_or_create(epoch);
391        let (offset, _stored) = arena.alloc_value_with_offset(record);
392
393        // Create HotVersionRef pointing to arena data
394        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
395
396        // Create or update version index
397        let mut versions = self.node_versions.write();
398        if let Some(index) = versions.get_mut(&id) {
399            index.add_hot(hot_ref);
400        } else {
401            versions.insert(id, VersionIndex::with_initial(hot_ref));
402        }
403
404        id
405    }
406
407    /// Creates a new node with labels and properties.
408    pub fn create_node_with_props(
409        &self,
410        labels: &[&str],
411        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
412    ) -> NodeId {
413        self.create_node_with_props_versioned(
414            labels,
415            properties,
416            self.current_epoch(),
417            TxId::SYSTEM,
418        )
419    }
420
421    /// Creates a new node with labels and properties within a transaction context.
422    #[cfg(not(feature = "tiered-storage"))]
423    pub fn create_node_with_props_versioned(
424        &self,
425        labels: &[&str],
426        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
427        epoch: EpochId,
428        tx_id: TxId,
429    ) -> NodeId {
430        let id = self.create_node_versioned(labels, epoch, tx_id);
431
432        for (key, value) in properties {
433            self.node_properties.set(id, key.into(), value.into());
434        }
435
436        // Update props_count in record
437        let count = self.node_properties.get_all(id).len() as u16;
438        if let Some(chain) = self.nodes.write().get_mut(&id) {
439            if let Some(record) = chain.latest_mut() {
440                record.props_count = count;
441            }
442        }
443
444        id
445    }
446
447    /// Creates a new node with labels and properties within a transaction context.
448    /// (Tiered storage version)
449    #[cfg(feature = "tiered-storage")]
450    pub fn create_node_with_props_versioned(
451        &self,
452        labels: &[&str],
453        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454        epoch: EpochId,
455        tx_id: TxId,
456    ) -> NodeId {
457        let id = self.create_node_versioned(labels, epoch, tx_id);
458
459        for (key, value) in properties {
460            self.node_properties.set(id, key.into(), value.into());
461        }
462
463        // Note: props_count in record is not updated for tiered storage.
464        // The record is immutable once allocated in the arena.
465
466        id
467    }
468
469    /// Gets a node by ID (latest visible version).
470    #[must_use]
471    pub fn get_node(&self, id: NodeId) -> Option<Node> {
472        self.get_node_at_epoch(id, self.current_epoch())
473    }
474
475    /// Gets a node by ID at a specific epoch.
476    #[must_use]
477    #[cfg(not(feature = "tiered-storage"))]
478    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
479        let nodes = self.nodes.read();
480        let chain = nodes.get(&id)?;
481        let record = chain.visible_at(epoch)?;
482
483        if record.is_deleted() {
484            return None;
485        }
486
487        let mut node = Node::new(id);
488
489        // Get labels from node_labels map
490        let id_to_label = self.id_to_label.read();
491        let node_labels = self.node_labels.read();
492        if let Some(label_ids) = node_labels.get(&id) {
493            for &label_id in label_ids {
494                if let Some(label) = id_to_label.get(label_id as usize) {
495                    node.labels.push(label.clone());
496                }
497            }
498        }
499
500        // Get properties
501        node.properties = self.node_properties.get_all(id).into_iter().collect();
502
503        Some(node)
504    }
505
506    /// Gets a node by ID at a specific epoch.
507    /// (Tiered storage version: reads from arena via VersionIndex)
508    #[must_use]
509    #[cfg(feature = "tiered-storage")]
510    pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
511        let versions = self.node_versions.read();
512        let index = versions.get(&id)?;
513        let version_ref = index.visible_at(epoch)?;
514
515        // Read the record from arena
516        let record = self.read_node_record(&version_ref)?;
517
518        if record.is_deleted() {
519            return None;
520        }
521
522        let mut node = Node::new(id);
523
524        // Get labels from node_labels map
525        let id_to_label = self.id_to_label.read();
526        let node_labels = self.node_labels.read();
527        if let Some(label_ids) = node_labels.get(&id) {
528            for &label_id in label_ids {
529                if let Some(label) = id_to_label.get(label_id as usize) {
530                    node.labels.push(label.clone());
531                }
532            }
533        }
534
535        // Get properties
536        node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538        Some(node)
539    }
540
541    /// Gets a node visible to a specific transaction.
542    #[must_use]
543    #[cfg(not(feature = "tiered-storage"))]
544    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
545        let nodes = self.nodes.read();
546        let chain = nodes.get(&id)?;
547        let record = chain.visible_to(epoch, tx_id)?;
548
549        if record.is_deleted() {
550            return None;
551        }
552
553        let mut node = Node::new(id);
554
555        // Get labels from node_labels map
556        let id_to_label = self.id_to_label.read();
557        let node_labels = self.node_labels.read();
558        if let Some(label_ids) = node_labels.get(&id) {
559            for &label_id in label_ids {
560                if let Some(label) = id_to_label.get(label_id as usize) {
561                    node.labels.push(label.clone());
562                }
563            }
564        }
565
566        // Get properties
567        node.properties = self.node_properties.get_all(id).into_iter().collect();
568
569        Some(node)
570    }
571
572    /// Gets a node visible to a specific transaction.
573    /// (Tiered storage version: reads from arena via VersionIndex)
574    #[must_use]
575    #[cfg(feature = "tiered-storage")]
576    pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
577        let versions = self.node_versions.read();
578        let index = versions.get(&id)?;
579        let version_ref = index.visible_to(epoch, tx_id)?;
580
581        // Read the record from arena
582        let record = self.read_node_record(&version_ref)?;
583
584        if record.is_deleted() {
585            return None;
586        }
587
588        let mut node = Node::new(id);
589
590        // Get labels from node_labels map
591        let id_to_label = self.id_to_label.read();
592        let node_labels = self.node_labels.read();
593        if let Some(label_ids) = node_labels.get(&id) {
594            for &label_id in label_ids {
595                if let Some(label) = id_to_label.get(label_id as usize) {
596                    node.labels.push(label.clone());
597                }
598            }
599        }
600
601        // Get properties
602        node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604        Some(node)
605    }
606
    /// Reads a NodeRecord from arena (hot) or epoch store (cold) using a VersionRef.
    ///
    /// Returns an owned copy of the record. Hot versions are cloned out of the
    /// arena for the version's epoch; cold versions are whatever the epoch
    /// store returns for the referenced block, which may be `None`.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
        match version_ref {
            VersionRef::Hot(hot_ref) => {
                // Look up the arena of the epoch this version was written in.
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
                // NOTE(review): this presumably also relies on the arena for that
                // epoch still being alive and unmodified at this offset - confirm.
                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                // Clone so the caller does not keep a borrow into the arena.
                Some(record.clone())
            }
            VersionRef::Cold(cold_ref) => {
                // Read from compressed epoch store
                self.epoch_store
                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
            }
        }
    }
625
626    /// Deletes a node and all its edges (using latest epoch).
627    pub fn delete_node(&self, id: NodeId) -> bool {
628        self.delete_node_at_epoch(id, self.current_epoch())
629    }
630
631    /// Deletes a node at a specific epoch.
632    #[cfg(not(feature = "tiered-storage"))]
633    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
634        let mut nodes = self.nodes.write();
635        if let Some(chain) = nodes.get_mut(&id) {
636            // Check if visible at this epoch (not already deleted)
637            if let Some(record) = chain.visible_at(epoch) {
638                if record.is_deleted() {
639                    return false;
640                }
641            } else {
642                // Not visible at this epoch (already deleted or doesn't exist)
643                return false;
644            }
645
646            // Mark the version chain as deleted at this epoch
647            chain.mark_deleted(epoch);
648
649            // Remove from label index using node_labels map
650            let mut index = self.label_index.write();
651            let mut node_labels = self.node_labels.write();
652            if let Some(label_ids) = node_labels.remove(&id) {
653                for label_id in label_ids {
654                    if let Some(set) = index.get_mut(label_id as usize) {
655                        set.remove(&id);
656                    }
657                }
658            }
659
660            // Remove properties
661            drop(nodes); // Release lock before removing properties
662            drop(index);
663            drop(node_labels);
664            self.node_properties.remove_all(id);
665
666            // Note: Caller should use delete_node_edges() first if detach is needed
667
668            true
669        } else {
670            false
671        }
672    }
673
674    /// Deletes a node at a specific epoch.
675    /// (Tiered storage version)
676    #[cfg(feature = "tiered-storage")]
677    pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
678        let mut versions = self.node_versions.write();
679        if let Some(index) = versions.get_mut(&id) {
680            // Check if visible at this epoch
681            if let Some(version_ref) = index.visible_at(epoch) {
682                if let Some(record) = self.read_node_record(&version_ref) {
683                    if record.is_deleted() {
684                        return false;
685                    }
686                } else {
687                    return false;
688                }
689            } else {
690                return false;
691            }
692
693            // Mark as deleted in version index
694            index.mark_deleted(epoch);
695
696            // Remove from label index using node_labels map
697            let mut label_index = self.label_index.write();
698            let mut node_labels = self.node_labels.write();
699            if let Some(label_ids) = node_labels.remove(&id) {
700                for label_id in label_ids {
701                    if let Some(set) = label_index.get_mut(label_id as usize) {
702                        set.remove(&id);
703                    }
704                }
705            }
706
707            // Remove properties
708            drop(versions);
709            drop(label_index);
710            drop(node_labels);
711            self.node_properties.remove_all(id);
712
713            true
714        } else {
715            false
716        }
717    }
718
719    /// Deletes all edges connected to a node (implements DETACH DELETE).
720    ///
721    /// Call this before `delete_node()` if you want to remove a node that
722    /// has edges. Grafeo doesn't auto-delete edges - you have to be explicit.
723    #[cfg(not(feature = "tiered-storage"))]
724    pub fn delete_node_edges(&self, node_id: NodeId) {
725        // Get outgoing edges
726        let outgoing: Vec<EdgeId> = self
727            .forward_adj
728            .edges_from(node_id)
729            .into_iter()
730            .map(|(_, edge_id)| edge_id)
731            .collect();
732
733        // Get incoming edges
734        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
735            backward
736                .edges_from(node_id)
737                .into_iter()
738                .map(|(_, edge_id)| edge_id)
739                .collect()
740        } else {
741            // No backward adjacency - scan all edges
742            let epoch = self.current_epoch();
743            self.edges
744                .read()
745                .iter()
746                .filter_map(|(id, chain)| {
747                    chain.visible_at(epoch).and_then(|r| {
748                        if !r.is_deleted() && r.dst == node_id {
749                            Some(*id)
750                        } else {
751                            None
752                        }
753                    })
754                })
755                .collect()
756        };
757
758        // Delete all edges
759        for edge_id in outgoing.into_iter().chain(incoming) {
760            self.delete_edge(edge_id);
761        }
762    }
763
764    /// Deletes all edges connected to a node (implements DETACH DELETE).
765    /// (Tiered storage version)
766    #[cfg(feature = "tiered-storage")]
767    pub fn delete_node_edges(&self, node_id: NodeId) {
768        // Get outgoing edges
769        let outgoing: Vec<EdgeId> = self
770            .forward_adj
771            .edges_from(node_id)
772            .into_iter()
773            .map(|(_, edge_id)| edge_id)
774            .collect();
775
776        // Get incoming edges
777        let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
778            backward
779                .edges_from(node_id)
780                .into_iter()
781                .map(|(_, edge_id)| edge_id)
782                .collect()
783        } else {
784            // No backward adjacency - scan all edges
785            let epoch = self.current_epoch();
786            let versions = self.edge_versions.read();
787            versions
788                .iter()
789                .filter_map(|(id, index)| {
790                    index.visible_at(epoch).and_then(|vref| {
791                        self.read_edge_record(&vref).and_then(|r| {
792                            if !r.is_deleted() && r.dst == node_id {
793                                Some(*id)
794                            } else {
795                                None
796                            }
797                        })
798                    })
799                })
800                .collect()
801        };
802
803        // Delete all edges
804        for edge_id in outgoing.into_iter().chain(incoming) {
805            self.delete_edge(edge_id);
806        }
807    }
808
809    /// Sets a property on a node.
810    #[cfg(not(feature = "tiered-storage"))]
811    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
812        let prop_key: PropertyKey = key.into();
813
814        // Update property index before setting the property (needs to read old value)
815        self.update_property_index_on_set(id, &prop_key, &value);
816
817        self.node_properties.set(id, prop_key, value);
818
819        // Update props_count in record
820        let count = self.node_properties.get_all(id).len() as u16;
821        if let Some(chain) = self.nodes.write().get_mut(&id) {
822            if let Some(record) = chain.latest_mut() {
823                record.props_count = count;
824            }
825        }
826    }
827
828    /// Sets a property on a node.
829    /// (Tiered storage version: properties stored separately, record is immutable)
830    #[cfg(feature = "tiered-storage")]
831    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
832        let prop_key: PropertyKey = key.into();
833
834        // Update property index before setting the property (needs to read old value)
835        self.update_property_index_on_set(id, &prop_key, &value);
836
837        self.node_properties.set(id, prop_key, value);
838        // Note: props_count in record is not updated for tiered storage.
839        // The record is immutable once allocated in the arena.
840        // Property count can be derived from PropertyStorage if needed.
841    }
842
843    /// Sets a property on an edge.
844    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
845        self.edge_properties.set(id, key.into(), value);
846    }
847
848    /// Removes a property from a node.
849    ///
850    /// Returns the previous value if it existed, or None if the property didn't exist.
851    #[cfg(not(feature = "tiered-storage"))]
852    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
853        let prop_key: PropertyKey = key.into();
854
855        // Update property index before removing (needs to read old value)
856        self.update_property_index_on_remove(id, &prop_key);
857
858        let result = self.node_properties.remove(id, &prop_key);
859
860        // Update props_count in record
861        let count = self.node_properties.get_all(id).len() as u16;
862        if let Some(chain) = self.nodes.write().get_mut(&id) {
863            if let Some(record) = chain.latest_mut() {
864                record.props_count = count;
865            }
866        }
867
868        result
869    }
870
871    /// Removes a property from a node.
872    /// (Tiered storage version)
873    #[cfg(feature = "tiered-storage")]
874    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
875        let prop_key: PropertyKey = key.into();
876
877        // Update property index before removing (needs to read old value)
878        self.update_property_index_on_remove(id, &prop_key);
879
880        self.node_properties.remove(id, &prop_key)
881        // Note: props_count in record is not updated for tiered storage.
882    }
883
884    /// Removes a property from an edge.
885    ///
886    /// Returns the previous value if it existed, or None if the property didn't exist.
887    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
888        self.edge_properties.remove(id, &key.into())
889    }
890
    /// Gets a single property from a node without loading all properties.
    ///
    /// This is O(1) vs O(properties) for `get_node().get_property()`.
    /// Use this for filter predicates where you only need one property value.
    ///
    /// The lookup goes straight to the node property storage: no epoch or
    /// deletion check is performed here. Returns `None` when the storage has
    /// no value for `(id, key)`.
    ///
    /// Note that `key` is a [`PropertyKey`], not a `&str`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Fast: Direct single-property lookup
    /// let age = store.get_node_property(node_id, &PropertyKey::new("age"));
    ///
    /// // Slow: Loads all properties, then extracts one
    /// let age = store.get_node(node_id).and_then(|n| n.get_property("age").cloned());
    /// ```
    #[must_use]
    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
        self.node_properties.get(id, key)
    }
909
    /// Gets a single property from an edge without loading all properties.
    ///
    /// This is O(1) vs O(properties) for `get_edge().get_property()`.
    ///
    /// The lookup goes straight to the edge property storage: no epoch or
    /// deletion check is performed here. Returns `None` when the storage has
    /// no value for `(id, key)`.
    #[must_use]
    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
        self.edge_properties.get(id, key)
    }
917
918    // === Batch Property Operations ===
919
    /// Gets a property for multiple nodes in a single batch operation.
    ///
    /// More efficient than calling [`Self::get_node_property`] in a loop because it
    /// reduces lock overhead and enables better cache utilization.
    ///
    /// As the example below shows, the result lines up positionally with
    /// `ids`: entry `i` is the value of `key` for `ids[i]`.
    ///
    /// # Example
    ///
    /// ```
    /// use grafeo_core::graph::lpg::LpgStore;
    /// use grafeo_common::types::{NodeId, PropertyKey, Value};
    ///
    /// let store = LpgStore::new();
    /// let n1 = store.create_node(&["Person"]);
    /// let n2 = store.create_node(&["Person"]);
    /// store.set_node_property(n1, "age", Value::from(25i64));
    /// store.set_node_property(n2, "age", Value::from(30i64));
    ///
    /// let ages = store.get_node_property_batch(&[n1, n2], &PropertyKey::new("age"));
    /// assert_eq!(ages, vec![Some(Value::from(25i64)), Some(Value::from(30i64))]);
    /// ```
    #[must_use]
    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
        // Thin wrapper: the batching itself is implemented by the property storage.
        self.node_properties.get_batch(ids, key)
    }
944
    /// Gets all properties for multiple nodes in a single batch operation.
    ///
    /// Returns a vector of property maps, one per node ID (empty map if no properties).
    /// More efficient than calling [`Self::get_node`] in a loop.
    ///
    /// Only the property storage is consulted; this method performs no epoch
    /// or deletion check on the given IDs.
    #[must_use]
    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
        // Thin wrapper: the batching itself is implemented by the property storage.
        self.node_properties.get_all_batch(ids)
    }
953
954    /// Finds nodes where a property value is in a range.
955    ///
956    /// This is useful for queries like `n.age > 30` or `n.price BETWEEN 10 AND 100`.
957    /// Uses zone maps to skip scanning when the range definitely doesn't match.
958    ///
959    /// # Arguments
960    ///
961    /// * `property` - The property to check
962    /// * `min` - Optional lower bound (None for unbounded)
963    /// * `max` - Optional upper bound (None for unbounded)
964    /// * `min_inclusive` - Whether lower bound is inclusive (>= vs >)
965    /// * `max_inclusive` - Whether upper bound is inclusive (<= vs <)
966    ///
967    /// # Example
968    ///
969    /// ```
970    /// use grafeo_core::graph::lpg::LpgStore;
971    /// use grafeo_common::types::Value;
972    ///
973    /// let store = LpgStore::new();
974    /// let n1 = store.create_node(&["Person"]);
975    /// let n2 = store.create_node(&["Person"]);
976    /// store.set_node_property(n1, "age", Value::from(25i64));
977    /// store.set_node_property(n2, "age", Value::from(35i64));
978    ///
979    /// // Find nodes where age > 30
980    /// let result = store.find_nodes_in_range(
981    ///     "age",
982    ///     Some(&Value::from(30i64)),
983    ///     None,
984    ///     false, // exclusive lower bound
985    ///     true,  // inclusive upper bound (doesn't matter since None)
986    /// );
987    /// assert_eq!(result.len(), 1); // Only n2 matches
988    /// ```
989    #[must_use]
990    pub fn find_nodes_in_range(
991        &self,
992        property: &str,
993        min: Option<&Value>,
994        max: Option<&Value>,
995        min_inclusive: bool,
996        max_inclusive: bool,
997    ) -> Vec<NodeId> {
998        let key = PropertyKey::new(property);
999
1000        // Check zone map first - if no values could match, return empty
1001        if !self
1002            .node_properties
1003            .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1004        {
1005            return Vec::new();
1006        }
1007
1008        // Scan all nodes and filter by range
1009        self.node_ids()
1010            .into_iter()
1011            .filter(|&node_id| {
1012                self.node_properties
1013                    .get(node_id, &key)
1014                    .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1015            })
1016            .collect()
1017    }
1018
1019    /// Finds nodes matching multiple property equality conditions.
1020    ///
1021    /// This is more efficient than intersecting multiple single-property lookups
1022    /// because it can use indexes when available and short-circuits on the first
1023    /// miss.
1024    ///
1025    /// # Example
1026    ///
1027    /// ```
1028    /// use grafeo_core::graph::lpg::LpgStore;
1029    /// use grafeo_common::types::Value;
1030    ///
1031    /// let store = LpgStore::new();
1032    /// let alice = store.create_node(&["Person"]);
1033    /// store.set_node_property(alice, "name", Value::from("Alice"));
1034    /// store.set_node_property(alice, "city", Value::from("NYC"));
1035    ///
1036    /// // Find nodes where name = "Alice" AND city = "NYC"
1037    /// let matches = store.find_nodes_by_properties(&[
1038    ///     ("name", Value::from("Alice")),
1039    ///     ("city", Value::from("NYC")),
1040    /// ]);
1041    /// assert!(matches.contains(&alice));
1042    /// ```
1043    #[must_use]
1044    pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1045        if conditions.is_empty() {
1046            return self.node_ids();
1047        }
1048
1049        // Find the most selective condition (smallest result set) to start
1050        // If any condition has an index, use that first
1051        let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1052        let indexes = self.property_indexes.read();
1053
1054        for (i, (prop, value)) in conditions.iter().enumerate() {
1055            let key = PropertyKey::new(*prop);
1056            let hv = HashableValue::new(value.clone());
1057
1058            if let Some(index) = indexes.get(&key) {
1059                let matches: Vec<NodeId> = index
1060                    .get(&hv)
1061                    .map(|nodes| nodes.iter().copied().collect())
1062                    .unwrap_or_default();
1063
1064                // Short-circuit if any indexed condition has no matches
1065                if matches.is_empty() {
1066                    return Vec::new();
1067                }
1068
1069                // Use smallest indexed result as starting point
1070                if best_start
1071                    .as_ref()
1072                    .is_none_or(|(_, best)| matches.len() < best.len())
1073                {
1074                    best_start = Some((i, matches));
1075                }
1076            }
1077        }
1078        drop(indexes);
1079
1080        // Start from best indexed result or fall back to full node scan
1081        let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1082            // No indexes available, start with first condition via full scan
1083            let (prop, value) = &conditions[0];
1084            (0, self.find_nodes_by_property(prop, value))
1085        });
1086
1087        // Filter candidates through remaining conditions
1088        for (i, (prop, value)) in conditions.iter().enumerate() {
1089            if i == start_idx {
1090                continue;
1091            }
1092
1093            let key = PropertyKey::new(*prop);
1094            candidates.retain(|&node_id| {
1095                self.node_properties
1096                    .get(node_id, &key)
1097                    .is_some_and(|v| v == *value)
1098            });
1099
1100            // Short-circuit if no candidates remain
1101            if candidates.is_empty() {
1102                return Vec::new();
1103            }
1104        }
1105
1106        candidates
1107    }
1108
1109    // === Property Index Operations ===
1110
1111    /// Creates an index on a node property for O(1) lookups by value.
1112    ///
1113    /// After creating an index, calls to [`Self::find_nodes_by_property`] will be
1114    /// O(1) instead of O(n) for this property. The index is automatically
1115    /// maintained when properties are set or removed.
1116    ///
1117    /// # Example
1118    ///
1119    /// ```
1120    /// use grafeo_core::graph::lpg::LpgStore;
1121    /// use grafeo_common::types::Value;
1122    ///
1123    /// let store = LpgStore::new();
1124    ///
1125    /// // Create nodes with an 'id' property
1126    /// let alice = store.create_node(&["Person"]);
1127    /// store.set_node_property(alice, "id", Value::from("alice_123"));
1128    ///
1129    /// // Create an index on the 'id' property
1130    /// store.create_property_index("id");
1131    ///
1132    /// // Now lookups by 'id' are O(1)
1133    /// let found = store.find_nodes_by_property("id", &Value::from("alice_123"));
1134    /// assert!(found.contains(&alice));
1135    /// ```
1136    pub fn create_property_index(&self, property: &str) {
1137        let key = PropertyKey::new(property);
1138
1139        let mut indexes = self.property_indexes.write();
1140        if indexes.contains_key(&key) {
1141            return; // Already indexed
1142        }
1143
1144        // Create the index and populate it with existing data
1145        let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1146
1147        // Scan all nodes to build the index
1148        for node_id in self.node_ids() {
1149            if let Some(value) = self.node_properties.get(node_id, &key) {
1150                let hv = HashableValue::new(value);
1151                index
1152                    .entry(hv)
1153                    .or_insert_with(FxHashSet::default)
1154                    .insert(node_id);
1155            }
1156        }
1157
1158        indexes.insert(key, index);
1159    }
1160
1161    /// Drops an index on a node property.
1162    ///
1163    /// Returns `true` if the index existed and was removed.
1164    pub fn drop_property_index(&self, property: &str) -> bool {
1165        let key = PropertyKey::new(property);
1166        self.property_indexes.write().remove(&key).is_some()
1167    }
1168
1169    /// Returns `true` if the property has an index.
1170    #[must_use]
1171    pub fn has_property_index(&self, property: &str) -> bool {
1172        let key = PropertyKey::new(property);
1173        self.property_indexes.read().contains_key(&key)
1174    }
1175
1176    /// Finds all nodes that have a specific property value.
1177    ///
1178    /// If the property is indexed, this is O(1). Otherwise, it scans all nodes
1179    /// which is O(n). Use [`Self::create_property_index`] for frequently queried properties.
1180    ///
1181    /// # Example
1182    ///
1183    /// ```
1184    /// use grafeo_core::graph::lpg::LpgStore;
1185    /// use grafeo_common::types::Value;
1186    ///
1187    /// let store = LpgStore::new();
1188    /// store.create_property_index("city"); // Optional but makes lookups fast
1189    ///
1190    /// let alice = store.create_node(&["Person"]);
1191    /// let bob = store.create_node(&["Person"]);
1192    /// store.set_node_property(alice, "city", Value::from("NYC"));
1193    /// store.set_node_property(bob, "city", Value::from("NYC"));
1194    ///
1195    /// let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
1196    /// assert_eq!(nyc_people.len(), 2);
1197    /// ```
1198    #[must_use]
1199    pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1200        let key = PropertyKey::new(property);
1201        let hv = HashableValue::new(value.clone());
1202
1203        // Try indexed lookup first
1204        let indexes = self.property_indexes.read();
1205        if let Some(index) = indexes.get(&key) {
1206            if let Some(nodes) = index.get(&hv) {
1207                return nodes.iter().copied().collect();
1208            }
1209            return Vec::new();
1210        }
1211        drop(indexes);
1212
1213        // Fall back to full scan
1214        self.node_ids()
1215            .into_iter()
1216            .filter(|&node_id| {
1217                self.node_properties
1218                    .get(node_id, &key)
1219                    .is_some_and(|v| v == *value)
1220            })
1221            .collect()
1222    }
1223
1224    /// Updates property indexes when a property is set.
1225    fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1226        let indexes = self.property_indexes.read();
1227        if let Some(index) = indexes.get(key) {
1228            // Get old value to remove from index
1229            if let Some(old_value) = self.node_properties.get(node_id, key) {
1230                let old_hv = HashableValue::new(old_value);
1231                if let Some(mut nodes) = index.get_mut(&old_hv) {
1232                    nodes.remove(&node_id);
1233                    if nodes.is_empty() {
1234                        drop(nodes);
1235                        index.remove(&old_hv);
1236                    }
1237                }
1238            }
1239
1240            // Add new value to index
1241            let new_hv = HashableValue::new(new_value.clone());
1242            index
1243                .entry(new_hv)
1244                .or_insert_with(FxHashSet::default)
1245                .insert(node_id);
1246        }
1247    }
1248
1249    /// Updates property indexes when a property is removed.
1250    fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1251        let indexes = self.property_indexes.read();
1252        if let Some(index) = indexes.get(key) {
1253            // Get old value to remove from index
1254            if let Some(old_value) = self.node_properties.get(node_id, key) {
1255                let old_hv = HashableValue::new(old_value);
1256                if let Some(mut nodes) = index.get_mut(&old_hv) {
1257                    nodes.remove(&node_id);
1258                    if nodes.is_empty() {
1259                        drop(nodes);
1260                        index.remove(&old_hv);
1261                    }
1262                }
1263            }
1264        }
1265    }
1266
1267    /// Adds a label to a node.
1268    ///
1269    /// Returns true if the label was added, false if the node doesn't exist
1270    /// or already has the label.
1271    #[cfg(not(feature = "tiered-storage"))]
1272    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1273        let epoch = self.current_epoch();
1274
1275        // Check if node exists
1276        let nodes = self.nodes.read();
1277        if let Some(chain) = nodes.get(&node_id) {
1278            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1279                return false;
1280            }
1281        } else {
1282            return false;
1283        }
1284        drop(nodes);
1285
1286        // Get or create label ID
1287        let label_id = self.get_or_create_label_id(label);
1288
1289        // Add to node_labels map
1290        let mut node_labels = self.node_labels.write();
1291        let label_set = node_labels
1292            .entry(node_id)
1293            .or_insert_with(FxHashSet::default);
1294
1295        if label_set.contains(&label_id) {
1296            return false; // Already has this label
1297        }
1298
1299        label_set.insert(label_id);
1300        drop(node_labels);
1301
1302        // Add to label_index
1303        let mut index = self.label_index.write();
1304        if (label_id as usize) >= index.len() {
1305            index.resize(label_id as usize + 1, FxHashMap::default());
1306        }
1307        index[label_id as usize].insert(node_id, ());
1308
1309        // Update label count in node record
1310        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1311            if let Some(record) = chain.latest_mut() {
1312                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1313                record.set_label_count(count as u16);
1314            }
1315        }
1316
1317        true
1318    }
1319
1320    /// Adds a label to a node.
1321    /// (Tiered storage version)
1322    #[cfg(feature = "tiered-storage")]
1323    pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1324        let epoch = self.current_epoch();
1325
1326        // Check if node exists
1327        let versions = self.node_versions.read();
1328        if let Some(index) = versions.get(&node_id) {
1329            if let Some(vref) = index.visible_at(epoch) {
1330                if let Some(record) = self.read_node_record(&vref) {
1331                    if record.is_deleted() {
1332                        return false;
1333                    }
1334                } else {
1335                    return false;
1336                }
1337            } else {
1338                return false;
1339            }
1340        } else {
1341            return false;
1342        }
1343        drop(versions);
1344
1345        // Get or create label ID
1346        let label_id = self.get_or_create_label_id(label);
1347
1348        // Add to node_labels map
1349        let mut node_labels = self.node_labels.write();
1350        let label_set = node_labels
1351            .entry(node_id)
1352            .or_insert_with(FxHashSet::default);
1353
1354        if label_set.contains(&label_id) {
1355            return false; // Already has this label
1356        }
1357
1358        label_set.insert(label_id);
1359        drop(node_labels);
1360
1361        // Add to label_index
1362        let mut index = self.label_index.write();
1363        if (label_id as usize) >= index.len() {
1364            index.resize(label_id as usize + 1, FxHashMap::default());
1365        }
1366        index[label_id as usize].insert(node_id, ());
1367
1368        // Note: label_count in record is not updated for tiered storage.
1369        // The record is immutable once allocated in the arena.
1370
1371        true
1372    }
1373
1374    /// Removes a label from a node.
1375    ///
1376    /// Returns true if the label was removed, false if the node doesn't exist
1377    /// or doesn't have the label.
1378    #[cfg(not(feature = "tiered-storage"))]
1379    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1380        let epoch = self.current_epoch();
1381
1382        // Check if node exists
1383        let nodes = self.nodes.read();
1384        if let Some(chain) = nodes.get(&node_id) {
1385            if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1386                return false;
1387            }
1388        } else {
1389            return false;
1390        }
1391        drop(nodes);
1392
1393        // Get label ID
1394        let label_id = {
1395            let label_ids = self.label_to_id.read();
1396            match label_ids.get(label) {
1397                Some(&id) => id,
1398                None => return false, // Label doesn't exist
1399            }
1400        };
1401
1402        // Remove from node_labels map
1403        let mut node_labels = self.node_labels.write();
1404        if let Some(label_set) = node_labels.get_mut(&node_id) {
1405            if !label_set.remove(&label_id) {
1406                return false; // Node doesn't have this label
1407            }
1408        } else {
1409            return false;
1410        }
1411        drop(node_labels);
1412
1413        // Remove from label_index
1414        let mut index = self.label_index.write();
1415        if (label_id as usize) < index.len() {
1416            index[label_id as usize].remove(&node_id);
1417        }
1418
1419        // Update label count in node record
1420        if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1421            if let Some(record) = chain.latest_mut() {
1422                let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1423                record.set_label_count(count as u16);
1424            }
1425        }
1426
1427        true
1428    }
1429
1430    /// Removes a label from a node.
1431    /// (Tiered storage version)
1432    #[cfg(feature = "tiered-storage")]
1433    pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1434        let epoch = self.current_epoch();
1435
1436        // Check if node exists
1437        let versions = self.node_versions.read();
1438        if let Some(index) = versions.get(&node_id) {
1439            if let Some(vref) = index.visible_at(epoch) {
1440                if let Some(record) = self.read_node_record(&vref) {
1441                    if record.is_deleted() {
1442                        return false;
1443                    }
1444                } else {
1445                    return false;
1446                }
1447            } else {
1448                return false;
1449            }
1450        } else {
1451            return false;
1452        }
1453        drop(versions);
1454
1455        // Get label ID
1456        let label_id = {
1457            let label_ids = self.label_to_id.read();
1458            match label_ids.get(label) {
1459                Some(&id) => id,
1460                None => return false, // Label doesn't exist
1461            }
1462        };
1463
1464        // Remove from node_labels map
1465        let mut node_labels = self.node_labels.write();
1466        if let Some(label_set) = node_labels.get_mut(&node_id) {
1467            if !label_set.remove(&label_id) {
1468                return false; // Node doesn't have this label
1469            }
1470        } else {
1471            return false;
1472        }
1473        drop(node_labels);
1474
1475        // Remove from label_index
1476        let mut index = self.label_index.write();
1477        if (label_id as usize) < index.len() {
1478            index[label_id as usize].remove(&node_id);
1479        }
1480
1481        // Note: label_count in record is not updated for tiered storage.
1482
1483        true
1484    }
1485
1486    /// Returns the number of nodes (non-deleted at current epoch).
1487    #[must_use]
1488    #[cfg(not(feature = "tiered-storage"))]
1489    pub fn node_count(&self) -> usize {
1490        let epoch = self.current_epoch();
1491        self.nodes
1492            .read()
1493            .values()
1494            .filter_map(|chain| chain.visible_at(epoch))
1495            .filter(|r| !r.is_deleted())
1496            .count()
1497    }
1498
1499    /// Returns the number of nodes (non-deleted at current epoch).
1500    /// (Tiered storage version)
1501    #[must_use]
1502    #[cfg(feature = "tiered-storage")]
1503    pub fn node_count(&self) -> usize {
1504        let epoch = self.current_epoch();
1505        let versions = self.node_versions.read();
1506        versions
1507            .iter()
1508            .filter(|(_, index)| {
1509                index.visible_at(epoch).map_or(false, |vref| {
1510                    self.read_node_record(&vref)
1511                        .map_or(false, |r| !r.is_deleted())
1512                })
1513            })
1514            .count()
1515    }
1516
1517    /// Returns all node IDs in the store.
1518    ///
1519    /// This returns a snapshot of current node IDs. The returned vector
1520    /// excludes deleted nodes. Results are sorted by NodeId for deterministic
1521    /// iteration order.
1522    #[must_use]
1523    #[cfg(not(feature = "tiered-storage"))]
1524    pub fn node_ids(&self) -> Vec<NodeId> {
1525        let epoch = self.current_epoch();
1526        let mut ids: Vec<NodeId> = self
1527            .nodes
1528            .read()
1529            .iter()
1530            .filter_map(|(id, chain)| {
1531                chain
1532                    .visible_at(epoch)
1533                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1534            })
1535            .collect();
1536        ids.sort_unstable();
1537        ids
1538    }
1539
1540    /// Returns all node IDs in the store.
1541    /// (Tiered storage version)
1542    #[must_use]
1543    #[cfg(feature = "tiered-storage")]
1544    pub fn node_ids(&self) -> Vec<NodeId> {
1545        let epoch = self.current_epoch();
1546        let versions = self.node_versions.read();
1547        let mut ids: Vec<NodeId> = versions
1548            .iter()
1549            .filter_map(|(id, index)| {
1550                index.visible_at(epoch).and_then(|vref| {
1551                    self.read_node_record(&vref)
1552                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1553                })
1554            })
1555            .collect();
1556        ids.sort_unstable();
1557        ids
1558    }
1559
1560    // === Edge Operations ===
1561
1562    /// Creates a new edge.
1563    pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1564        self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1565    }
1566
1567    /// Creates a new edge within a transaction context.
1568    #[cfg(not(feature = "tiered-storage"))]
1569    pub fn create_edge_versioned(
1570        &self,
1571        src: NodeId,
1572        dst: NodeId,
1573        edge_type: &str,
1574        epoch: EpochId,
1575        tx_id: TxId,
1576    ) -> EdgeId {
1577        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1578        let type_id = self.get_or_create_edge_type_id(edge_type);
1579
1580        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1581        let chain = VersionChain::with_initial(record, epoch, tx_id);
1582        self.edges.write().insert(id, chain);
1583
1584        // Update adjacency
1585        self.forward_adj.add_edge(src, dst, id);
1586        if let Some(ref backward) = self.backward_adj {
1587            backward.add_edge(dst, src, id);
1588        }
1589
1590        id
1591    }
1592
1593    /// Creates a new edge within a transaction context.
1594    /// (Tiered storage version)
1595    #[cfg(feature = "tiered-storage")]
1596    pub fn create_edge_versioned(
1597        &self,
1598        src: NodeId,
1599        dst: NodeId,
1600        edge_type: &str,
1601        epoch: EpochId,
1602        tx_id: TxId,
1603    ) -> EdgeId {
1604        let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1605        let type_id = self.get_or_create_edge_type_id(edge_type);
1606
1607        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1608
1609        // Allocate record in arena and get offset (create epoch if needed)
1610        let arena = self.arena_allocator.arena_or_create(epoch);
1611        let (offset, _stored) = arena.alloc_value_with_offset(record);
1612
1613        // Create HotVersionRef pointing to arena data
1614        let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1615
1616        // Create or update version index
1617        let mut versions = self.edge_versions.write();
1618        if let Some(index) = versions.get_mut(&id) {
1619            index.add_hot(hot_ref);
1620        } else {
1621            versions.insert(id, VersionIndex::with_initial(hot_ref));
1622        }
1623
1624        // Update adjacency
1625        self.forward_adj.add_edge(src, dst, id);
1626        if let Some(ref backward) = self.backward_adj {
1627            backward.add_edge(dst, src, id);
1628        }
1629
1630        id
1631    }
1632
1633    /// Creates a new edge with properties.
1634    pub fn create_edge_with_props(
1635        &self,
1636        src: NodeId,
1637        dst: NodeId,
1638        edge_type: &str,
1639        properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1640    ) -> EdgeId {
1641        let id = self.create_edge(src, dst, edge_type);
1642
1643        for (key, value) in properties {
1644            self.edge_properties.set(id, key.into(), value.into());
1645        }
1646
1647        id
1648    }
1649
1650    /// Gets an edge by ID (latest visible version).
1651    #[must_use]
1652    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1653        self.get_edge_at_epoch(id, self.current_epoch())
1654    }
1655
1656    /// Gets an edge by ID at a specific epoch.
1657    #[must_use]
1658    #[cfg(not(feature = "tiered-storage"))]
1659    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1660        let edges = self.edges.read();
1661        let chain = edges.get(&id)?;
1662        let record = chain.visible_at(epoch)?;
1663
1664        if record.is_deleted() {
1665            return None;
1666        }
1667
1668        let edge_type = {
1669            let id_to_type = self.id_to_edge_type.read();
1670            id_to_type.get(record.type_id as usize)?.clone()
1671        };
1672
1673        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1674
1675        // Get properties
1676        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1677
1678        Some(edge)
1679    }
1680
1681    /// Gets an edge by ID at a specific epoch.
1682    /// (Tiered storage version)
1683    #[must_use]
1684    #[cfg(feature = "tiered-storage")]
1685    pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1686        let versions = self.edge_versions.read();
1687        let index = versions.get(&id)?;
1688        let version_ref = index.visible_at(epoch)?;
1689
1690        let record = self.read_edge_record(&version_ref)?;
1691
1692        if record.is_deleted() {
1693            return None;
1694        }
1695
1696        let edge_type = {
1697            let id_to_type = self.id_to_edge_type.read();
1698            id_to_type.get(record.type_id as usize)?.clone()
1699        };
1700
1701        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1702
1703        // Get properties
1704        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1705
1706        Some(edge)
1707    }
1708
1709    /// Gets an edge visible to a specific transaction.
1710    #[must_use]
1711    #[cfg(not(feature = "tiered-storage"))]
1712    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1713        let edges = self.edges.read();
1714        let chain = edges.get(&id)?;
1715        let record = chain.visible_to(epoch, tx_id)?;
1716
1717        if record.is_deleted() {
1718            return None;
1719        }
1720
1721        let edge_type = {
1722            let id_to_type = self.id_to_edge_type.read();
1723            id_to_type.get(record.type_id as usize)?.clone()
1724        };
1725
1726        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1727
1728        // Get properties
1729        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1730
1731        Some(edge)
1732    }
1733
1734    /// Gets an edge visible to a specific transaction.
1735    /// (Tiered storage version)
1736    #[must_use]
1737    #[cfg(feature = "tiered-storage")]
1738    pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1739        let versions = self.edge_versions.read();
1740        let index = versions.get(&id)?;
1741        let version_ref = index.visible_to(epoch, tx_id)?;
1742
1743        let record = self.read_edge_record(&version_ref)?;
1744
1745        if record.is_deleted() {
1746            return None;
1747        }
1748
1749        let edge_type = {
1750            let id_to_type = self.id_to_edge_type.read();
1751            id_to_type.get(record.type_id as usize)?.clone()
1752        };
1753
1754        let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1755
1756        // Get properties
1757        edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1758
1759        Some(edge)
1760    }
1761
1762    /// Reads an EdgeRecord from arena using a VersionRef.
1763    #[cfg(feature = "tiered-storage")]
1764    #[allow(unsafe_code)]
1765    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1766        match version_ref {
1767            VersionRef::Hot(hot_ref) => {
1768                let arena = self.arena_allocator.arena(hot_ref.epoch);
1769                // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1770                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1771                Some(record.clone())
1772            }
1773            VersionRef::Cold(cold_ref) => {
1774                // Read from compressed epoch store
1775                self.epoch_store
1776                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1777            }
1778        }
1779    }
1780
1781    /// Deletes an edge (using latest epoch).
1782    pub fn delete_edge(&self, id: EdgeId) -> bool {
1783        self.delete_edge_at_epoch(id, self.current_epoch())
1784    }
1785
1786    /// Deletes an edge at a specific epoch.
1787    #[cfg(not(feature = "tiered-storage"))]
1788    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1789        let mut edges = self.edges.write();
1790        if let Some(chain) = edges.get_mut(&id) {
1791            // Get the visible record to check if deleted and get src/dst
1792            let (src, dst) = {
1793                match chain.visible_at(epoch) {
1794                    Some(record) => {
1795                        if record.is_deleted() {
1796                            return false;
1797                        }
1798                        (record.src, record.dst)
1799                    }
1800                    None => return false, // Not visible at this epoch (already deleted)
1801                }
1802            };
1803
1804            // Mark the version chain as deleted
1805            chain.mark_deleted(epoch);
1806
1807            drop(edges); // Release lock
1808
1809            // Mark as deleted in adjacency (soft delete)
1810            self.forward_adj.mark_deleted(src, id);
1811            if let Some(ref backward) = self.backward_adj {
1812                backward.mark_deleted(dst, id);
1813            }
1814
1815            // Remove properties
1816            self.edge_properties.remove_all(id);
1817
1818            true
1819        } else {
1820            false
1821        }
1822    }
1823
1824    /// Deletes an edge at a specific epoch.
1825    /// (Tiered storage version)
1826    #[cfg(feature = "tiered-storage")]
1827    pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1828        let mut versions = self.edge_versions.write();
1829        if let Some(index) = versions.get_mut(&id) {
1830            // Get the visible record to check if deleted and get src/dst
1831            let (src, dst) = {
1832                match index.visible_at(epoch) {
1833                    Some(version_ref) => {
1834                        if let Some(record) = self.read_edge_record(&version_ref) {
1835                            if record.is_deleted() {
1836                                return false;
1837                            }
1838                            (record.src, record.dst)
1839                        } else {
1840                            return false;
1841                        }
1842                    }
1843                    None => return false,
1844                }
1845            };
1846
1847            // Mark as deleted in version index
1848            index.mark_deleted(epoch);
1849
1850            drop(versions); // Release lock
1851
1852            // Mark as deleted in adjacency (soft delete)
1853            self.forward_adj.mark_deleted(src, id);
1854            if let Some(ref backward) = self.backward_adj {
1855                backward.mark_deleted(dst, id);
1856            }
1857
1858            // Remove properties
1859            self.edge_properties.remove_all(id);
1860
1861            true
1862        } else {
1863            false
1864        }
1865    }
1866
1867    /// Returns the number of edges (non-deleted at current epoch).
1868    #[must_use]
1869    #[cfg(not(feature = "tiered-storage"))]
1870    pub fn edge_count(&self) -> usize {
1871        let epoch = self.current_epoch();
1872        self.edges
1873            .read()
1874            .values()
1875            .filter_map(|chain| chain.visible_at(epoch))
1876            .filter(|r| !r.is_deleted())
1877            .count()
1878    }
1879
1880    /// Returns the number of edges (non-deleted at current epoch).
1881    /// (Tiered storage version)
1882    #[must_use]
1883    #[cfg(feature = "tiered-storage")]
1884    pub fn edge_count(&self) -> usize {
1885        let epoch = self.current_epoch();
1886        let versions = self.edge_versions.read();
1887        versions
1888            .iter()
1889            .filter(|(_, index)| {
1890                index.visible_at(epoch).map_or(false, |vref| {
1891                    self.read_edge_record(&vref)
1892                        .map_or(false, |r| !r.is_deleted())
1893                })
1894            })
1895            .count()
1896    }
1897
1898    /// Discards all uncommitted versions created by a transaction.
1899    ///
1900    /// This is called during transaction rollback to clean up uncommitted changes.
1901    /// The method removes version chain entries created by the specified transaction.
1902    #[cfg(not(feature = "tiered-storage"))]
1903    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1904        // Remove uncommitted node versions
1905        {
1906            let mut nodes = self.nodes.write();
1907            for chain in nodes.values_mut() {
1908                chain.remove_versions_by(tx_id);
1909            }
1910            // Remove completely empty chains (no versions left)
1911            nodes.retain(|_, chain| !chain.is_empty());
1912        }
1913
1914        // Remove uncommitted edge versions
1915        {
1916            let mut edges = self.edges.write();
1917            for chain in edges.values_mut() {
1918                chain.remove_versions_by(tx_id);
1919            }
1920            // Remove completely empty chains (no versions left)
1921            edges.retain(|_, chain| !chain.is_empty());
1922        }
1923    }
1924
1925    /// Discards all uncommitted versions created by a transaction.
1926    /// (Tiered storage version)
1927    #[cfg(feature = "tiered-storage")]
1928    pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1929        // Remove uncommitted node versions
1930        {
1931            let mut versions = self.node_versions.write();
1932            for index in versions.values_mut() {
1933                index.remove_versions_by(tx_id);
1934            }
1935            // Remove completely empty indexes (no versions left)
1936            versions.retain(|_, index| !index.is_empty());
1937        }
1938
1939        // Remove uncommitted edge versions
1940        {
1941            let mut versions = self.edge_versions.write();
1942            for index in versions.values_mut() {
1943                index.remove_versions_by(tx_id);
1944            }
1945            // Remove completely empty indexes (no versions left)
1946            versions.retain(|_, index| !index.is_empty());
1947        }
1948    }
1949
1950    /// Freezes an epoch from hot (arena) storage to cold (compressed) storage.
1951    ///
1952    /// This is called by the transaction manager when an epoch becomes eligible
1953    /// for freezing (no active transactions can see it). The freeze process:
1954    ///
1955    /// 1. Collects all hot version refs for the epoch
1956    /// 2. Reads the corresponding records from arena
1957    /// 3. Compresses them into a `CompressedEpochBlock`
1958    /// 4. Updates `VersionIndex` entries to point to cold storage
1959    /// 5. The arena can be deallocated after all epochs in it are frozen
1960    ///
1961    /// # Arguments
1962    ///
1963    /// * `epoch` - The epoch to freeze
1964    ///
1965    /// # Returns
1966    ///
1967    /// The number of records frozen (nodes + edges).
1968    #[cfg(feature = "tiered-storage")]
1969    #[allow(unsafe_code)]
1970    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
1971        // Collect node records to freeze
1972        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
1973        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
1974
1975        {
1976            let versions = self.node_versions.read();
1977            for (node_id, index) in versions.iter() {
1978                for hot_ref in index.hot_refs_for_epoch(epoch) {
1979                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1980                    // SAFETY: The offset was returned by alloc_value_with_offset for a NodeRecord
1981                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1982                    node_records.push((node_id.as_u64(), record.clone()));
1983                    node_hot_refs.push((*node_id, *hot_ref));
1984                }
1985            }
1986        }
1987
1988        // Collect edge records to freeze
1989        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
1990        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
1991
1992        {
1993            let versions = self.edge_versions.read();
1994            for (edge_id, index) in versions.iter() {
1995                for hot_ref in index.hot_refs_for_epoch(epoch) {
1996                    let arena = self.arena_allocator.arena(hot_ref.epoch);
1997                    // SAFETY: The offset was returned by alloc_value_with_offset for an EdgeRecord
1998                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1999                    edge_records.push((edge_id.as_u64(), record.clone()));
2000                    edge_hot_refs.push((*edge_id, *hot_ref));
2001                }
2002            }
2003        }
2004
2005        let total_frozen = node_records.len() + edge_records.len();
2006
2007        if total_frozen == 0 {
2008            return 0;
2009        }
2010
2011        // Freeze to compressed storage
2012        let (node_entries, edge_entries) =
2013            self.epoch_store
2014                .freeze_epoch(epoch, node_records, edge_records);
2015
2016        // Build lookup maps for index entries
2017        let node_entry_map: FxHashMap<u64, _> = node_entries
2018            .iter()
2019            .map(|e| (e.entity_id, (e.offset, e.length)))
2020            .collect();
2021        let edge_entry_map: FxHashMap<u64, _> = edge_entries
2022            .iter()
2023            .map(|e| (e.entity_id, (e.offset, e.length)))
2024            .collect();
2025
2026        // Update version indexes to use cold refs
2027        {
2028            let mut versions = self.node_versions.write();
2029            for (node_id, hot_ref) in &node_hot_refs {
2030                if let Some(index) = versions.get_mut(node_id) {
2031                    if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
2032                        let cold_ref = ColdVersionRef {
2033                            epoch,
2034                            block_offset: offset,
2035                            length,
2036                            created_by: hot_ref.created_by,
2037                            deleted_epoch: hot_ref.deleted_epoch,
2038                        };
2039                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2040                    }
2041                }
2042            }
2043        }
2044
2045        {
2046            let mut versions = self.edge_versions.write();
2047            for (edge_id, hot_ref) in &edge_hot_refs {
2048                if let Some(index) = versions.get_mut(edge_id) {
2049                    if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2050                        let cold_ref = ColdVersionRef {
2051                            epoch,
2052                            block_offset: offset,
2053                            length,
2054                            created_by: hot_ref.created_by,
2055                            deleted_epoch: hot_ref.deleted_epoch,
2056                        };
2057                        index.freeze_epoch(epoch, std::iter::once(cold_ref));
2058                    }
2059                }
2060            }
2061        }
2062
2063        total_frozen
2064    }
2065
2066    /// Returns the epoch store for cold storage statistics.
2067    #[cfg(feature = "tiered-storage")]
2068    #[must_use]
2069    pub fn epoch_store(&self) -> &EpochStore {
2070        &self.epoch_store
2071    }
2072
2073    /// Returns the number of distinct labels in the store.
2074    #[must_use]
2075    pub fn label_count(&self) -> usize {
2076        self.id_to_label.read().len()
2077    }
2078
2079    /// Returns the number of distinct property keys in the store.
2080    ///
2081    /// This counts unique property keys across both nodes and edges.
2082    #[must_use]
2083    pub fn property_key_count(&self) -> usize {
2084        let node_keys = self.node_properties.column_count();
2085        let edge_keys = self.edge_properties.column_count();
2086        // Note: This may count some keys twice if the same key is used
2087        // for both nodes and edges. A more precise count would require
2088        // tracking unique keys across both storages.
2089        node_keys + edge_keys
2090    }
2091
2092    /// Returns the number of distinct edge types in the store.
2093    #[must_use]
2094    pub fn edge_type_count(&self) -> usize {
2095        self.id_to_edge_type.read().len()
2096    }
2097
2098    // === Traversal ===
2099
2100    /// Iterates over neighbors of a node in the specified direction.
2101    ///
2102    /// This is the fast path for graph traversal - goes straight to the
2103    /// adjacency index without loading full node data.
2104    pub fn neighbors(
2105        &self,
2106        node: NodeId,
2107        direction: Direction,
2108    ) -> impl Iterator<Item = NodeId> + '_ {
2109        let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2110            Direction::Outgoing | Direction::Both => {
2111                Box::new(self.forward_adj.neighbors(node).into_iter())
2112            }
2113            Direction::Incoming => Box::new(std::iter::empty()),
2114        };
2115
2116        let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2117            Direction::Incoming | Direction::Both => {
2118                if let Some(ref adj) = self.backward_adj {
2119                    Box::new(adj.neighbors(node).into_iter())
2120                } else {
2121                    Box::new(std::iter::empty())
2122                }
2123            }
2124            Direction::Outgoing => Box::new(std::iter::empty()),
2125        };
2126
2127        forward.chain(backward)
2128    }
2129
2130    /// Returns edges from a node with their targets.
2131    ///
2132    /// Returns an iterator of (target_node, edge_id) pairs.
2133    pub fn edges_from(
2134        &self,
2135        node: NodeId,
2136        direction: Direction,
2137    ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2138        let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2139            Direction::Outgoing | Direction::Both => {
2140                Box::new(self.forward_adj.edges_from(node).into_iter())
2141            }
2142            Direction::Incoming => Box::new(std::iter::empty()),
2143        };
2144
2145        let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2146            Direction::Incoming | Direction::Both => {
2147                if let Some(ref adj) = self.backward_adj {
2148                    Box::new(adj.edges_from(node).into_iter())
2149                } else {
2150                    Box::new(std::iter::empty())
2151                }
2152            }
2153            Direction::Outgoing => Box::new(std::iter::empty()),
2154        };
2155
2156        forward.chain(backward)
2157    }
2158
2159    /// Returns edges to a node (where the node is the destination).
2160    ///
2161    /// Returns (source_node, edge_id) pairs for all edges pointing TO this node.
2162    /// Uses the backward adjacency index for O(degree) lookup.
2163    ///
2164    /// # Example
2165    ///
2166    /// ```ignore
2167    /// // For edges: A->B, C->B
2168    /// let incoming = store.edges_to(B);
2169    /// // Returns: [(A, edge1), (C, edge2)]
2170    /// ```
2171    pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2172        if let Some(ref backward) = self.backward_adj {
2173            backward.edges_from(node)
2174        } else {
2175            // Fallback: scan all edges (slow but correct)
2176            self.all_edges()
2177                .filter_map(|edge| {
2178                    if edge.dst == node {
2179                        Some((edge.src, edge.id))
2180                    } else {
2181                        None
2182                    }
2183                })
2184                .collect()
2185        }
2186    }
2187
2188    /// Returns the out-degree of a node (number of outgoing edges).
2189    ///
2190    /// Uses the forward adjacency index for O(1) lookup.
2191    #[must_use]
2192    pub fn out_degree(&self, node: NodeId) -> usize {
2193        self.forward_adj.out_degree(node)
2194    }
2195
2196    /// Returns the in-degree of a node (number of incoming edges).
2197    ///
2198    /// Uses the backward adjacency index for O(1) lookup if available,
2199    /// otherwise falls back to scanning edges.
2200    #[must_use]
2201    pub fn in_degree(&self, node: NodeId) -> usize {
2202        if let Some(ref backward) = self.backward_adj {
2203            backward.in_degree(node)
2204        } else {
2205            // Fallback: count edges (slow)
2206            self.all_edges().filter(|edge| edge.dst == node).count()
2207        }
2208    }
2209
2210    /// Gets the type of an edge by ID.
2211    #[must_use]
2212    #[cfg(not(feature = "tiered-storage"))]
2213    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2214        let edges = self.edges.read();
2215        let chain = edges.get(&id)?;
2216        let epoch = self.current_epoch();
2217        let record = chain.visible_at(epoch)?;
2218        let id_to_type = self.id_to_edge_type.read();
2219        id_to_type.get(record.type_id as usize).cloned()
2220    }
2221
2222    /// Gets the type of an edge by ID.
2223    /// (Tiered storage version)
2224    #[must_use]
2225    #[cfg(feature = "tiered-storage")]
2226    pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2227        let versions = self.edge_versions.read();
2228        let index = versions.get(&id)?;
2229        let epoch = self.current_epoch();
2230        let vref = index.visible_at(epoch)?;
2231        let record = self.read_edge_record(&vref)?;
2232        let id_to_type = self.id_to_edge_type.read();
2233        id_to_type.get(record.type_id as usize).cloned()
2234    }
2235
2236    /// Returns all nodes with a specific label.
2237    ///
2238    /// Uses the label index for O(1) lookup per label. Returns a snapshot -
2239    /// concurrent modifications won't affect the returned vector. Results are
2240    /// sorted by NodeId for deterministic iteration order.
2241    pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2242        let label_to_id = self.label_to_id.read();
2243        if let Some(&label_id) = label_to_id.get(label) {
2244            let index = self.label_index.read();
2245            if let Some(set) = index.get(label_id as usize) {
2246                let mut ids: Vec<NodeId> = set.keys().copied().collect();
2247                ids.sort_unstable();
2248                return ids;
2249            }
2250        }
2251        Vec::new()
2252    }
2253
2254    // === Admin API: Iteration ===
2255
2256    /// Returns an iterator over all nodes in the database.
2257    ///
2258    /// This creates a snapshot of all visible nodes at the current epoch.
2259    /// Useful for dump/export operations.
2260    #[cfg(not(feature = "tiered-storage"))]
2261    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2262        let epoch = self.current_epoch();
2263        let node_ids: Vec<NodeId> = self
2264            .nodes
2265            .read()
2266            .iter()
2267            .filter_map(|(id, chain)| {
2268                chain
2269                    .visible_at(epoch)
2270                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2271            })
2272            .collect();
2273
2274        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2275    }
2276
2277    /// Returns an iterator over all nodes in the database.
2278    /// (Tiered storage version)
2279    #[cfg(feature = "tiered-storage")]
2280    pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2281        let node_ids = self.node_ids();
2282        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2283    }
2284
2285    /// Returns an iterator over all edges in the database.
2286    ///
2287    /// This creates a snapshot of all visible edges at the current epoch.
2288    /// Useful for dump/export operations.
2289    #[cfg(not(feature = "tiered-storage"))]
2290    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2291        let epoch = self.current_epoch();
2292        let edge_ids: Vec<EdgeId> = self
2293            .edges
2294            .read()
2295            .iter()
2296            .filter_map(|(id, chain)| {
2297                chain
2298                    .visible_at(epoch)
2299                    .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2300            })
2301            .collect();
2302
2303        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2304    }
2305
2306    /// Returns an iterator over all edges in the database.
2307    /// (Tiered storage version)
2308    #[cfg(feature = "tiered-storage")]
2309    pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2310        let epoch = self.current_epoch();
2311        let versions = self.edge_versions.read();
2312        let edge_ids: Vec<EdgeId> = versions
2313            .iter()
2314            .filter_map(|(id, index)| {
2315                index.visible_at(epoch).and_then(|vref| {
2316                    self.read_edge_record(&vref)
2317                        .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2318                })
2319            })
2320            .collect();
2321
2322        edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2323    }
2324
2325    /// Returns all label names in the database.
2326    pub fn all_labels(&self) -> Vec<String> {
2327        self.id_to_label
2328            .read()
2329            .iter()
2330            .map(|s| s.to_string())
2331            .collect()
2332    }
2333
2334    /// Returns all edge type names in the database.
2335    pub fn all_edge_types(&self) -> Vec<String> {
2336        self.id_to_edge_type
2337            .read()
2338            .iter()
2339            .map(|s| s.to_string())
2340            .collect()
2341    }
2342
2343    /// Returns all property keys used in the database.
2344    pub fn all_property_keys(&self) -> Vec<String> {
2345        let mut keys = std::collections::HashSet::new();
2346        for key in self.node_properties.keys() {
2347            keys.insert(key.to_string());
2348        }
2349        for key in self.edge_properties.keys() {
2350            keys.insert(key.to_string());
2351        }
2352        keys.into_iter().collect()
2353    }
2354
2355    /// Returns an iterator over nodes with a specific label.
2356    pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2357        let node_ids = self.nodes_by_label(label);
2358        node_ids.into_iter().filter_map(move |id| self.get_node(id))
2359    }
2360
2361    /// Returns an iterator over edges with a specific type.
2362    #[cfg(not(feature = "tiered-storage"))]
2363    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2364        let epoch = self.current_epoch();
2365        let type_to_id = self.edge_type_to_id.read();
2366
2367        if let Some(&type_id) = type_to_id.get(edge_type) {
2368            let edge_ids: Vec<EdgeId> = self
2369                .edges
2370                .read()
2371                .iter()
2372                .filter_map(|(id, chain)| {
2373                    chain.visible_at(epoch).and_then(|r| {
2374                        if !r.is_deleted() && r.type_id == type_id {
2375                            Some(*id)
2376                        } else {
2377                            None
2378                        }
2379                    })
2380                })
2381                .collect();
2382
2383            // Return a boxed iterator for the found edges
2384            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2385                as Box<dyn Iterator<Item = Edge> + 'a>
2386        } else {
2387            // Return empty iterator
2388            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2389        }
2390    }
2391
2392    /// Returns an iterator over edges with a specific type.
2393    /// (Tiered storage version)
2394    #[cfg(feature = "tiered-storage")]
2395    pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2396        let epoch = self.current_epoch();
2397        let type_to_id = self.edge_type_to_id.read();
2398
2399        if let Some(&type_id) = type_to_id.get(edge_type) {
2400            let versions = self.edge_versions.read();
2401            let edge_ids: Vec<EdgeId> = versions
2402                .iter()
2403                .filter_map(|(id, index)| {
2404                    index.visible_at(epoch).and_then(|vref| {
2405                        self.read_edge_record(&vref).and_then(|r| {
2406                            if !r.is_deleted() && r.type_id == type_id {
2407                                Some(*id)
2408                            } else {
2409                                None
2410                            }
2411                        })
2412                    })
2413                })
2414                .collect();
2415
2416            Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2417                as Box<dyn Iterator<Item = Edge> + 'a>
2418        } else {
2419            Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2420        }
2421    }
2422
2423    // === Zone Map Support ===
2424
2425    /// Checks if a node property predicate might match any nodes.
2426    ///
2427    /// Uses zone maps for early filtering. Returns `true` if there might be
2428    /// matching nodes, `false` if there definitely aren't.
2429    #[must_use]
2430    pub fn node_property_might_match(
2431        &self,
2432        property: &PropertyKey,
2433        op: CompareOp,
2434        value: &Value,
2435    ) -> bool {
2436        self.node_properties.might_match(property, op, value)
2437    }
2438
2439    /// Checks if an edge property predicate might match any edges.
2440    #[must_use]
2441    pub fn edge_property_might_match(
2442        &self,
2443        property: &PropertyKey,
2444        op: CompareOp,
2445        value: &Value,
2446    ) -> bool {
2447        self.edge_properties.might_match(property, op, value)
2448    }
2449
2450    /// Gets the zone map for a node property.
2451    #[must_use]
2452    pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2453        self.node_properties.zone_map(property)
2454    }
2455
2456    /// Gets the zone map for an edge property.
2457    #[must_use]
2458    pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2459        self.edge_properties.zone_map(property)
2460    }
2461
2462    /// Rebuilds zone maps for all properties.
2463    pub fn rebuild_zone_maps(&self) {
2464        self.node_properties.rebuild_zone_maps();
2465        self.edge_properties.rebuild_zone_maps();
2466    }
2467
2468    // === Statistics ===
2469
2470    /// Returns the current statistics.
2471    #[must_use]
2472    pub fn statistics(&self) -> Statistics {
2473        self.statistics.read().clone()
2474    }
2475
2476    /// Recomputes statistics from current data.
2477    ///
2478    /// Scans all labels and edge types to build cardinality estimates for the
2479    /// query optimizer. Call this periodically or after bulk data loads.
2480    #[cfg(not(feature = "tiered-storage"))]
2481    pub fn compute_statistics(&self) {
2482        let mut stats = Statistics::new();
2483
2484        // Compute total counts
2485        stats.total_nodes = self.node_count() as u64;
2486        stats.total_edges = self.edge_count() as u64;
2487
2488        // Compute per-label statistics
2489        let id_to_label = self.id_to_label.read();
2490        let label_index = self.label_index.read();
2491
2492        for (label_id, label_name) in id_to_label.iter().enumerate() {
2493            let node_count = label_index
2494                .get(label_id)
2495                .map(|set| set.len() as u64)
2496                .unwrap_or(0);
2497
2498            if node_count > 0 {
2499                // Estimate average degree
2500                let avg_out_degree = if stats.total_nodes > 0 {
2501                    stats.total_edges as f64 / stats.total_nodes as f64
2502                } else {
2503                    0.0
2504                };
2505
2506                let label_stats =
2507                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2508
2509                stats.update_label(label_name.as_ref(), label_stats);
2510            }
2511        }
2512
2513        // Compute per-edge-type statistics
2514        let id_to_edge_type = self.id_to_edge_type.read();
2515        let edges = self.edges.read();
2516        let epoch = self.current_epoch();
2517
2518        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2519        for chain in edges.values() {
2520            if let Some(record) = chain.visible_at(epoch) {
2521                if !record.is_deleted() {
2522                    *edge_type_counts.entry(record.type_id).or_default() += 1;
2523                }
2524            }
2525        }
2526
2527        for (type_id, count) in edge_type_counts {
2528            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2529                let avg_degree = if stats.total_nodes > 0 {
2530                    count as f64 / stats.total_nodes as f64
2531                } else {
2532                    0.0
2533                };
2534
2535                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2536                stats.update_edge_type(type_name.as_ref(), edge_stats);
2537            }
2538        }
2539
2540        *self.statistics.write() = stats;
2541    }
2542
2543    /// Recomputes statistics from current data.
2544    /// (Tiered storage version)
2545    #[cfg(feature = "tiered-storage")]
2546    pub fn compute_statistics(&self) {
2547        let mut stats = Statistics::new();
2548
2549        // Compute total counts
2550        stats.total_nodes = self.node_count() as u64;
2551        stats.total_edges = self.edge_count() as u64;
2552
2553        // Compute per-label statistics
2554        let id_to_label = self.id_to_label.read();
2555        let label_index = self.label_index.read();
2556
2557        for (label_id, label_name) in id_to_label.iter().enumerate() {
2558            let node_count = label_index
2559                .get(label_id)
2560                .map(|set| set.len() as u64)
2561                .unwrap_or(0);
2562
2563            if node_count > 0 {
2564                let avg_out_degree = if stats.total_nodes > 0 {
2565                    stats.total_edges as f64 / stats.total_nodes as f64
2566                } else {
2567                    0.0
2568                };
2569
2570                let label_stats =
2571                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2572
2573                stats.update_label(label_name.as_ref(), label_stats);
2574            }
2575        }
2576
2577        // Compute per-edge-type statistics
2578        let id_to_edge_type = self.id_to_edge_type.read();
2579        let versions = self.edge_versions.read();
2580        let epoch = self.current_epoch();
2581
2582        let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2583        for index in versions.values() {
2584            if let Some(vref) = index.visible_at(epoch) {
2585                if let Some(record) = self.read_edge_record(&vref) {
2586                    if !record.is_deleted() {
2587                        *edge_type_counts.entry(record.type_id).or_default() += 1;
2588                    }
2589                }
2590            }
2591        }
2592
2593        for (type_id, count) in edge_type_counts {
2594            if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2595                let avg_degree = if stats.total_nodes > 0 {
2596                    count as f64 / stats.total_nodes as f64
2597                } else {
2598                    0.0
2599                };
2600
2601                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2602                stats.update_edge_type(type_name.as_ref(), edge_stats);
2603            }
2604        }
2605
2606        *self.statistics.write() = stats;
2607    }
2608
2609    /// Estimates cardinality for a label scan.
2610    #[must_use]
2611    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2612        self.statistics.read().estimate_label_cardinality(label)
2613    }
2614
2615    /// Estimates average degree for an edge type.
2616    #[must_use]
2617    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2618        self.statistics
2619            .read()
2620            .estimate_avg_degree(edge_type, outgoing)
2621    }
2622
2623    // === Internal Helpers ===
2624
2625    fn get_or_create_label_id(&self, label: &str) -> u32 {
2626        {
2627            let label_to_id = self.label_to_id.read();
2628            if let Some(&id) = label_to_id.get(label) {
2629                return id;
2630            }
2631        }
2632
2633        let mut label_to_id = self.label_to_id.write();
2634        let mut id_to_label = self.id_to_label.write();
2635
2636        // Double-check after acquiring write lock
2637        if let Some(&id) = label_to_id.get(label) {
2638            return id;
2639        }
2640
2641        let id = id_to_label.len() as u32;
2642
2643        let label: ArcStr = label.into();
2644        label_to_id.insert(label.clone(), id);
2645        id_to_label.push(label);
2646
2647        id
2648    }
2649
2650    fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2651        {
2652            let type_to_id = self.edge_type_to_id.read();
2653            if let Some(&id) = type_to_id.get(edge_type) {
2654                return id;
2655            }
2656        }
2657
2658        let mut type_to_id = self.edge_type_to_id.write();
2659        let mut id_to_type = self.id_to_edge_type.write();
2660
2661        // Double-check
2662        if let Some(&id) = type_to_id.get(edge_type) {
2663            return id;
2664        }
2665
2666        let id = id_to_type.len() as u32;
2667        let edge_type: ArcStr = edge_type.into();
2668        type_to_id.insert(edge_type.clone(), id);
2669        id_to_type.push(edge_type);
2670
2671        id
2672    }
2673
2674    // === Recovery Support ===
2675
2676    /// Creates a node with a specific ID during recovery.
2677    ///
2678    /// This is used for WAL recovery to restore nodes with their original IDs.
2679    /// The caller must ensure IDs don't conflict with existing nodes.
2680    #[cfg(not(feature = "tiered-storage"))]
2681    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2682        let epoch = self.current_epoch();
2683        let mut record = NodeRecord::new(id, epoch);
2684        record.set_label_count(labels.len() as u16);
2685
2686        // Store labels in node_labels map and label_index
2687        let mut node_label_set = FxHashSet::default();
2688        for label in labels {
2689            let label_id = self.get_or_create_label_id(*label);
2690            node_label_set.insert(label_id);
2691
2692            // Update label index
2693            let mut index = self.label_index.write();
2694            while index.len() <= label_id as usize {
2695                index.push(FxHashMap::default());
2696            }
2697            index[label_id as usize].insert(id, ());
2698        }
2699
2700        // Store node's labels
2701        self.node_labels.write().insert(id, node_label_set);
2702
2703        // Create version chain with initial version (using SYSTEM tx for recovery)
2704        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2705        self.nodes.write().insert(id, chain);
2706
2707        // Update next_node_id if necessary to avoid future collisions
2708        let id_val = id.as_u64();
2709        let _ = self
2710            .next_node_id
2711            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2712                if id_val >= current {
2713                    Some(id_val + 1)
2714                } else {
2715                    None
2716                }
2717            });
2718    }
2719
2720    /// Creates a node with a specific ID during recovery.
2721    /// (Tiered storage version)
2722    #[cfg(feature = "tiered-storage")]
2723    pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2724        let epoch = self.current_epoch();
2725        let mut record = NodeRecord::new(id, epoch);
2726        record.set_label_count(labels.len() as u16);
2727
2728        // Store labels in node_labels map and label_index
2729        let mut node_label_set = FxHashSet::default();
2730        for label in labels {
2731            let label_id = self.get_or_create_label_id(*label);
2732            node_label_set.insert(label_id);
2733
2734            // Update label index
2735            let mut index = self.label_index.write();
2736            while index.len() <= label_id as usize {
2737                index.push(FxHashMap::default());
2738            }
2739            index[label_id as usize].insert(id, ());
2740        }
2741
2742        // Store node's labels
2743        self.node_labels.write().insert(id, node_label_set);
2744
2745        // Allocate record in arena and get offset (create epoch if needed)
2746        let arena = self.arena_allocator.arena_or_create(epoch);
2747        let (offset, _stored) = arena.alloc_value_with_offset(record);
2748
2749        // Create HotVersionRef (using SYSTEM tx for recovery)
2750        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2751        let mut versions = self.node_versions.write();
2752        versions.insert(id, VersionIndex::with_initial(hot_ref));
2753
2754        // Update next_node_id if necessary to avoid future collisions
2755        let id_val = id.as_u64();
2756        let _ = self
2757            .next_node_id
2758            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2759                if id_val >= current {
2760                    Some(id_val + 1)
2761                } else {
2762                    None
2763                }
2764            });
2765    }
2766
2767    /// Creates an edge with a specific ID during recovery.
2768    ///
2769    /// This is used for WAL recovery to restore edges with their original IDs.
2770    #[cfg(not(feature = "tiered-storage"))]
2771    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2772        let epoch = self.current_epoch();
2773        let type_id = self.get_or_create_edge_type_id(edge_type);
2774
2775        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2776        let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2777        self.edges.write().insert(id, chain);
2778
2779        // Update adjacency
2780        self.forward_adj.add_edge(src, dst, id);
2781        if let Some(ref backward) = self.backward_adj {
2782            backward.add_edge(dst, src, id);
2783        }
2784
2785        // Update next_edge_id if necessary
2786        let id_val = id.as_u64();
2787        let _ = self
2788            .next_edge_id
2789            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2790                if id_val >= current {
2791                    Some(id_val + 1)
2792                } else {
2793                    None
2794                }
2795            });
2796    }
2797
2798    /// Creates an edge with a specific ID during recovery.
2799    /// (Tiered storage version)
2800    #[cfg(feature = "tiered-storage")]
2801    pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2802        let epoch = self.current_epoch();
2803        let type_id = self.get_or_create_edge_type_id(edge_type);
2804
2805        let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2806
2807        // Allocate record in arena and get offset (create epoch if needed)
2808        let arena = self.arena_allocator.arena_or_create(epoch);
2809        let (offset, _stored) = arena.alloc_value_with_offset(record);
2810
2811        // Create HotVersionRef (using SYSTEM tx for recovery)
2812        let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2813        let mut versions = self.edge_versions.write();
2814        versions.insert(id, VersionIndex::with_initial(hot_ref));
2815
2816        // Update adjacency
2817        self.forward_adj.add_edge(src, dst, id);
2818        if let Some(ref backward) = self.backward_adj {
2819            backward.add_edge(dst, src, id);
2820        }
2821
2822        // Update next_edge_id if necessary
2823        let id_val = id.as_u64();
2824        let _ = self
2825            .next_edge_id
2826            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2827                if id_val >= current {
2828                    Some(id_val + 1)
2829                } else {
2830                    None
2831                }
2832            });
2833    }
2834
2835    /// Sets the current epoch during recovery.
2836    pub fn set_epoch(&self, epoch: EpochId) {
2837        self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2838    }
2839}
2840
2841impl Default for LpgStore {
2842    fn default() -> Self {
2843        Self::new()
2844    }
2845}
2846
2847#[cfg(test)]
2848mod tests {
2849    use super::*;
2850
2851    #[test]
2852    fn test_create_node() {
2853        let store = LpgStore::new();
2854
2855        let id = store.create_node(&["Person"]);
2856        assert!(id.is_valid());
2857
2858        let node = store.get_node(id).unwrap();
2859        assert!(node.has_label("Person"));
2860        assert!(!node.has_label("Animal"));
2861    }
2862
2863    #[test]
2864    fn test_create_node_with_props() {
2865        let store = LpgStore::new();
2866
2867        let id = store.create_node_with_props(
2868            &["Person"],
2869            [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2870        );
2871
2872        let node = store.get_node(id).unwrap();
2873        assert_eq!(
2874            node.get_property("name").and_then(|v| v.as_str()),
2875            Some("Alice")
2876        );
2877        assert_eq!(
2878            node.get_property("age").and_then(|v| v.as_int64()),
2879            Some(30)
2880        );
2881    }
2882
2883    #[test]
2884    fn test_delete_node() {
2885        let store = LpgStore::new();
2886
2887        let id = store.create_node(&["Person"]);
2888        assert_eq!(store.node_count(), 1);
2889
2890        assert!(store.delete_node(id));
2891        assert_eq!(store.node_count(), 0);
2892        assert!(store.get_node(id).is_none());
2893
2894        // Double delete should return false
2895        assert!(!store.delete_node(id));
2896    }
2897
2898    #[test]
2899    fn test_create_edge() {
2900        let store = LpgStore::new();
2901
2902        let alice = store.create_node(&["Person"]);
2903        let bob = store.create_node(&["Person"]);
2904
2905        let edge_id = store.create_edge(alice, bob, "KNOWS");
2906        assert!(edge_id.is_valid());
2907
2908        let edge = store.get_edge(edge_id).unwrap();
2909        assert_eq!(edge.src, alice);
2910        assert_eq!(edge.dst, bob);
2911        assert_eq!(edge.edge_type.as_str(), "KNOWS");
2912    }
2913
2914    #[test]
2915    fn test_neighbors() {
2916        let store = LpgStore::new();
2917
2918        let a = store.create_node(&["Person"]);
2919        let b = store.create_node(&["Person"]);
2920        let c = store.create_node(&["Person"]);
2921
2922        store.create_edge(a, b, "KNOWS");
2923        store.create_edge(a, c, "KNOWS");
2924
2925        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2926        assert_eq!(outgoing.len(), 2);
2927        assert!(outgoing.contains(&b));
2928        assert!(outgoing.contains(&c));
2929
2930        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2931        assert_eq!(incoming.len(), 1);
2932        assert!(incoming.contains(&a));
2933    }
2934
2935    #[test]
2936    fn test_nodes_by_label() {
2937        let store = LpgStore::new();
2938
2939        let p1 = store.create_node(&["Person"]);
2940        let p2 = store.create_node(&["Person"]);
2941        let _a = store.create_node(&["Animal"]);
2942
2943        let persons = store.nodes_by_label("Person");
2944        assert_eq!(persons.len(), 2);
2945        assert!(persons.contains(&p1));
2946        assert!(persons.contains(&p2));
2947
2948        let animals = store.nodes_by_label("Animal");
2949        assert_eq!(animals.len(), 1);
2950    }
2951
2952    #[test]
2953    fn test_delete_edge() {
2954        let store = LpgStore::new();
2955
2956        let a = store.create_node(&["Person"]);
2957        let b = store.create_node(&["Person"]);
2958        let edge_id = store.create_edge(a, b, "KNOWS");
2959
2960        assert_eq!(store.edge_count(), 1);
2961
2962        assert!(store.delete_edge(edge_id));
2963        assert_eq!(store.edge_count(), 0);
2964        assert!(store.get_edge(edge_id).is_none());
2965    }
2966
2967    // === New tests for improved coverage ===
2968
2969    #[test]
2970    fn test_lpg_store_config() {
2971        // Test with_config
2972        let config = LpgStoreConfig {
2973            backward_edges: false,
2974            initial_node_capacity: 100,
2975            initial_edge_capacity: 200,
2976        };
2977        let store = LpgStore::with_config(config);
2978
2979        // Store should work but without backward adjacency
2980        let a = store.create_node(&["Person"]);
2981        let b = store.create_node(&["Person"]);
2982        store.create_edge(a, b, "KNOWS");
2983
2984        // Outgoing should work
2985        let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2986        assert_eq!(outgoing.len(), 1);
2987
2988        // Incoming should be empty (no backward adjacency)
2989        let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2990        assert_eq!(incoming.len(), 0);
2991    }
2992
2993    #[test]
2994    fn test_epoch_management() {
2995        let store = LpgStore::new();
2996
2997        let epoch0 = store.current_epoch();
2998        assert_eq!(epoch0.as_u64(), 0);
2999
3000        let epoch1 = store.new_epoch();
3001        assert_eq!(epoch1.as_u64(), 1);
3002
3003        let current = store.current_epoch();
3004        assert_eq!(current.as_u64(), 1);
3005    }
3006
3007    #[test]
3008    fn test_node_properties() {
3009        let store = LpgStore::new();
3010        let id = store.create_node(&["Person"]);
3011
3012        // Set and get property
3013        store.set_node_property(id, "name", Value::from("Alice"));
3014        let name = store.get_node_property(id, &"name".into());
3015        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3016
3017        // Update property
3018        store.set_node_property(id, "name", Value::from("Bob"));
3019        let name = store.get_node_property(id, &"name".into());
3020        assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3021
3022        // Remove property
3023        let old = store.remove_node_property(id, "name");
3024        assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3025
3026        // Property should be gone
3027        let name = store.get_node_property(id, &"name".into());
3028        assert!(name.is_none());
3029
3030        // Remove non-existent property
3031        let none = store.remove_node_property(id, "nonexistent");
3032        assert!(none.is_none());
3033    }
3034
3035    #[test]
3036    fn test_edge_properties() {
3037        let store = LpgStore::new();
3038        let a = store.create_node(&["Person"]);
3039        let b = store.create_node(&["Person"]);
3040        let edge_id = store.create_edge(a, b, "KNOWS");
3041
3042        // Set and get property
3043        store.set_edge_property(edge_id, "since", Value::from(2020i64));
3044        let since = store.get_edge_property(edge_id, &"since".into());
3045        assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3046
3047        // Remove property
3048        let old = store.remove_edge_property(edge_id, "since");
3049        assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3050
3051        let since = store.get_edge_property(edge_id, &"since".into());
3052        assert!(since.is_none());
3053    }
3054
3055    #[test]
3056    fn test_add_remove_label() {
3057        let store = LpgStore::new();
3058        let id = store.create_node(&["Person"]);
3059
3060        // Add new label
3061        assert!(store.add_label(id, "Employee"));
3062
3063        let node = store.get_node(id).unwrap();
3064        assert!(node.has_label("Person"));
3065        assert!(node.has_label("Employee"));
3066
3067        // Adding same label again should fail
3068        assert!(!store.add_label(id, "Employee"));
3069
3070        // Remove label
3071        assert!(store.remove_label(id, "Employee"));
3072
3073        let node = store.get_node(id).unwrap();
3074        assert!(node.has_label("Person"));
3075        assert!(!node.has_label("Employee"));
3076
3077        // Removing non-existent label should fail
3078        assert!(!store.remove_label(id, "Employee"));
3079        assert!(!store.remove_label(id, "NonExistent"));
3080    }
3081
3082    #[test]
3083    fn test_add_label_to_nonexistent_node() {
3084        let store = LpgStore::new();
3085        let fake_id = NodeId::new(999);
3086        assert!(!store.add_label(fake_id, "Label"));
3087    }
3088
3089    #[test]
3090    fn test_remove_label_from_nonexistent_node() {
3091        let store = LpgStore::new();
3092        let fake_id = NodeId::new(999);
3093        assert!(!store.remove_label(fake_id, "Label"));
3094    }
3095
3096    #[test]
3097    fn test_node_ids() {
3098        let store = LpgStore::new();
3099
3100        let n1 = store.create_node(&["Person"]);
3101        let n2 = store.create_node(&["Person"]);
3102        let n3 = store.create_node(&["Person"]);
3103
3104        let ids = store.node_ids();
3105        assert_eq!(ids.len(), 3);
3106        assert!(ids.contains(&n1));
3107        assert!(ids.contains(&n2));
3108        assert!(ids.contains(&n3));
3109
3110        // Delete one
3111        store.delete_node(n2);
3112        let ids = store.node_ids();
3113        assert_eq!(ids.len(), 2);
3114        assert!(!ids.contains(&n2));
3115    }
3116
3117    #[test]
3118    fn test_delete_node_nonexistent() {
3119        let store = LpgStore::new();
3120        let fake_id = NodeId::new(999);
3121        assert!(!store.delete_node(fake_id));
3122    }
3123
3124    #[test]
3125    fn test_delete_edge_nonexistent() {
3126        let store = LpgStore::new();
3127        let fake_id = EdgeId::new(999);
3128        assert!(!store.delete_edge(fake_id));
3129    }
3130
3131    #[test]
3132    fn test_delete_edge_double() {
3133        let store = LpgStore::new();
3134        let a = store.create_node(&["Person"]);
3135        let b = store.create_node(&["Person"]);
3136        let edge_id = store.create_edge(a, b, "KNOWS");
3137
3138        assert!(store.delete_edge(edge_id));
3139        assert!(!store.delete_edge(edge_id)); // Double delete
3140    }
3141
3142    #[test]
3143    fn test_create_edge_with_props() {
3144        let store = LpgStore::new();
3145        let a = store.create_node(&["Person"]);
3146        let b = store.create_node(&["Person"]);
3147
3148        let edge_id = store.create_edge_with_props(
3149            a,
3150            b,
3151            "KNOWS",
3152            [
3153                ("since", Value::from(2020i64)),
3154                ("weight", Value::from(1.0)),
3155            ],
3156        );
3157
3158        let edge = store.get_edge(edge_id).unwrap();
3159        assert_eq!(
3160            edge.get_property("since").and_then(|v| v.as_int64()),
3161            Some(2020)
3162        );
3163        assert_eq!(
3164            edge.get_property("weight").and_then(|v| v.as_float64()),
3165            Some(1.0)
3166        );
3167    }
3168
3169    #[test]
3170    fn test_delete_node_edges() {
3171        let store = LpgStore::new();
3172
3173        let a = store.create_node(&["Person"]);
3174        let b = store.create_node(&["Person"]);
3175        let c = store.create_node(&["Person"]);
3176
3177        store.create_edge(a, b, "KNOWS"); // a -> b
3178        store.create_edge(c, a, "KNOWS"); // c -> a
3179
3180        assert_eq!(store.edge_count(), 2);
3181
3182        // Delete all edges connected to a
3183        store.delete_node_edges(a);
3184
3185        assert_eq!(store.edge_count(), 0);
3186    }
3187
3188    #[test]
3189    fn test_neighbors_both_directions() {
3190        let store = LpgStore::new();
3191
3192        let a = store.create_node(&["Person"]);
3193        let b = store.create_node(&["Person"]);
3194        let c = store.create_node(&["Person"]);
3195
3196        store.create_edge(a, b, "KNOWS"); // a -> b
3197        store.create_edge(c, a, "KNOWS"); // c -> a
3198
3199        // Direction::Both for node a
3200        let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3201        assert_eq!(neighbors.len(), 2);
3202        assert!(neighbors.contains(&b)); // outgoing
3203        assert!(neighbors.contains(&c)); // incoming
3204    }
3205
3206    #[test]
3207    fn test_edges_from() {
3208        let store = LpgStore::new();
3209
3210        let a = store.create_node(&["Person"]);
3211        let b = store.create_node(&["Person"]);
3212        let c = store.create_node(&["Person"]);
3213
3214        let e1 = store.create_edge(a, b, "KNOWS");
3215        let e2 = store.create_edge(a, c, "KNOWS");
3216
3217        let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3218        assert_eq!(edges.len(), 2);
3219        assert!(edges.iter().any(|(_, e)| *e == e1));
3220        assert!(edges.iter().any(|(_, e)| *e == e2));
3221
3222        // Incoming edges to b
3223        let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3224        assert_eq!(incoming.len(), 1);
3225        assert_eq!(incoming[0].1, e1);
3226    }
3227
3228    #[test]
3229    fn test_edges_to() {
3230        let store = LpgStore::new();
3231
3232        let a = store.create_node(&["Person"]);
3233        let b = store.create_node(&["Person"]);
3234        let c = store.create_node(&["Person"]);
3235
3236        let e1 = store.create_edge(a, b, "KNOWS");
3237        let e2 = store.create_edge(c, b, "KNOWS");
3238
3239        // Edges pointing TO b
3240        let to_b = store.edges_to(b);
3241        assert_eq!(to_b.len(), 2);
3242        assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3243        assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3244    }
3245
3246    #[test]
3247    fn test_out_degree_in_degree() {
3248        let store = LpgStore::new();
3249
3250        let a = store.create_node(&["Person"]);
3251        let b = store.create_node(&["Person"]);
3252        let c = store.create_node(&["Person"]);
3253
3254        store.create_edge(a, b, "KNOWS");
3255        store.create_edge(a, c, "KNOWS");
3256        store.create_edge(c, b, "KNOWS");
3257
3258        assert_eq!(store.out_degree(a), 2);
3259        assert_eq!(store.out_degree(b), 0);
3260        assert_eq!(store.out_degree(c), 1);
3261
3262        assert_eq!(store.in_degree(a), 0);
3263        assert_eq!(store.in_degree(b), 2);
3264        assert_eq!(store.in_degree(c), 1);
3265    }
3266
3267    #[test]
3268    fn test_edge_type() {
3269        let store = LpgStore::new();
3270
3271        let a = store.create_node(&["Person"]);
3272        let b = store.create_node(&["Person"]);
3273        let edge_id = store.create_edge(a, b, "KNOWS");
3274
3275        let edge_type = store.edge_type(edge_id);
3276        assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3277
3278        // Non-existent edge
3279        let fake_id = EdgeId::new(999);
3280        assert!(store.edge_type(fake_id).is_none());
3281    }
3282
3283    #[test]
3284    fn test_count_methods() {
3285        let store = LpgStore::new();
3286
3287        assert_eq!(store.label_count(), 0);
3288        assert_eq!(store.edge_type_count(), 0);
3289        assert_eq!(store.property_key_count(), 0);
3290
3291        let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3292        let b = store.create_node(&["Company"]);
3293        store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3294
3295        assert_eq!(store.label_count(), 2); // Person, Company
3296        assert_eq!(store.edge_type_count(), 1); // WORKS_AT
3297        assert_eq!(store.property_key_count(), 2); // age, since
3298    }
3299
3300    #[test]
3301    fn test_all_nodes_and_edges() {
3302        let store = LpgStore::new();
3303
3304        let a = store.create_node(&["Person"]);
3305        let b = store.create_node(&["Person"]);
3306        store.create_edge(a, b, "KNOWS");
3307
3308        let nodes: Vec<_> = store.all_nodes().collect();
3309        assert_eq!(nodes.len(), 2);
3310
3311        let edges: Vec<_> = store.all_edges().collect();
3312        assert_eq!(edges.len(), 1);
3313    }
3314
3315    #[test]
3316    fn test_all_labels_and_edge_types() {
3317        let store = LpgStore::new();
3318
3319        store.create_node(&["Person"]);
3320        store.create_node(&["Company"]);
3321        let a = store.create_node(&["Animal"]);
3322        let b = store.create_node(&["Animal"]);
3323        store.create_edge(a, b, "EATS");
3324
3325        let labels = store.all_labels();
3326        assert_eq!(labels.len(), 3);
3327        assert!(labels.contains(&"Person".to_string()));
3328        assert!(labels.contains(&"Company".to_string()));
3329        assert!(labels.contains(&"Animal".to_string()));
3330
3331        let edge_types = store.all_edge_types();
3332        assert_eq!(edge_types.len(), 1);
3333        assert!(edge_types.contains(&"EATS".to_string()));
3334    }
3335
3336    #[test]
3337    fn test_all_property_keys() {
3338        let store = LpgStore::new();
3339
3340        let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3341        let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3342        store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3343
3344        let keys = store.all_property_keys();
3345        assert!(keys.contains(&"name".to_string()));
3346        assert!(keys.contains(&"age".to_string()));
3347        assert!(keys.contains(&"since".to_string()));
3348    }
3349
3350    #[test]
3351    fn test_nodes_with_label() {
3352        let store = LpgStore::new();
3353
3354        store.create_node(&["Person"]);
3355        store.create_node(&["Person"]);
3356        store.create_node(&["Company"]);
3357
3358        let persons: Vec<_> = store.nodes_with_label("Person").collect();
3359        assert_eq!(persons.len(), 2);
3360
3361        let companies: Vec<_> = store.nodes_with_label("Company").collect();
3362        assert_eq!(companies.len(), 1);
3363
3364        let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3365        assert_eq!(none.len(), 0);
3366    }
3367
3368    #[test]
3369    fn test_edges_with_type() {
3370        let store = LpgStore::new();
3371
3372        let a = store.create_node(&["Person"]);
3373        let b = store.create_node(&["Person"]);
3374        let c = store.create_node(&["Company"]);
3375
3376        store.create_edge(a, b, "KNOWS");
3377        store.create_edge(a, c, "WORKS_AT");
3378
3379        let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3380        assert_eq!(knows.len(), 1);
3381
3382        let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3383        assert_eq!(works_at.len(), 1);
3384
3385        let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3386        assert_eq!(none.len(), 0);
3387    }
3388
3389    #[test]
3390    fn test_nodes_by_label_nonexistent() {
3391        let store = LpgStore::new();
3392        store.create_node(&["Person"]);
3393
3394        let empty = store.nodes_by_label("NonExistent");
3395        assert!(empty.is_empty());
3396    }
3397
3398    #[test]
3399    fn test_statistics() {
3400        let store = LpgStore::new();
3401
3402        let a = store.create_node(&["Person"]);
3403        let b = store.create_node(&["Person"]);
3404        let c = store.create_node(&["Company"]);
3405
3406        store.create_edge(a, b, "KNOWS");
3407        store.create_edge(a, c, "WORKS_AT");
3408
3409        store.compute_statistics();
3410        let stats = store.statistics();
3411
3412        assert_eq!(stats.total_nodes, 3);
3413        assert_eq!(stats.total_edges, 2);
3414
3415        // Estimates
3416        let person_card = store.estimate_label_cardinality("Person");
3417        assert!(person_card > 0.0);
3418
3419        let avg_degree = store.estimate_avg_degree("KNOWS", true);
3420        assert!(avg_degree >= 0.0);
3421    }
3422
3423    #[test]
3424    fn test_zone_maps() {
3425        let store = LpgStore::new();
3426
3427        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3428        store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3429
3430        // Zone map should indicate possible matches (30 is within [25, 35] range)
3431        let might_match =
3432            store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3433        // Zone maps return true conservatively when value is within min/max range
3434        assert!(might_match);
3435
3436        let zone = store.node_property_zone_map(&"age".into());
3437        assert!(zone.is_some());
3438
3439        // Non-existent property
3440        let no_zone = store.node_property_zone_map(&"nonexistent".into());
3441        assert!(no_zone.is_none());
3442
3443        // Edge zone maps
3444        let a = store.create_node(&["A"]);
3445        let b = store.create_node(&["B"]);
3446        store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3447
3448        let edge_zone = store.edge_property_zone_map(&"weight".into());
3449        assert!(edge_zone.is_some());
3450    }
3451
3452    #[test]
3453    fn test_rebuild_zone_maps() {
3454        let store = LpgStore::new();
3455        store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3456
3457        // Should not panic
3458        store.rebuild_zone_maps();
3459    }
3460
3461    #[test]
3462    fn test_create_node_with_id() {
3463        let store = LpgStore::new();
3464
3465        let specific_id = NodeId::new(100);
3466        store.create_node_with_id(specific_id, &["Person", "Employee"]);
3467
3468        let node = store.get_node(specific_id).unwrap();
3469        assert!(node.has_label("Person"));
3470        assert!(node.has_label("Employee"));
3471
3472        // Next auto-generated ID should be > 100
3473        let next = store.create_node(&["Other"]);
3474        assert!(next.as_u64() > 100);
3475    }
3476
3477    #[test]
3478    fn test_create_edge_with_id() {
3479        let store = LpgStore::new();
3480
3481        let a = store.create_node(&["A"]);
3482        let b = store.create_node(&["B"]);
3483
3484        let specific_id = EdgeId::new(500);
3485        store.create_edge_with_id(specific_id, a, b, "REL");
3486
3487        let edge = store.get_edge(specific_id).unwrap();
3488        assert_eq!(edge.src, a);
3489        assert_eq!(edge.dst, b);
3490        assert_eq!(edge.edge_type.as_str(), "REL");
3491
3492        // Next auto-generated ID should be > 500
3493        let next = store.create_edge(a, b, "OTHER");
3494        assert!(next.as_u64() > 500);
3495    }
3496
3497    #[test]
3498    fn test_set_epoch() {
3499        let store = LpgStore::new();
3500
3501        assert_eq!(store.current_epoch().as_u64(), 0);
3502
3503        store.set_epoch(EpochId::new(42));
3504        assert_eq!(store.current_epoch().as_u64(), 42);
3505    }
3506
3507    #[test]
3508    fn test_get_node_nonexistent() {
3509        let store = LpgStore::new();
3510        let fake_id = NodeId::new(999);
3511        assert!(store.get_node(fake_id).is_none());
3512    }
3513
3514    #[test]
3515    fn test_get_edge_nonexistent() {
3516        let store = LpgStore::new();
3517        let fake_id = EdgeId::new(999);
3518        assert!(store.get_edge(fake_id).is_none());
3519    }
3520
3521    #[test]
3522    fn test_multiple_labels() {
3523        let store = LpgStore::new();
3524
3525        let id = store.create_node(&["Person", "Employee", "Manager"]);
3526        let node = store.get_node(id).unwrap();
3527
3528        assert!(node.has_label("Person"));
3529        assert!(node.has_label("Employee"));
3530        assert!(node.has_label("Manager"));
3531        assert!(!node.has_label("Other"));
3532    }
3533
3534    #[test]
3535    fn test_default_impl() {
3536        let store: LpgStore = Default::default();
3537        assert_eq!(store.node_count(), 0);
3538        assert_eq!(store.edge_count(), 0);
3539    }
3540
3541    #[test]
3542    fn test_edges_from_both_directions() {
3543        let store = LpgStore::new();
3544
3545        let a = store.create_node(&["A"]);
3546        let b = store.create_node(&["B"]);
3547        let c = store.create_node(&["C"]);
3548
3549        let e1 = store.create_edge(a, b, "R1"); // a -> b
3550        let e2 = store.create_edge(c, a, "R2"); // c -> a
3551
3552        // Both directions from a
3553        let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3554        assert_eq!(edges.len(), 2);
3555        assert!(edges.iter().any(|(_, e)| *e == e1)); // outgoing
3556        assert!(edges.iter().any(|(_, e)| *e == e2)); // incoming
3557    }
3558
3559    #[test]
3560    fn test_no_backward_adj_in_degree() {
3561        let config = LpgStoreConfig {
3562            backward_edges: false,
3563            initial_node_capacity: 10,
3564            initial_edge_capacity: 10,
3565        };
3566        let store = LpgStore::with_config(config);
3567
3568        let a = store.create_node(&["A"]);
3569        let b = store.create_node(&["B"]);
3570        store.create_edge(a, b, "R");
3571
3572        // in_degree should still work (falls back to scanning)
3573        let degree = store.in_degree(b);
3574        assert_eq!(degree, 1);
3575    }
3576
3577    #[test]
3578    fn test_no_backward_adj_edges_to() {
3579        let config = LpgStoreConfig {
3580            backward_edges: false,
3581            initial_node_capacity: 10,
3582            initial_edge_capacity: 10,
3583        };
3584        let store = LpgStore::with_config(config);
3585
3586        let a = store.create_node(&["A"]);
3587        let b = store.create_node(&["B"]);
3588        let e = store.create_edge(a, b, "R");
3589
3590        // edges_to should still work (falls back to scanning)
3591        let edges = store.edges_to(b);
3592        assert_eq!(edges.len(), 1);
3593        assert_eq!(edges[0].1, e);
3594    }
3595
3596    #[test]
3597    fn test_node_versioned_creation() {
3598        let store = LpgStore::new();
3599
3600        let epoch = store.new_epoch();
3601        let tx_id = TxId::new(1);
3602
3603        let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3604        assert!(store.get_node(id).is_some());
3605    }
3606
3607    #[test]
3608    fn test_edge_versioned_creation() {
3609        let store = LpgStore::new();
3610
3611        let a = store.create_node(&["A"]);
3612        let b = store.create_node(&["B"]);
3613
3614        let epoch = store.new_epoch();
3615        let tx_id = TxId::new(1);
3616
3617        let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3618        assert!(store.get_edge(edge_id).is_some());
3619    }
3620
3621    #[test]
3622    fn test_node_with_props_versioned() {
3623        let store = LpgStore::new();
3624
3625        let epoch = store.new_epoch();
3626        let tx_id = TxId::new(1);
3627
3628        let id = store.create_node_with_props_versioned(
3629            &["Person"],
3630            [("name", Value::from("Alice"))],
3631            epoch,
3632            tx_id,
3633        );
3634
3635        let node = store.get_node(id).unwrap();
3636        assert_eq!(
3637            node.get_property("name").and_then(|v| v.as_str()),
3638            Some("Alice")
3639        );
3640    }
3641
3642    #[test]
3643    fn test_discard_uncommitted_versions() {
3644        let store = LpgStore::new();
3645
3646        let epoch = store.new_epoch();
3647        let tx_id = TxId::new(42);
3648
3649        // Create node with specific tx
3650        let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3651        assert!(store.get_node(node_id).is_some());
3652
3653        // Discard uncommitted versions for this tx
3654        store.discard_uncommitted_versions(tx_id);
3655
3656        // Node should be gone (version chain was removed)
3657        assert!(store.get_node(node_id).is_none());
3658    }
3659
3660    // === Property Index Tests ===
3661
3662    #[test]
3663    fn test_property_index_create_and_lookup() {
3664        let store = LpgStore::new();
3665
3666        // Create nodes with properties
3667        let alice = store.create_node(&["Person"]);
3668        let bob = store.create_node(&["Person"]);
3669        let charlie = store.create_node(&["Person"]);
3670
3671        store.set_node_property(alice, "city", Value::from("NYC"));
3672        store.set_node_property(bob, "city", Value::from("NYC"));
3673        store.set_node_property(charlie, "city", Value::from("LA"));
3674
3675        // Before indexing, lookup still works (via scan)
3676        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3677        assert_eq!(nyc_people.len(), 2);
3678
3679        // Create index
3680        store.create_property_index("city");
3681        assert!(store.has_property_index("city"));
3682
3683        // Indexed lookup should return same results
3684        let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3685        assert_eq!(nyc_people.len(), 2);
3686        assert!(nyc_people.contains(&alice));
3687        assert!(nyc_people.contains(&bob));
3688
3689        let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3690        assert_eq!(la_people.len(), 1);
3691        assert!(la_people.contains(&charlie));
3692    }
3693
3694    #[test]
3695    fn test_property_index_maintained_on_update() {
3696        let store = LpgStore::new();
3697
3698        // Create index first
3699        store.create_property_index("status");
3700
3701        let node = store.create_node(&["Task"]);
3702        store.set_node_property(node, "status", Value::from("pending"));
3703
3704        // Should find by initial value
3705        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3706        assert_eq!(pending.len(), 1);
3707        assert!(pending.contains(&node));
3708
3709        // Update the property
3710        store.set_node_property(node, "status", Value::from("done"));
3711
3712        // Old value should not find it
3713        let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3714        assert!(pending.is_empty());
3715
3716        // New value should find it
3717        let done = store.find_nodes_by_property("status", &Value::from("done"));
3718        assert_eq!(done.len(), 1);
3719        assert!(done.contains(&node));
3720    }
3721
3722    #[test]
3723    fn test_property_index_maintained_on_remove() {
3724        let store = LpgStore::new();
3725
3726        store.create_property_index("tag");
3727
3728        let node = store.create_node(&["Item"]);
3729        store.set_node_property(node, "tag", Value::from("important"));
3730
3731        // Should find it
3732        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3733        assert_eq!(found.len(), 1);
3734
3735        // Remove the property
3736        store.remove_node_property(node, "tag");
3737
3738        // Should no longer find it
3739        let found = store.find_nodes_by_property("tag", &Value::from("important"));
3740        assert!(found.is_empty());
3741    }
3742
3743    #[test]
3744    fn test_property_index_drop() {
3745        let store = LpgStore::new();
3746
3747        store.create_property_index("key");
3748        assert!(store.has_property_index("key"));
3749
3750        assert!(store.drop_property_index("key"));
3751        assert!(!store.has_property_index("key"));
3752
3753        // Dropping non-existent index returns false
3754        assert!(!store.drop_property_index("key"));
3755    }
3756
3757    #[test]
3758    fn test_property_index_multiple_values() {
3759        let store = LpgStore::new();
3760
3761        store.create_property_index("age");
3762
3763        // Create multiple nodes with same and different ages
3764        let n1 = store.create_node(&["Person"]);
3765        let n2 = store.create_node(&["Person"]);
3766        let n3 = store.create_node(&["Person"]);
3767        let n4 = store.create_node(&["Person"]);
3768
3769        store.set_node_property(n1, "age", Value::from(25i64));
3770        store.set_node_property(n2, "age", Value::from(25i64));
3771        store.set_node_property(n3, "age", Value::from(30i64));
3772        store.set_node_property(n4, "age", Value::from(25i64));
3773
3774        let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3775        assert_eq!(age_25.len(), 3);
3776
3777        let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3778        assert_eq!(age_30.len(), 1);
3779
3780        let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3781        assert!(age_40.is_empty());
3782    }
3783
3784    #[test]
3785    fn test_property_index_builds_from_existing_data() {
3786        let store = LpgStore::new();
3787
3788        // Create nodes first
3789        let n1 = store.create_node(&["Person"]);
3790        let n2 = store.create_node(&["Person"]);
3791        store.set_node_property(n1, "email", Value::from("alice@example.com"));
3792        store.set_node_property(n2, "email", Value::from("bob@example.com"));
3793
3794        // Create index after data exists
3795        store.create_property_index("email");
3796
3797        // Index should include existing data
3798        let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3799        assert_eq!(alice.len(), 1);
3800        assert!(alice.contains(&n1));
3801
3802        let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3803        assert_eq!(bob.len(), 1);
3804        assert!(bob.contains(&n2));
3805    }
3806
3807    #[test]
3808    fn test_get_node_property_batch() {
3809        let store = LpgStore::new();
3810
3811        let n1 = store.create_node(&["Person"]);
3812        let n2 = store.create_node(&["Person"]);
3813        let n3 = store.create_node(&["Person"]);
3814
3815        store.set_node_property(n1, "age", Value::from(25i64));
3816        store.set_node_property(n2, "age", Value::from(30i64));
3817        // n3 has no age property
3818
3819        let age_key = PropertyKey::new("age");
3820        let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3821
3822        assert_eq!(values.len(), 3);
3823        assert_eq!(values[0], Some(Value::from(25i64)));
3824        assert_eq!(values[1], Some(Value::from(30i64)));
3825        assert_eq!(values[2], None);
3826    }
3827
3828    #[test]
3829    fn test_get_node_property_batch_empty() {
3830        let store = LpgStore::new();
3831        let key = PropertyKey::new("any");
3832
3833        let values = store.get_node_property_batch(&[], &key);
3834        assert!(values.is_empty());
3835    }
3836
3837    #[test]
3838    fn test_get_nodes_properties_batch() {
3839        let store = LpgStore::new();
3840
3841        let n1 = store.create_node(&["Person"]);
3842        let n2 = store.create_node(&["Person"]);
3843        let n3 = store.create_node(&["Person"]);
3844
3845        store.set_node_property(n1, "name", Value::from("Alice"));
3846        store.set_node_property(n1, "age", Value::from(25i64));
3847        store.set_node_property(n2, "name", Value::from("Bob"));
3848        // n3 has no properties
3849
3850        let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3851
3852        assert_eq!(all_props.len(), 3);
3853        assert_eq!(all_props[0].len(), 2); // name and age
3854        assert_eq!(all_props[1].len(), 1); // name only
3855        assert_eq!(all_props[2].len(), 0); // no properties
3856
3857        assert_eq!(
3858            all_props[0].get(&PropertyKey::new("name")),
3859            Some(&Value::from("Alice"))
3860        );
3861        assert_eq!(
3862            all_props[1].get(&PropertyKey::new("name")),
3863            Some(&Value::from("Bob"))
3864        );
3865    }
3866
3867    #[test]
3868    fn test_get_nodes_properties_batch_empty() {
3869        let store = LpgStore::new();
3870
3871        let all_props = store.get_nodes_properties_batch(&[]);
3872        assert!(all_props.is_empty());
3873    }
3874}