// grafeo_core/graph/lpg/store/mod.rs

1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12mod edge_ops;
13mod graph_store_impl;
14mod index;
15mod node_ops;
16mod property_ops;
17mod schema;
18mod search;
19mod statistics;
20mod traversal;
21mod versioning;
22
23#[cfg(test)]
24mod tests;
25
26use super::PropertyStorage;
27#[cfg(not(feature = "tiered-storage"))]
28use super::{EdgeRecord, NodeRecord};
29use crate::index::adjacency::ChunkedAdjacency;
30use crate::statistics::Statistics;
31use arcstr::ArcStr;
32use dashmap::DashMap;
33#[cfg(not(feature = "tiered-storage"))]
34use grafeo_common::mvcc::VersionChain;
35use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, Value};
36use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
37use parking_lot::RwLock;
38use std::cmp::Ordering as CmpOrdering;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
41
42#[cfg(feature = "vector-index")]
43use crate::index::vector::HnswIndex;
44
45#[cfg(feature = "tiered-storage")]
46use crate::storage::EpochStore;
47#[cfg(feature = "tiered-storage")]
48use grafeo_common::memory::arena::ArenaAllocator;
49#[cfg(feature = "tiered-storage")]
50use grafeo_common::mvcc::VersionIndex;
51
52/// Compares two values for ordering (used for range checks).
53pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
54    match (a, b) {
55        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
56        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
57        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
58        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
59        _ => None,
60    }
61}
62
63/// Checks if a value is within a range.
64pub(super) fn value_in_range(
65    value: &Value,
66    min: Option<&Value>,
67    max: Option<&Value>,
68    min_inclusive: bool,
69    max_inclusive: bool,
70) -> bool {
71    // Check lower bound
72    if let Some(min_val) = min {
73        match compare_values_for_range(value, min_val) {
74            Some(CmpOrdering::Less) => return false,
75            Some(CmpOrdering::Equal) if !min_inclusive => return false,
76            None => return false, // Can't compare
77            _ => {}
78        }
79    }
80
81    // Check upper bound
82    if let Some(max_val) = max {
83        match compare_values_for_range(value, max_val) {
84            Some(CmpOrdering::Greater) => return false,
85            Some(CmpOrdering::Equal) if !max_inclusive => return false,
86            None => return false,
87            _ => {}
88        }
89    }
90
91    true
92}
93
/// Configuration for the LPG store.
///
/// The defaults work well for most cases. Tune `backward_edges` if you only
/// traverse in one direction (saves memory), or adjust capacities if you know
/// your graph size upfront (avoids reallocations).
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Maintain backward adjacency for incoming edge queries. Turn off if
    /// you only traverse outgoing edges - saves ~50% adjacency memory.
    /// Defaults to `true`.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    /// Defaults to 1024.
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    /// Defaults to 4096.
    pub initial_edge_capacity: usize,
}
109
110impl Default for LpgStoreConfig {
111    fn default() -> Self {
112        Self {
113            backward_edges: true,
114            initial_node_capacity: 1024,
115            initial_edge_capacity: 4096,
116        }
117    }
118}
119
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
///
/// # Lock Ordering
///
/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
/// consistent order to prevent deadlocks. Always acquire locks in this order:
///
/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
/// 1. `nodes` / `node_versions`
/// 2. `edges` / `edge_versions`
///
/// ## Level 2 - Catalogs (acquire as pairs when writing)
/// 3. `label_to_id` + `id_to_label`
/// 4. `edge_type_to_id` + `id_to_edge_type`
///
/// ## Level 3 - Indexes
/// 5. `label_index`
/// 6. `node_labels`
/// 7. `property_indexes`
///
/// ## Level 4 - Statistics
/// 8. `statistics`
///
/// ## Level 5 - Nested Locks (internal to other structs)
/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
///
/// ## Rules
/// - Catalog pairs must be acquired together when writing.
/// - Never hold entity locks while acquiring catalog locks in a different scope.
/// - Statistics lock is always last.
/// - Read locks are generally safe, but avoid read-to-write upgrades.
pub struct LpgStore {
    /// Configuration.
    /// NOTE(review): currently only `backward_edges` is consulted (in
    /// `with_config`); the capacity fields are retained but unused.
    #[allow(dead_code)]
    pub(super) config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 1
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 2
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    //
    // Lock ordering for arena access:
    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
    //
    // Rules:
    // - Acquire arena read lock *after* version locks, never before.
    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
    // - freeze_epoch order: node_versions.read() → arena.read_at(),
    //   then edge_versions.read() → arena.read_at().
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    pub(super) arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    /// Lock order: 1
    #[cfg(feature = "tiered-storage")]
    pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    /// Lock order: 2
    #[cfg(feature = "tiered-storage")]
    pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    pub(super) epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    pub(super) node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    pub(super) edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    /// Lock order: 3 (acquire with id_to_label)
    pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label ID to name mapping.
    /// The label's ID is its position in this vector.
    /// Lock order: 3 (acquire with label_to_id)
    pub(super) id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to ID mapping.
    /// Lock order: 4 (acquire with id_to_edge_type)
    pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type ID to name mapping.
    /// The type's ID is its position in this vector.
    /// Lock order: 4 (acquire with edge_type_to_id)
    pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Forward adjacency lists (outgoing edges).
    pub(super) forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true.
    pub(super) backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    /// The `()` value makes each inner map behave as a set.
    /// Lock order: 5
    pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    /// Lock order: 6
    pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property indexes: property_key -> (value -> set of node IDs).
    ///
    /// When a property is indexed, lookups by value are O(1) instead of O(n).
    /// Use [`create_property_index`] to enable indexing for a property.
    /// Lock order: 7
    pub(super) property_indexes:
        RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes: "label:property" -> HNSW index.
    ///
    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "vector-index")]
    pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Text indexes: "label:property" -> inverted index with BM25 scoring.
    ///
    /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "text-index")]
    pub(super) text_indexes:
        RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,

    /// Next node ID.
    pub(super) next_node_id: AtomicU64,

    /// Next edge ID.
    pub(super) next_edge_id: AtomicU64,

    /// Current epoch.
    pub(super) current_epoch: AtomicU64,

    /// Live (non-deleted) node count, maintained incrementally.
    /// Avoids O(n) full scan in `compute_statistics()`.
    pub(super) live_node_count: AtomicI64,

    /// Live (non-deleted) edge count, maintained incrementally.
    /// Avoids O(m) full scan in `compute_statistics()`.
    pub(super) live_edge_count: AtomicI64,

    /// Per-edge-type live counts, indexed by edge_type_id.
    /// Avoids O(m) edge scan in `compute_statistics()`.
    /// Lock order: 8 (same level as statistics)
    pub(super) edge_type_live_counts: RwLock<Vec<i64>>,

    /// Statistics for cost-based optimization.
    /// Lock order: 8 (always last)
    pub(super) statistics: RwLock<Arc<Statistics>>,

    /// Whether statistics need full recomputation (e.g., after rollback).
    pub(super) needs_stats_recompute: AtomicBool,
}
318
319impl LpgStore {
320    /// Creates a new LPG store with default configuration.
321    #[must_use]
322    pub fn new() -> Self {
323        Self::with_config(LpgStoreConfig::default())
324    }
325
326    /// Creates a new LPG store with custom configuration.
327    #[must_use]
328    pub fn with_config(config: LpgStoreConfig) -> Self {
329        let backward_adj = if config.backward_edges {
330            Some(ChunkedAdjacency::new())
331        } else {
332            None
333        };
334
335        Self {
336            #[cfg(not(feature = "tiered-storage"))]
337            nodes: RwLock::new(FxHashMap::default()),
338            #[cfg(not(feature = "tiered-storage"))]
339            edges: RwLock::new(FxHashMap::default()),
340            #[cfg(feature = "tiered-storage")]
341            arena_allocator: Arc::new(ArenaAllocator::new()),
342            #[cfg(feature = "tiered-storage")]
343            node_versions: RwLock::new(FxHashMap::default()),
344            #[cfg(feature = "tiered-storage")]
345            edge_versions: RwLock::new(FxHashMap::default()),
346            #[cfg(feature = "tiered-storage")]
347            epoch_store: Arc::new(EpochStore::new()),
348            node_properties: PropertyStorage::new(),
349            edge_properties: PropertyStorage::new(),
350            label_to_id: RwLock::new(FxHashMap::default()),
351            id_to_label: RwLock::new(Vec::new()),
352            edge_type_to_id: RwLock::new(FxHashMap::default()),
353            id_to_edge_type: RwLock::new(Vec::new()),
354            forward_adj: ChunkedAdjacency::new(),
355            backward_adj,
356            label_index: RwLock::new(Vec::new()),
357            node_labels: RwLock::new(FxHashMap::default()),
358            property_indexes: RwLock::new(FxHashMap::default()),
359            #[cfg(feature = "vector-index")]
360            vector_indexes: RwLock::new(FxHashMap::default()),
361            #[cfg(feature = "text-index")]
362            text_indexes: RwLock::new(FxHashMap::default()),
363            next_node_id: AtomicU64::new(0),
364            next_edge_id: AtomicU64::new(0),
365            current_epoch: AtomicU64::new(0),
366            live_node_count: AtomicI64::new(0),
367            live_edge_count: AtomicI64::new(0),
368            edge_type_live_counts: RwLock::new(Vec::new()),
369            statistics: RwLock::new(Arc::new(Statistics::new())),
370            needs_stats_recompute: AtomicBool::new(false),
371            config,
372        }
373    }
374
375    /// Returns the current epoch.
376    #[must_use]
377    pub fn current_epoch(&self) -> EpochId {
378        EpochId::new(self.current_epoch.load(Ordering::Acquire))
379    }
380
381    /// Creates a new epoch.
382    #[doc(hidden)]
383    pub fn new_epoch(&self) -> EpochId {
384        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
385        EpochId::new(id)
386    }
387
388    /// Syncs the store epoch to match an external epoch counter.
389    ///
390    /// Used by the transaction manager to keep the store's epoch in step
391    /// after a transaction commit advances the global epoch.
392    #[doc(hidden)]
393    pub fn sync_epoch(&self, epoch: EpochId) {
394        self.current_epoch
395            .fetch_max(epoch.as_u64(), Ordering::AcqRel);
396    }
397
398    /// Removes all data from the store, resetting it to an empty state.
399    ///
400    /// Acquires locks in the documented ordering to prevent deadlocks.
401    /// After clearing, the store behaves as if freshly constructed.
402    pub fn clear(&self) {
403        // Level 1: Entity storage
404        #[cfg(not(feature = "tiered-storage"))]
405        {
406            self.nodes.write().clear();
407            self.edges.write().clear();
408        }
409        #[cfg(feature = "tiered-storage")]
410        {
411            self.node_versions.write().clear();
412            self.edge_versions.write().clear();
413            // Arena allocator chunks are leaked; epochs are cleared via epoch_store.
414        }
415
416        // Level 2: Catalogs (acquire as pairs)
417        {
418            self.label_to_id.write().clear();
419            self.id_to_label.write().clear();
420        }
421        {
422            self.edge_type_to_id.write().clear();
423            self.id_to_edge_type.write().clear();
424        }
425
426        // Level 3: Indexes
427        self.label_index.write().clear();
428        self.node_labels.write().clear();
429        self.property_indexes.write().clear();
430        #[cfg(feature = "vector-index")]
431        self.vector_indexes.write().clear();
432        #[cfg(feature = "text-index")]
433        self.text_indexes.write().clear();
434
435        // Nested: Properties and adjacency
436        self.node_properties.clear();
437        self.edge_properties.clear();
438        self.forward_adj.clear();
439        if let Some(ref backward) = self.backward_adj {
440            backward.clear();
441        }
442
443        // Atomics: ID counters
444        self.next_node_id.store(0, Ordering::Release);
445        self.next_edge_id.store(0, Ordering::Release);
446        self.current_epoch.store(0, Ordering::Release);
447
448        // Level 4: Statistics
449        self.live_node_count.store(0, Ordering::Release);
450        self.live_edge_count.store(0, Ordering::Release);
451        self.edge_type_live_counts.write().clear();
452        *self.statistics.write() = Arc::new(Statistics::new());
453        self.needs_stats_recompute.store(false, Ordering::Release);
454    }
455
456    /// Returns whether backward adjacency (incoming edge index) is available.
457    ///
458    /// When backward adjacency is enabled (the default), bidirectional search
459    /// algorithms can traverse from the target toward the source.
460    #[must_use]
461    pub fn has_backward_adjacency(&self) -> bool {
462        self.backward_adj.is_some()
463    }
464
465    // === Internal Helpers ===
466
467    pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
468        {
469            let label_to_id = self.label_to_id.read();
470            if let Some(&id) = label_to_id.get(label) {
471                return id;
472            }
473        }
474
475        let mut label_to_id = self.label_to_id.write();
476        let mut id_to_label = self.id_to_label.write();
477
478        // Double-check after acquiring write lock
479        if let Some(&id) = label_to_id.get(label) {
480            return id;
481        }
482
483        let id = id_to_label.len() as u32;
484
485        let label: ArcStr = label.into();
486        label_to_id.insert(label.clone(), id);
487        id_to_label.push(label);
488
489        id
490    }
491
492    pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
493        {
494            let type_to_id = self.edge_type_to_id.read();
495            if let Some(&id) = type_to_id.get(edge_type) {
496                return id;
497            }
498        }
499
500        let mut type_to_id = self.edge_type_to_id.write();
501        let mut id_to_type = self.id_to_edge_type.write();
502
503        // Double-check
504        if let Some(&id) = type_to_id.get(edge_type) {
505            return id;
506        }
507
508        let id = id_to_type.len() as u32;
509        let edge_type: ArcStr = edge_type.into();
510        type_to_id.insert(edge_type.clone(), id);
511        id_to_type.push(edge_type);
512
513        // Grow edge type live counts to match
514        let mut counts = self.edge_type_live_counts.write();
515        while counts.len() <= id as usize {
516            counts.push(0);
517        }
518
519        id
520    }
521
522    /// Increments the live edge count for a given edge type.
523    pub(super) fn increment_edge_type_count(&self, type_id: u32) {
524        let mut counts = self.edge_type_live_counts.write();
525        if counts.len() <= type_id as usize {
526            counts.resize(type_id as usize + 1, 0);
527        }
528        counts[type_id as usize] += 1;
529    }
530
531    /// Decrements the live edge count for a given edge type.
532    pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
533        let mut counts = self.edge_type_live_counts.write();
534        if type_id < counts.len() as u32 {
535            counts[type_id as usize] -= 1;
536        }
537    }
538}
539
540impl Default for LpgStore {
541    fn default() -> Self {
542        Self::new()
543    }
544}