// grafeo_core/graph/lpg/store/mod.rs
//! The in-memory LPG graph store.
//!
//! This is where your nodes and edges actually live. Most users interact
//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
//!
//! Key features:
//! - MVCC versioning - concurrent readers don't block each other
//! - Columnar properties with zone maps for fast filtering
//! - Forward and backward adjacency indexes

mod edge_ops;
mod graph_store_impl;
mod index;
mod node_ops;
mod property_ops;
mod schema;
mod search;
mod statistics;
mod traversal;
mod versioning;

#[cfg(test)]
mod tests;

use std::cmp::Ordering as CmpOrdering;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};

use arcstr::ArcStr;
use dashmap::DashMap;
use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, Value};
use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
use parking_lot::RwLock;

use super::PropertyStorage;
#[cfg(not(feature = "tiered-storage"))]
use super::{EdgeRecord, NodeRecord};
use crate::index::adjacency::ChunkedAdjacency;
use crate::statistics::Statistics;
#[cfg(not(feature = "tiered-storage"))]
use grafeo_common::mvcc::VersionChain;

#[cfg(feature = "vector-index")]
use crate::index::vector::HnswIndex;

#[cfg(feature = "tiered-storage")]
use crate::storage::EpochStore;
#[cfg(feature = "tiered-storage")]
use grafeo_common::memory::arena::ArenaAllocator;
#[cfg(feature = "tiered-storage")]
use grafeo_common::mvcc::VersionIndex;
51
52/// Compares two values for ordering (used for range checks).
53pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
54    match (a, b) {
55        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
56        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
57        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
58        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
59        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
60        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
61        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
62        _ => None,
63    }
64}
65
66/// Checks if a value is within a range.
67pub(super) fn value_in_range(
68    value: &Value,
69    min: Option<&Value>,
70    max: Option<&Value>,
71    min_inclusive: bool,
72    max_inclusive: bool,
73) -> bool {
74    // Check lower bound
75    if let Some(min_val) = min {
76        match compare_values_for_range(value, min_val) {
77            Some(CmpOrdering::Less) => return false,
78            Some(CmpOrdering::Equal) if !min_inclusive => return false,
79            None => return false, // Can't compare
80            _ => {}
81        }
82    }
83
84    // Check upper bound
85    if let Some(max_val) = max {
86        match compare_values_for_range(value, max_val) {
87            Some(CmpOrdering::Greater) => return false,
88            Some(CmpOrdering::Equal) if !max_inclusive => return false,
89            None => return false,
90            _ => {}
91        }
92    }
93
94    true
95}
96
/// Configuration for the LPG store.
///
/// The defaults work well for most cases. Tune `backward_edges` if you only
/// traverse in one direction (saves memory), or adjust capacities if you know
/// your graph size upfront (avoids reallocations).
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Maintain backward adjacency for incoming edge queries. Turn off if
    /// you only traverse outgoing edges - saves ~50% adjacency memory.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    /// NOTE(review): `with_config` does not currently pass this to the node
    /// map (it uses `FxHashMap::default()`) — presumably reserved for future
    /// preallocation; confirm before relying on it.
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    /// NOTE(review): same as above — not yet consumed by `with_config`.
    pub initial_edge_capacity: usize,
}
112
113impl Default for LpgStoreConfig {
114    fn default() -> Self {
115        Self {
116            backward_edges: true,
117            initial_node_capacity: 1024,
118            initial_edge_capacity: 4096,
119        }
120    }
121}
122
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new();
///
/// // Create a small social network
/// let alice = store.create_node(&["Person"]);
/// let bob = store.create_node(&["Person"]);
/// store.create_edge(alice, bob, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alice, Direction::Outgoing) {
///     println!("Alice knows node {:?}", neighbor);
/// }
/// ```
///
/// # Lock Ordering
///
/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
/// consistent order to prevent deadlocks. Always acquire locks in this order:
///
/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
/// 1. `nodes` / `node_versions`
/// 2. `edges` / `edge_versions`
///
/// ## Level 2 - Catalogs (acquire as pairs when writing)
/// 3. `label_to_id` + `id_to_label`
/// 4. `edge_type_to_id` + `id_to_edge_type`
///
/// ## Level 3 - Indexes
/// 5. `label_index`
/// 6. `node_labels`
/// 7. `property_indexes`
///
/// ## Level 4 - Statistics
/// 8. `statistics`
///
/// ## Level 5 - Nested Locks (internal to other structs)
/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
///
/// ## Rules
/// - Catalog pairs must be acquired together when writing.
/// - Never hold entity locks while acquiring catalog locks in a different scope.
/// - Statistics lock is always last.
/// - Read locks are generally safe, but avoid read-to-write upgrades.
pub struct LpgStore {
    /// Configuration.
    /// (`dead_code` allowed: capacities are not yet consumed — see
    /// `LpgStoreConfig` field notes.)
    #[allow(dead_code)]
    pub(super) config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 1
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 2
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    //
    // Lock ordering for arena access:
    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
    //
    // Rules:
    // - Acquire arena read lock *after* version locks, never before.
    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
    // - freeze_epoch order: node_versions.read() → arena.read_at(),
    //   then edge_versions.read() → arena.read_at().
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    pub(super) arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    /// Lock order: 1
    #[cfg(feature = "tiered-storage")]
    pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    /// Lock order: 2
    #[cfg(feature = "tiered-storage")]
    pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    pub(super) epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    pub(super) node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    pub(super) edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    /// Lock order: 3 (acquire with id_to_label)
    pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label ID to name mapping.
    /// Lock order: 3 (acquire with label_to_id)
    pub(super) id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to ID mapping.
    /// Lock order: 4 (acquire with id_to_edge_type)
    pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type ID to name mapping.
    /// Lock order: 4 (acquire with edge_type_to_id)
    pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Forward adjacency lists (outgoing edges).
    pub(super) forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true.
    pub(super) backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    /// Lock order: 5
    pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    /// Lock order: 6
    pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property indexes: property_key -> (value -> set of node IDs).
    ///
    /// When a property is indexed, lookups by value are O(1) instead of O(n).
    /// Use [`create_property_index`] to enable indexing for a property.
    /// Lock order: 7
    pub(super) property_indexes:
        RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes: "label:property" -> HNSW index.
    ///
    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "vector-index")]
    pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Text indexes: "label:property" -> inverted index with BM25 scoring.
    ///
    /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "text-index")]
    pub(super) text_indexes:
        RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,

    /// Next node ID.
    pub(super) next_node_id: AtomicU64,

    /// Next edge ID.
    pub(super) next_edge_id: AtomicU64,

    /// Current epoch.
    pub(super) current_epoch: AtomicU64,

    /// Live (non-deleted) node count, maintained incrementally.
    /// Avoids O(n) full scan in `compute_statistics()`.
    pub(super) live_node_count: AtomicI64,

    /// Live (non-deleted) edge count, maintained incrementally.
    /// Avoids O(m) full scan in `compute_statistics()`.
    pub(super) live_edge_count: AtomicI64,

    /// Per-edge-type live counts, indexed by edge_type_id.
    /// Avoids O(m) edge scan in `compute_statistics()`.
    /// Lock order: 8 (same level as statistics)
    pub(super) edge_type_live_counts: RwLock<Vec<i64>>,

    /// Statistics for cost-based optimization.
    /// Lock order: 8 (always last)
    pub(super) statistics: RwLock<Arc<Statistics>>,

    /// Whether statistics need full recomputation (e.g., after rollback).
    pub(super) needs_stats_recompute: AtomicBool,

    /// Named graphs, each an independent `LpgStore` partition.
    /// Zero overhead for single-graph databases (empty HashMap).
    /// Lock order: 9 (after statistics)
    named_graphs: RwLock<FxHashMap<String, Arc<LpgStore>>>,
}
326
327impl LpgStore {
328    /// Creates a new LPG store with default configuration.
329    #[must_use]
330    pub fn new() -> Self {
331        Self::with_config(LpgStoreConfig::default())
332    }
333
334    /// Creates a new LPG store with custom configuration.
335    #[must_use]
336    pub fn with_config(config: LpgStoreConfig) -> Self {
337        let backward_adj = if config.backward_edges {
338            Some(ChunkedAdjacency::new())
339        } else {
340            None
341        };
342
343        Self {
344            #[cfg(not(feature = "tiered-storage"))]
345            nodes: RwLock::new(FxHashMap::default()),
346            #[cfg(not(feature = "tiered-storage"))]
347            edges: RwLock::new(FxHashMap::default()),
348            #[cfg(feature = "tiered-storage")]
349            arena_allocator: Arc::new(ArenaAllocator::new()),
350            #[cfg(feature = "tiered-storage")]
351            node_versions: RwLock::new(FxHashMap::default()),
352            #[cfg(feature = "tiered-storage")]
353            edge_versions: RwLock::new(FxHashMap::default()),
354            #[cfg(feature = "tiered-storage")]
355            epoch_store: Arc::new(EpochStore::new()),
356            node_properties: PropertyStorage::new(),
357            edge_properties: PropertyStorage::new(),
358            label_to_id: RwLock::new(FxHashMap::default()),
359            id_to_label: RwLock::new(Vec::new()),
360            edge_type_to_id: RwLock::new(FxHashMap::default()),
361            id_to_edge_type: RwLock::new(Vec::new()),
362            forward_adj: ChunkedAdjacency::new(),
363            backward_adj,
364            label_index: RwLock::new(Vec::new()),
365            node_labels: RwLock::new(FxHashMap::default()),
366            property_indexes: RwLock::new(FxHashMap::default()),
367            #[cfg(feature = "vector-index")]
368            vector_indexes: RwLock::new(FxHashMap::default()),
369            #[cfg(feature = "text-index")]
370            text_indexes: RwLock::new(FxHashMap::default()),
371            next_node_id: AtomicU64::new(0),
372            next_edge_id: AtomicU64::new(0),
373            current_epoch: AtomicU64::new(0),
374            live_node_count: AtomicI64::new(0),
375            live_edge_count: AtomicI64::new(0),
376            edge_type_live_counts: RwLock::new(Vec::new()),
377            statistics: RwLock::new(Arc::new(Statistics::new())),
378            needs_stats_recompute: AtomicBool::new(false),
379            named_graphs: RwLock::new(FxHashMap::default()),
380            config,
381        }
382    }
383
384    /// Returns the current epoch.
385    #[must_use]
386    pub fn current_epoch(&self) -> EpochId {
387        EpochId::new(self.current_epoch.load(Ordering::Acquire))
388    }
389
390    /// Creates a new epoch.
391    #[doc(hidden)]
392    pub fn new_epoch(&self) -> EpochId {
393        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
394        EpochId::new(id)
395    }
396
397    /// Syncs the store epoch to match an external epoch counter.
398    ///
399    /// Used by the transaction manager to keep the store's epoch in step
400    /// after a transaction commit advances the global epoch.
401    #[doc(hidden)]
402    pub fn sync_epoch(&self, epoch: EpochId) {
403        self.current_epoch
404            .fetch_max(epoch.as_u64(), Ordering::AcqRel);
405    }
406
407    /// Removes all data from the store, resetting it to an empty state.
408    ///
409    /// Acquires locks in the documented ordering to prevent deadlocks.
410    /// After clearing, the store behaves as if freshly constructed.
411    pub fn clear(&self) {
412        // Level 1: Entity storage
413        #[cfg(not(feature = "tiered-storage"))]
414        {
415            self.nodes.write().clear();
416            self.edges.write().clear();
417        }
418        #[cfg(feature = "tiered-storage")]
419        {
420            self.node_versions.write().clear();
421            self.edge_versions.write().clear();
422            // Arena allocator chunks are leaked; epochs are cleared via epoch_store.
423        }
424
425        // Level 2: Catalogs (acquire as pairs)
426        {
427            self.label_to_id.write().clear();
428            self.id_to_label.write().clear();
429        }
430        {
431            self.edge_type_to_id.write().clear();
432            self.id_to_edge_type.write().clear();
433        }
434
435        // Level 3: Indexes
436        self.label_index.write().clear();
437        self.node_labels.write().clear();
438        self.property_indexes.write().clear();
439        #[cfg(feature = "vector-index")]
440        self.vector_indexes.write().clear();
441        #[cfg(feature = "text-index")]
442        self.text_indexes.write().clear();
443
444        // Nested: Properties and adjacency
445        self.node_properties.clear();
446        self.edge_properties.clear();
447        self.forward_adj.clear();
448        if let Some(ref backward) = self.backward_adj {
449            backward.clear();
450        }
451
452        // Atomics: ID counters
453        self.next_node_id.store(0, Ordering::Release);
454        self.next_edge_id.store(0, Ordering::Release);
455        self.current_epoch.store(0, Ordering::Release);
456
457        // Level 4: Statistics
458        self.live_node_count.store(0, Ordering::Release);
459        self.live_edge_count.store(0, Ordering::Release);
460        self.edge_type_live_counts.write().clear();
461        *self.statistics.write() = Arc::new(Statistics::new());
462        self.needs_stats_recompute.store(false, Ordering::Release);
463    }
464
465    /// Returns whether backward adjacency (incoming edge index) is available.
466    ///
467    /// When backward adjacency is enabled (the default), bidirectional search
468    /// algorithms can traverse from the target toward the source.
469    #[must_use]
470    pub fn has_backward_adjacency(&self) -> bool {
471        self.backward_adj.is_some()
472    }
473
474    // === Named Graph Management ===
475
476    /// Returns a named graph by name, or `None` if it does not exist.
477    #[must_use]
478    pub fn graph(&self, name: &str) -> Option<Arc<LpgStore>> {
479        self.named_graphs.read().get(name).cloned()
480    }
481
482    /// Returns a named graph, creating it if it does not exist.
483    pub fn graph_or_create(&self, name: &str) -> Arc<LpgStore> {
484        {
485            let graphs = self.named_graphs.read();
486            if let Some(g) = graphs.get(name) {
487                return Arc::clone(g);
488            }
489        }
490        let mut graphs = self.named_graphs.write();
491        // Double-check after acquiring write lock
492        graphs
493            .entry(name.to_string())
494            .or_insert_with(|| Arc::new(LpgStore::new()))
495            .clone()
496    }
497
498    /// Creates a named graph. Returns `false` if it already exists.
499    pub fn create_graph(&self, name: &str) -> bool {
500        let mut graphs = self.named_graphs.write();
501        if graphs.contains_key(name) {
502            return false;
503        }
504        graphs.insert(name.to_string(), Arc::new(LpgStore::new()));
505        true
506    }
507
508    /// Drops a named graph. Returns `false` if it did not exist.
509    pub fn drop_graph(&self, name: &str) -> bool {
510        self.named_graphs.write().remove(name).is_some()
511    }
512
513    /// Returns all named graph names.
514    #[must_use]
515    pub fn graph_names(&self) -> Vec<String> {
516        self.named_graphs.read().keys().cloned().collect()
517    }
518
519    /// Returns the number of named graphs.
520    #[must_use]
521    pub fn graph_count(&self) -> usize {
522        self.named_graphs.read().len()
523    }
524
525    /// Clears a specific graph, or the default graph if `name` is `None`.
526    pub fn clear_graph(&self, name: Option<&str>) {
527        match name {
528            Some(n) => {
529                if let Some(g) = self.named_graphs.read().get(n) {
530                    g.clear();
531                }
532            }
533            None => self.clear(),
534        }
535    }
536
537    /// Copies all data from the source graph to the destination graph.
538    /// Creates the destination graph if it does not exist.
539    pub fn copy_graph(&self, source: Option<&str>, dest: Option<&str>) {
540        let _src = match source {
541            Some(n) => self.graph(n),
542            None => None, // default graph
543        };
544        let _dest_graph = match dest {
545            Some(n) => Some(self.graph_or_create(n)),
546            None => None, // copy into default graph
547        };
548        // Full graph copy is complex (requires iterating all entities).
549        // For now, this creates the destination graph structure.
550        // Full entity-level copy will be implemented when needed.
551    }
552
553    // === Internal Helpers ===
554
555    pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
556        {
557            let label_to_id = self.label_to_id.read();
558            if let Some(&id) = label_to_id.get(label) {
559                return id;
560            }
561        }
562
563        let mut label_to_id = self.label_to_id.write();
564        let mut id_to_label = self.id_to_label.write();
565
566        // Double-check after acquiring write lock
567        if let Some(&id) = label_to_id.get(label) {
568            return id;
569        }
570
571        let id = id_to_label.len() as u32;
572
573        let label: ArcStr = label.into();
574        label_to_id.insert(label.clone(), id);
575        id_to_label.push(label);
576
577        id
578    }
579
580    pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
581        {
582            let type_to_id = self.edge_type_to_id.read();
583            if let Some(&id) = type_to_id.get(edge_type) {
584                return id;
585            }
586        }
587
588        let mut type_to_id = self.edge_type_to_id.write();
589        let mut id_to_type = self.id_to_edge_type.write();
590
591        // Double-check
592        if let Some(&id) = type_to_id.get(edge_type) {
593            return id;
594        }
595
596        let id = id_to_type.len() as u32;
597        let edge_type: ArcStr = edge_type.into();
598        type_to_id.insert(edge_type.clone(), id);
599        id_to_type.push(edge_type);
600
601        // Grow edge type live counts to match
602        let mut counts = self.edge_type_live_counts.write();
603        while counts.len() <= id as usize {
604            counts.push(0);
605        }
606
607        id
608    }
609
610    /// Increments the live edge count for a given edge type.
611    pub(super) fn increment_edge_type_count(&self, type_id: u32) {
612        let mut counts = self.edge_type_live_counts.write();
613        if counts.len() <= type_id as usize {
614            counts.resize(type_id as usize + 1, 0);
615        }
616        counts[type_id as usize] += 1;
617    }
618
619    /// Decrements the live edge count for a given edge type.
620    pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
621        let mut counts = self.edge_type_live_counts.write();
622        if type_id < counts.len() as u32 {
623            counts[type_id as usize] -= 1;
624        }
625    }
626}
627
628impl Default for LpgStore {
629    fn default() -> Self {
630        Self::new()
631    }
632}