Skip to main content

grafeo_core/graph/lpg/store/
mod.rs

//! The in-memory LPG graph store.
//!
//! This is where your nodes and edges actually live. Most users interact
//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
//!
//! Key features:
//! - MVCC versioning: concurrent readers don't block each other
//! - Columnar properties with zone maps for fast filtering
//! - Forward and backward adjacency indexes
11
12mod edge_ops;
13mod index;
14mod node_ops;
15mod property_ops;
16mod schema;
17mod search;
18mod statistics;
19mod traversal;
20mod versioning;
21
22#[cfg(test)]
23mod tests;
24
25use super::PropertyStorage;
26#[cfg(not(feature = "tiered-storage"))]
27use super::{EdgeRecord, NodeRecord};
28use crate::index::adjacency::ChunkedAdjacency;
29use crate::statistics::Statistics;
30use arcstr::ArcStr;
31use dashmap::DashMap;
32#[cfg(not(feature = "tiered-storage"))]
33use grafeo_common::mvcc::VersionChain;
34use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, Value};
35use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
36use parking_lot::RwLock;
37use std::cmp::Ordering as CmpOrdering;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
40
41#[cfg(feature = "vector-index")]
42use crate::index::vector::HnswIndex;
43
44#[cfg(feature = "tiered-storage")]
45use crate::storage::EpochStore;
46#[cfg(feature = "tiered-storage")]
47use grafeo_common::memory::arena::ArenaAllocator;
48#[cfg(feature = "tiered-storage")]
49use grafeo_common::mvcc::VersionIndex;
50
51/// Compares two values for ordering (used for range checks).
52pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
53    match (a, b) {
54        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
55        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
56        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
57        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
58        _ => None,
59    }
60}
61
62/// Checks if a value is within a range.
63pub(super) fn value_in_range(
64    value: &Value,
65    min: Option<&Value>,
66    max: Option<&Value>,
67    min_inclusive: bool,
68    max_inclusive: bool,
69) -> bool {
70    // Check lower bound
71    if let Some(min_val) = min {
72        match compare_values_for_range(value, min_val) {
73            Some(CmpOrdering::Less) => return false,
74            Some(CmpOrdering::Equal) if !min_inclusive => return false,
75            None => return false, // Can't compare
76            _ => {}
77        }
78    }
79
80    // Check upper bound
81    if let Some(max_val) = max {
82        match compare_values_for_range(value, max_val) {
83            Some(CmpOrdering::Greater) => return false,
84            Some(CmpOrdering::Equal) if !max_inclusive => return false,
85            None => return false,
86            _ => {}
87        }
88    }
89
90    true
91}
92
/// Tunable settings for an LPG store.
///
/// The defaults are meant to suit most workloads. Disable `backward_edges`
/// when you only ever traverse in one direction (saves memory), and set the
/// capacities when the graph size is known upfront.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Whether to maintain the backward adjacency index used for incoming
    /// edge queries. Turning it off when only outgoing edges are traversed
    /// saves ~50% adjacency memory.
    pub backward_edges: bool,
    /// Requested initial capacity for node storage.
    pub initial_node_capacity: usize,
    /// Requested initial capacity for edge storage.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, room for 1024 nodes and 4096 edges.
    fn default() -> Self {
        LpgStoreConfig {
            initial_edge_capacity: 4096,
            initial_node_capacity: 1024,
            backward_edges: true,
        }
    }
}
118
119/// The core in-memory graph storage.
120///
121/// Everything lives here: nodes, edges, properties, adjacency indexes, and
122/// version chains for MVCC. Concurrent reads never block each other.
123///
124/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
125/// adds transaction management and query execution. Use `LpgStore` directly
126/// when you need raw performance for algorithm implementations.
127///
128/// # Example
129///
130/// ```
131/// use grafeo_core::graph::lpg::LpgStore;
132/// use grafeo_core::graph::Direction;
133///
134/// let store = LpgStore::new();
135///
136/// // Create a small social network
137/// let alice = store.create_node(&["Person"]);
138/// let bob = store.create_node(&["Person"]);
139/// store.create_edge(alice, bob, "KNOWS");
140///
141/// // Traverse outgoing edges
142/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
143///     println!("Alice knows node {:?}", neighbor);
144/// }
145/// ```
146///
147/// # Lock Ordering
148///
149/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
150/// consistent order to prevent deadlocks. Always acquire locks in this order:
151///
152/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
153/// 1. `nodes` / `node_versions`
154/// 2. `edges` / `edge_versions`
155///
156/// ## Level 2 - Catalogs (acquire as pairs when writing)
157/// 3. `label_to_id` + `id_to_label`
158/// 4. `edge_type_to_id` + `id_to_edge_type`
159///
160/// ## Level 3 - Indexes
161/// 5. `label_index`
162/// 6. `node_labels`
163/// 7. `property_indexes`
164///
165/// ## Level 4 - Statistics
166/// 8. `statistics`
167///
168/// ## Level 5 - Nested Locks (internal to other structs)
169/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
170/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
171///
172/// ## Rules
173/// - Catalog pairs must be acquired together when writing.
174/// - Never hold entity locks while acquiring catalog locks in a different scope.
175/// - Statistics lock is always last.
176/// - Read locks are generally safe, but avoid read-to-write upgrades.
177pub struct LpgStore {
178    /// Configuration.
179    #[allow(dead_code)]
180    pub(super) config: LpgStoreConfig,
181
182    /// Node records indexed by NodeId, with version chains for MVCC.
183    /// Used when `tiered-storage` feature is disabled.
184    /// Lock order: 1
185    #[cfg(not(feature = "tiered-storage"))]
186    pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
187
188    /// Edge records indexed by EdgeId, with version chains for MVCC.
189    /// Used when `tiered-storage` feature is disabled.
190    /// Lock order: 2
191    #[cfg(not(feature = "tiered-storage"))]
192    pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
193
194    // === Tiered Storage Fields (feature-gated) ===
195    //
196    // Lock ordering for arena access:
197    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
198    //
199    // Rules:
200    // - Acquire arena read lock *after* version locks, never before.
201    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
202    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
203    // - freeze_epoch order: node_versions.read() → arena.read_at(),
204    //   then edge_versions.read() → arena.read_at().
205    /// Arena allocator for hot data storage.
206    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
207    #[cfg(feature = "tiered-storage")]
208    pub(super) arena_allocator: Arc<ArenaAllocator>,
209
210    /// Node version indexes - store metadata and arena offsets.
211    /// The actual NodeRecord data is stored in the arena.
212    /// Lock order: 1
213    #[cfg(feature = "tiered-storage")]
214    pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
215
216    /// Edge version indexes - store metadata and arena offsets.
217    /// The actual EdgeRecord data is stored in the arena.
218    /// Lock order: 2
219    #[cfg(feature = "tiered-storage")]
220    pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
221
222    /// Cold storage for frozen epochs.
223    /// Contains compressed epoch blocks for historical data.
224    #[cfg(feature = "tiered-storage")]
225    pub(super) epoch_store: Arc<EpochStore>,
226
227    /// Property storage for nodes.
228    pub(super) node_properties: PropertyStorage<NodeId>,
229
230    /// Property storage for edges.
231    pub(super) edge_properties: PropertyStorage<EdgeId>,
232
233    /// Label name to ID mapping.
234    /// Lock order: 3 (acquire with id_to_label)
235    pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
236
237    /// Label ID to name mapping.
238    /// Lock order: 3 (acquire with label_to_id)
239    pub(super) id_to_label: RwLock<Vec<ArcStr>>,
240
241    /// Edge type name to ID mapping.
242    /// Lock order: 4 (acquire with id_to_edge_type)
243    pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
244
245    /// Edge type ID to name mapping.
246    /// Lock order: 4 (acquire with edge_type_to_id)
247    pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,
248
249    /// Forward adjacency lists (outgoing edges).
250    pub(super) forward_adj: ChunkedAdjacency,
251
252    /// Backward adjacency lists (incoming edges).
253    /// Only populated if config.backward_edges is true.
254    pub(super) backward_adj: Option<ChunkedAdjacency>,
255
256    /// Label index: label_id -> set of node IDs.
257    /// Lock order: 5
258    pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
259
260    /// Node labels: node_id -> set of label IDs.
261    /// Reverse mapping to efficiently get labels for a node.
262    /// Lock order: 6
263    pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
264
265    /// Property indexes: property_key -> (value -> set of node IDs).
266    ///
267    /// When a property is indexed, lookups by value are O(1) instead of O(n).
268    /// Use [`create_property_index`] to enable indexing for a property.
269    /// Lock order: 7
270    pub(super) property_indexes:
271        RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
272
273    /// Vector indexes: "label:property" -> HNSW index.
274    ///
275    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
276    /// Lock order: 7 (same level as property_indexes, disjoint keys)
277    #[cfg(feature = "vector-index")]
278    pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
279
280    /// Text indexes: "label:property" -> inverted index with BM25 scoring.
281    ///
282    /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
283    /// Lock order: 7 (same level as property_indexes, disjoint keys)
284    #[cfg(feature = "text-index")]
285    pub(super) text_indexes:
286        RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,
287
288    /// Next node ID.
289    pub(super) next_node_id: AtomicU64,
290
291    /// Next edge ID.
292    pub(super) next_edge_id: AtomicU64,
293
294    /// Current epoch.
295    pub(super) current_epoch: AtomicU64,
296
297    /// Live (non-deleted) node count, maintained incrementally.
298    /// Avoids O(n) full scan in `compute_statistics()`.
299    pub(super) live_node_count: AtomicI64,
300
301    /// Live (non-deleted) edge count, maintained incrementally.
302    /// Avoids O(m) full scan in `compute_statistics()`.
303    pub(super) live_edge_count: AtomicI64,
304
305    /// Per-edge-type live counts, indexed by edge_type_id.
306    /// Avoids O(m) edge scan in `compute_statistics()`.
307    /// Lock order: 8 (same level as statistics)
308    pub(super) edge_type_live_counts: RwLock<Vec<i64>>,
309
310    /// Statistics for cost-based optimization.
311    /// Lock order: 8 (always last)
312    pub(super) statistics: RwLock<Arc<Statistics>>,
313
314    /// Whether statistics need full recomputation (e.g., after rollback).
315    pub(super) needs_stats_recompute: AtomicBool,
316}
317
318impl LpgStore {
319    /// Creates a new LPG store with default configuration.
320    #[must_use]
321    pub fn new() -> Self {
322        Self::with_config(LpgStoreConfig::default())
323    }
324
325    /// Creates a new LPG store with custom configuration.
326    #[must_use]
327    pub fn with_config(config: LpgStoreConfig) -> Self {
328        let backward_adj = if config.backward_edges {
329            Some(ChunkedAdjacency::new())
330        } else {
331            None
332        };
333
334        Self {
335            #[cfg(not(feature = "tiered-storage"))]
336            nodes: RwLock::new(FxHashMap::default()),
337            #[cfg(not(feature = "tiered-storage"))]
338            edges: RwLock::new(FxHashMap::default()),
339            #[cfg(feature = "tiered-storage")]
340            arena_allocator: Arc::new(ArenaAllocator::new()),
341            #[cfg(feature = "tiered-storage")]
342            node_versions: RwLock::new(FxHashMap::default()),
343            #[cfg(feature = "tiered-storage")]
344            edge_versions: RwLock::new(FxHashMap::default()),
345            #[cfg(feature = "tiered-storage")]
346            epoch_store: Arc::new(EpochStore::new()),
347            node_properties: PropertyStorage::new(),
348            edge_properties: PropertyStorage::new(),
349            label_to_id: RwLock::new(FxHashMap::default()),
350            id_to_label: RwLock::new(Vec::new()),
351            edge_type_to_id: RwLock::new(FxHashMap::default()),
352            id_to_edge_type: RwLock::new(Vec::new()),
353            forward_adj: ChunkedAdjacency::new(),
354            backward_adj,
355            label_index: RwLock::new(Vec::new()),
356            node_labels: RwLock::new(FxHashMap::default()),
357            property_indexes: RwLock::new(FxHashMap::default()),
358            #[cfg(feature = "vector-index")]
359            vector_indexes: RwLock::new(FxHashMap::default()),
360            #[cfg(feature = "text-index")]
361            text_indexes: RwLock::new(FxHashMap::default()),
362            next_node_id: AtomicU64::new(0),
363            next_edge_id: AtomicU64::new(0),
364            current_epoch: AtomicU64::new(0),
365            live_node_count: AtomicI64::new(0),
366            live_edge_count: AtomicI64::new(0),
367            edge_type_live_counts: RwLock::new(Vec::new()),
368            statistics: RwLock::new(Arc::new(Statistics::new())),
369            needs_stats_recompute: AtomicBool::new(false),
370            config,
371        }
372    }
373
374    /// Returns the current epoch.
375    #[must_use]
376    pub fn current_epoch(&self) -> EpochId {
377        EpochId::new(self.current_epoch.load(Ordering::Acquire))
378    }
379
380    /// Creates a new epoch.
381    #[doc(hidden)]
382    pub fn new_epoch(&self) -> EpochId {
383        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
384        EpochId::new(id)
385    }
386
387    /// Syncs the store epoch to match an external epoch counter.
388    ///
389    /// Used by the transaction manager to keep the store's epoch in step
390    /// after a transaction commit advances the global epoch.
391    #[doc(hidden)]
392    pub fn sync_epoch(&self, epoch: EpochId) {
393        self.current_epoch
394            .fetch_max(epoch.as_u64(), Ordering::AcqRel);
395    }
396
397    /// Returns whether backward adjacency (incoming edge index) is available.
398    ///
399    /// When backward adjacency is enabled (the default), bidirectional search
400    /// algorithms can traverse from the target toward the source.
401    #[must_use]
402    pub fn has_backward_adjacency(&self) -> bool {
403        self.backward_adj.is_some()
404    }
405
406    // === Internal Helpers ===
407
408    pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
409        {
410            let label_to_id = self.label_to_id.read();
411            if let Some(&id) = label_to_id.get(label) {
412                return id;
413            }
414        }
415
416        let mut label_to_id = self.label_to_id.write();
417        let mut id_to_label = self.id_to_label.write();
418
419        // Double-check after acquiring write lock
420        if let Some(&id) = label_to_id.get(label) {
421            return id;
422        }
423
424        let id = id_to_label.len() as u32;
425
426        let label: ArcStr = label.into();
427        label_to_id.insert(label.clone(), id);
428        id_to_label.push(label);
429
430        id
431    }
432
433    pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
434        {
435            let type_to_id = self.edge_type_to_id.read();
436            if let Some(&id) = type_to_id.get(edge_type) {
437                return id;
438            }
439        }
440
441        let mut type_to_id = self.edge_type_to_id.write();
442        let mut id_to_type = self.id_to_edge_type.write();
443
444        // Double-check
445        if let Some(&id) = type_to_id.get(edge_type) {
446            return id;
447        }
448
449        let id = id_to_type.len() as u32;
450        let edge_type: ArcStr = edge_type.into();
451        type_to_id.insert(edge_type.clone(), id);
452        id_to_type.push(edge_type);
453
454        // Grow edge type live counts to match
455        let mut counts = self.edge_type_live_counts.write();
456        while counts.len() <= id as usize {
457            counts.push(0);
458        }
459
460        id
461    }
462
463    /// Increments the live edge count for a given edge type.
464    pub(super) fn increment_edge_type_count(&self, type_id: u32) {
465        let mut counts = self.edge_type_live_counts.write();
466        if counts.len() <= type_id as usize {
467            counts.resize(type_id as usize + 1, 0);
468        }
469        counts[type_id as usize] += 1;
470    }
471
472    /// Decrements the live edge count for a given edge type.
473    pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
474        let mut counts = self.edge_type_live_counts.write();
475        if type_id < counts.len() as u32 {
476            counts[type_id as usize] -= 1;
477        }
478    }
479}
480
481impl Default for LpgStore {
482    fn default() -> Self {
483        Self::new()
484    }
485}