// grafeo_core/graph/lpg/store/mod.rs
1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12mod edge_ops;
13mod graph_store_impl;
14mod index;
15mod node_ops;
16mod property_ops;
17mod schema;
18mod search;
19mod statistics;
20mod traversal;
21mod versioning;
22
23#[cfg(test)]
24mod tests;
25
26use super::PropertyStorage;
27#[cfg(not(feature = "tiered-storage"))]
28use super::{EdgeRecord, NodeRecord};
29use crate::index::adjacency::ChunkedAdjacency;
30use crate::statistics::Statistics;
31use arcstr::ArcStr;
32use dashmap::DashMap;
33#[cfg(not(feature = "tiered-storage"))]
34use grafeo_common::mvcc::VersionChain;
35use grafeo_common::types::{
36    EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TransactionId, Value,
37};
38use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
39use parking_lot::RwLock;
40use std::cmp::Ordering as CmpOrdering;
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
43
44#[cfg(feature = "vector-index")]
45use crate::index::vector::HnswIndex;
46
47#[cfg(feature = "tiered-storage")]
48use crate::storage::EpochStore;
49use grafeo_common::memory::arena::AllocError;
50#[cfg(feature = "tiered-storage")]
51use grafeo_common::memory::arena::ArenaAllocator;
52#[cfg(feature = "tiered-storage")]
53use grafeo_common::mvcc::VersionIndex;
54
/// Undo entry for a property mutation within a transaction.
///
/// Captures the previous state of a property so it can be restored on rollback.
/// Entries are accumulated per transaction and replayed in reverse order when
/// the transaction rolls back; on commit they are simply discarded.
#[derive(Debug, Clone)]
pub enum PropertyUndoEntry {
    /// A node property was changed or added.
    NodeProperty {
        /// The node that was modified.
        node_id: NodeId,
        /// The property key that was set or removed.
        key: PropertyKey,
        /// The previous value, or `None` if the property did not exist before.
        old_value: Option<Value>,
    },
    /// An edge property was changed or added.
    EdgeProperty {
        /// The edge that was modified.
        edge_id: EdgeId,
        /// The property key that was set or removed.
        key: PropertyKey,
        /// The previous value, or `None` if the property did not exist before.
        old_value: Option<Value>,
    },
    /// A label was added to a node.
    LabelAdded {
        /// The node that had a label added.
        node_id: NodeId,
        /// The label string that was added.
        label: String,
    },
    /// A label was removed from a node.
    LabelRemoved {
        /// The node that had a label removed.
        node_id: NodeId,
        /// The label string that was removed.
        label: String,
    },
}
93
94/// Compares two values for ordering (used for range checks).
95pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
96    match (a, b) {
97        (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
98        (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
99        (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
100        (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
101        (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
102        (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
103        (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
104        _ => None,
105    }
106}
107
108/// Checks if a value is within a range.
109pub(super) fn value_in_range(
110    value: &Value,
111    min: Option<&Value>,
112    max: Option<&Value>,
113    min_inclusive: bool,
114    max_inclusive: bool,
115) -> bool {
116    // Check lower bound
117    if let Some(min_val) = min {
118        match compare_values_for_range(value, min_val) {
119            Some(CmpOrdering::Less) => return false,
120            Some(CmpOrdering::Equal) if !min_inclusive => return false,
121            None => return false, // Can't compare
122            _ => {}
123        }
124    }
125
126    // Check upper bound
127    if let Some(max_val) = max {
128        match compare_values_for_range(value, max_val) {
129            Some(CmpOrdering::Greater) => return false,
130            Some(CmpOrdering::Equal) if !max_inclusive => return false,
131            None => return false,
132            _ => {}
133        }
134    }
135
136    true
137}
138
/// Configuration for the LPG store.
///
/// The defaults work well for most cases. Tune `backward_edges` if you only
/// traverse in one direction (saves memory), or adjust capacities if you know
/// your graph size upfront (avoids reallocations).
///
/// Defaults are provided by the [`Default`] impl.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Maintain backward adjacency for incoming edge queries. Turn off if
    /// you only traverse outgoing edges - saves ~50% adjacency memory.
    pub backward_edges: bool,
    /// Initial capacity for nodes (avoids early reallocations).
    pub initial_node_capacity: usize,
    /// Initial capacity for edges (avoids early reallocations).
    pub initial_edge_capacity: usize,
}
154
155impl Default for LpgStoreConfig {
156    fn default() -> Self {
157        Self {
158            backward_edges: true,
159            initial_node_capacity: 1024,
160            initial_edge_capacity: 4096,
161        }
162    }
163}
164
/// The core in-memory graph storage.
///
/// Everything lives here: nodes, edges, properties, adjacency indexes, and
/// version chains for MVCC. Concurrent reads never block each other.
///
/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
/// adds transaction management and query execution. Use `LpgStore` directly
/// when you need raw performance for algorithm implementations.
///
/// # Example
///
/// ```
/// use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_core::graph::Direction;
///
/// let store = LpgStore::new().expect("arena allocation");
///
/// // Create a small social network
/// let alix = store.create_node(&["Person"]);
/// let gus = store.create_node(&["Person"]);
/// store.create_edge(alix, gus, "KNOWS");
///
/// // Traverse outgoing edges
/// for neighbor in store.neighbors(alix, Direction::Outgoing) {
///     println!("Alix knows node {:?}", neighbor);
/// }
/// ```
///
/// # Lock Ordering
///
/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
/// consistent order to prevent deadlocks. Always acquire locks in this order:
///
/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
/// 1. `nodes` / `node_versions`
/// 2. `edges` / `edge_versions`
///
/// ## Level 2 - Catalogs (acquire as pairs when writing)
/// 3. `label_to_id` + `id_to_label`
/// 4. `edge_type_to_id` + `id_to_edge_type`
///
/// ## Level 3 - Indexes
/// 5. `label_index`
/// 6. `node_labels`
/// 7. `property_indexes`
///
/// ## Level 4 - Statistics
/// 8. `statistics`
///
/// ## Level 5 - Nested Locks (internal to other structs)
/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
///
/// ## Rules
/// - Catalog pairs must be acquired together when writing.
/// - Never hold entity locks while acquiring catalog locks in a different scope.
/// - Statistics lock comes after all entity/catalog/index locks; only
///   `named_graphs` (order 9) and `property_undo_log` (order 10) may be
///   acquired after it.
/// - Read locks are generally safe, but avoid read-to-write upgrades.
pub struct LpgStore {
    /// Configuration.
    #[allow(dead_code)]
    pub(super) config: LpgStoreConfig,

    /// Node records indexed by NodeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 1
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records indexed by EdgeId, with version chains for MVCC.
    /// Used when `tiered-storage` feature is disabled.
    /// Lock order: 2
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    // === Tiered Storage Fields (feature-gated) ===
    //
    // Lock ordering for arena access:
    //   version_lock (read/write) → arena read lock (via arena_allocator.arena())
    //
    // Rules:
    // - Acquire arena read lock *after* version locks, never before.
    // - Multiple threads may call arena.read_at() concurrently (shared refs only).
    // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
    // - freeze_epoch order: node_versions.read() → arena.read_at(),
    //   then edge_versions.read() → arena.read_at().
    /// Arena allocator for hot data storage.
    /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
    #[cfg(feature = "tiered-storage")]
    pub(super) arena_allocator: Arc<ArenaAllocator>,

    /// Node version indexes - store metadata and arena offsets.
    /// The actual NodeRecord data is stored in the arena.
    /// Lock order: 1
    #[cfg(feature = "tiered-storage")]
    pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge version indexes - store metadata and arena offsets.
    /// The actual EdgeRecord data is stored in the arena.
    /// Lock order: 2
    #[cfg(feature = "tiered-storage")]
    pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Cold storage for frozen epochs.
    /// Contains compressed epoch blocks for historical data.
    #[cfg(feature = "tiered-storage")]
    pub(super) epoch_store: Arc<EpochStore>,

    /// Property storage for nodes.
    pub(super) node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    pub(super) edge_properties: PropertyStorage<EdgeId>,

    /// Label name to ID mapping.
    /// Lock order: 3 (acquire with id_to_label)
    pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label ID to name mapping.
    /// Lock order: 3 (acquire with label_to_id)
    pub(super) id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to ID mapping.
    /// Lock order: 4 (acquire with id_to_edge_type)
    pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type ID to name mapping.
    /// Lock order: 4 (acquire with edge_type_to_id)
    pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Forward adjacency lists (outgoing edges).
    pub(super) forward_adj: ChunkedAdjacency,

    /// Backward adjacency lists (incoming edges).
    /// Only populated if config.backward_edges is true.
    pub(super) backward_adj: Option<ChunkedAdjacency>,

    /// Label index: label_id -> set of node IDs.
    /// Lock order: 5
    pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node labels: node_id -> set of label IDs.
    /// Reverse mapping to efficiently get labels for a node.
    /// Lock order: 6
    pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property indexes: property_key -> (value -> set of node IDs).
    ///
    /// When a property is indexed, lookups by value are O(1) instead of O(n).
    /// Use [`create_property_index`] to enable indexing for a property.
    /// Lock order: 7
    pub(super) property_indexes:
        RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes: "label:property" -> HNSW index.
    ///
    /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "vector-index")]
    pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Text indexes: "label:property" -> inverted index with BM25 scoring.
    ///
    /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
    /// Lock order: 7 (same level as property_indexes, disjoint keys)
    #[cfg(feature = "text-index")]
    pub(super) text_indexes:
        RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,

    /// Next node ID.
    pub(super) next_node_id: AtomicU64,

    /// Next edge ID.
    pub(super) next_edge_id: AtomicU64,

    /// Current epoch.
    pub(super) current_epoch: AtomicU64,

    /// Live (non-deleted) node count, maintained incrementally.
    /// Avoids O(n) full scan in `compute_statistics()`.
    pub(super) live_node_count: AtomicI64,

    /// Live (non-deleted) edge count, maintained incrementally.
    /// Avoids O(m) full scan in `compute_statistics()`.
    pub(super) live_edge_count: AtomicI64,

    /// Per-edge-type live counts, indexed by edge_type_id.
    /// Avoids O(m) edge scan in `compute_statistics()`.
    /// Lock order: 8 (same level as statistics)
    pub(super) edge_type_live_counts: RwLock<Vec<i64>>,

    /// Statistics for cost-based optimization.
    /// Lock order: 8 (after all entity/catalog/index locks)
    pub(super) statistics: RwLock<Arc<Statistics>>,

    /// Whether statistics need full recomputation (e.g., after rollback).
    pub(super) needs_stats_recompute: AtomicBool,

    /// Named graphs, each an independent `LpgStore` partition.
    /// Zero overhead for single-graph databases (empty HashMap).
    /// Lock order: 9 (after statistics)
    named_graphs: RwLock<FxHashMap<String, Arc<LpgStore>>>,

    /// Undo log for property mutations within transactions.
    ///
    /// Maps transaction IDs to a list of undo entries that capture the
    /// previous property values. On rollback, entries are replayed in
    /// reverse order to restore properties. On commit, the entries are
    /// simply discarded.
    /// Lock order: 10 (after named_graphs, independent of other locks)
    property_undo_log: RwLock<FxHashMap<TransactionId, Vec<PropertyUndoEntry>>>,
}
377
378impl LpgStore {
379    /// Creates a new LPG store with default configuration.
380    ///
381    /// # Errors
382    ///
383    /// Returns [`AllocError`] if the arena allocator cannot be initialized
384    /// (only possible with the `tiered-storage` feature).
385    pub fn new() -> Result<Self, AllocError> {
386        Self::with_config(LpgStoreConfig::default())
387    }
388
389    /// Creates a new LPG store with custom configuration.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`AllocError`] if the arena allocator cannot be initialized
394    /// (only possible with the `tiered-storage` feature).
395    pub fn with_config(config: LpgStoreConfig) -> Result<Self, AllocError> {
396        let backward_adj = if config.backward_edges {
397            Some(ChunkedAdjacency::new())
398        } else {
399            None
400        };
401
402        Ok(Self {
403            #[cfg(not(feature = "tiered-storage"))]
404            nodes: RwLock::new(FxHashMap::default()),
405            #[cfg(not(feature = "tiered-storage"))]
406            edges: RwLock::new(FxHashMap::default()),
407            #[cfg(feature = "tiered-storage")]
408            arena_allocator: Arc::new(ArenaAllocator::new()?),
409            #[cfg(feature = "tiered-storage")]
410            node_versions: RwLock::new(FxHashMap::default()),
411            #[cfg(feature = "tiered-storage")]
412            edge_versions: RwLock::new(FxHashMap::default()),
413            #[cfg(feature = "tiered-storage")]
414            epoch_store: Arc::new(EpochStore::new()),
415            node_properties: PropertyStorage::new(),
416            edge_properties: PropertyStorage::new(),
417            label_to_id: RwLock::new(FxHashMap::default()),
418            id_to_label: RwLock::new(Vec::new()),
419            edge_type_to_id: RwLock::new(FxHashMap::default()),
420            id_to_edge_type: RwLock::new(Vec::new()),
421            forward_adj: ChunkedAdjacency::new(),
422            backward_adj,
423            label_index: RwLock::new(Vec::new()),
424            node_labels: RwLock::new(FxHashMap::default()),
425            property_indexes: RwLock::new(FxHashMap::default()),
426            #[cfg(feature = "vector-index")]
427            vector_indexes: RwLock::new(FxHashMap::default()),
428            #[cfg(feature = "text-index")]
429            text_indexes: RwLock::new(FxHashMap::default()),
430            next_node_id: AtomicU64::new(0),
431            next_edge_id: AtomicU64::new(0),
432            current_epoch: AtomicU64::new(0),
433            live_node_count: AtomicI64::new(0),
434            live_edge_count: AtomicI64::new(0),
435            edge_type_live_counts: RwLock::new(Vec::new()),
436            statistics: RwLock::new(Arc::new(Statistics::new())),
437            needs_stats_recompute: AtomicBool::new(false),
438            named_graphs: RwLock::new(FxHashMap::default()),
439            property_undo_log: RwLock::new(FxHashMap::default()),
440            config,
441        })
442    }
443
444    /// Returns the current epoch.
445    #[must_use]
446    pub fn current_epoch(&self) -> EpochId {
447        EpochId::new(self.current_epoch.load(Ordering::Acquire))
448    }
449
450    /// Creates a new epoch.
451    #[doc(hidden)]
452    pub fn new_epoch(&self) -> EpochId {
453        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
454        EpochId::new(id)
455    }
456
457    /// Syncs the store epoch to match an external epoch counter.
458    ///
459    /// Used by the transaction manager to keep the store's epoch in step
460    /// after a transaction commit advances the global epoch.
461    #[doc(hidden)]
462    pub fn sync_epoch(&self, epoch: EpochId) {
463        self.current_epoch
464            .fetch_max(epoch.as_u64(), Ordering::AcqRel);
465    }
466
    /// Removes all data from the store, resetting it to an empty state.
    ///
    /// Acquires locks in the documented ordering to prevent deadlocks.
    /// Each `write()` guard is a temporary dropped at the end of its
    /// statement, so no two locks are ever held simultaneously here.
    /// After clearing, the store behaves as if freshly constructed.
    ///
    /// Note: named graphs are NOT cleared by this method; use
    /// `clear_graph(Some(name))` for a specific partition.
    pub fn clear(&self) {
        // Level 1: Entity storage
        #[cfg(not(feature = "tiered-storage"))]
        {
            self.nodes.write().clear();
            self.edges.write().clear();
        }
        #[cfg(feature = "tiered-storage")]
        {
            self.node_versions.write().clear();
            self.edge_versions.write().clear();
            // Arena allocator chunks are leaked; epochs are cleared via epoch_store.
        }

        // Level 2: Catalogs (acquire as pairs)
        {
            self.label_to_id.write().clear();
            self.id_to_label.write().clear();
        }
        {
            self.edge_type_to_id.write().clear();
            self.id_to_edge_type.write().clear();
        }

        // Level 3: Indexes
        self.label_index.write().clear();
        self.node_labels.write().clear();
        self.property_indexes.write().clear();
        #[cfg(feature = "vector-index")]
        self.vector_indexes.write().clear();
        #[cfg(feature = "text-index")]
        self.text_indexes.write().clear();

        // Nested: Properties and adjacency
        self.node_properties.clear();
        self.edge_properties.clear();
        self.forward_adj.clear();
        if let Some(ref backward) = self.backward_adj {
            backward.clear();
        }

        // Atomics: ID counters
        self.next_node_id.store(0, Ordering::Release);
        self.next_edge_id.store(0, Ordering::Release);
        self.current_epoch.store(0, Ordering::Release);

        // Level 4: Statistics
        self.live_node_count.store(0, Ordering::Release);
        self.live_edge_count.store(0, Ordering::Release);
        self.edge_type_live_counts.write().clear();
        *self.statistics.write() = Arc::new(Statistics::new());
        self.needs_stats_recompute.store(false, Ordering::Release);

        // Level 5: Undo log
        self.property_undo_log.write().clear();
    }
527
528    /// Returns whether backward adjacency (incoming edge index) is available.
529    ///
530    /// When backward adjacency is enabled (the default), bidirectional search
531    /// algorithms can traverse from the target toward the source.
532    #[must_use]
533    pub fn has_backward_adjacency(&self) -> bool {
534        self.backward_adj.is_some()
535    }
536
537    // === Named Graph Management ===
538
539    /// Returns a named graph by name, or `None` if it does not exist.
540    #[must_use]
541    pub fn graph(&self, name: &str) -> Option<Arc<LpgStore>> {
542        self.named_graphs.read().get(name).cloned()
543    }
544
545    /// Returns a named graph, creating it if it does not exist.
546    ///
547    /// # Errors
548    ///
549    /// Returns [`AllocError`] if a new store cannot be allocated.
550    pub fn graph_or_create(&self, name: &str) -> Result<Arc<LpgStore>, AllocError> {
551        {
552            let graphs = self.named_graphs.read();
553            if let Some(g) = graphs.get(name) {
554                return Ok(Arc::clone(g));
555            }
556        }
557        let mut graphs = self.named_graphs.write();
558        // Double-check after acquiring write lock
559        if let Some(g) = graphs.get(name) {
560            return Ok(Arc::clone(g));
561        }
562        let store = Arc::new(LpgStore::new()?);
563        graphs.insert(name.to_string(), Arc::clone(&store));
564        Ok(store)
565    }
566
567    /// Creates a named graph. Returns `true` on success, `false` if it already exists.
568    ///
569    /// # Errors
570    ///
571    /// Returns [`AllocError`] if the new store cannot be allocated.
572    pub fn create_graph(&self, name: &str) -> Result<bool, AllocError> {
573        let mut graphs = self.named_graphs.write();
574        if graphs.contains_key(name) {
575            return Ok(false);
576        }
577        graphs.insert(name.to_string(), Arc::new(LpgStore::new()?));
578        Ok(true)
579    }
580
581    /// Drops a named graph. Returns `false` if it did not exist.
582    pub fn drop_graph(&self, name: &str) -> bool {
583        self.named_graphs.write().remove(name).is_some()
584    }
585
586    /// Returns all named graph names.
587    #[must_use]
588    pub fn graph_names(&self) -> Vec<String> {
589        self.named_graphs.read().keys().cloned().collect()
590    }
591
592    /// Returns the number of named graphs.
593    #[must_use]
594    pub fn graph_count(&self) -> usize {
595        self.named_graphs.read().len()
596    }
597
598    /// Clears a specific graph, or the default graph if `name` is `None`.
599    pub fn clear_graph(&self, name: Option<&str>) {
600        match name {
601            Some(n) => {
602                if let Some(g) = self.named_graphs.read().get(n) {
603                    g.clear();
604                }
605            }
606            None => self.clear(),
607        }
608    }
609
610    /// Copies all data from the source graph to the destination graph.
611    /// Creates the destination graph if it does not exist.
612    ///
613    /// # Errors
614    ///
615    /// Returns [`AllocError`] if the destination store cannot be allocated.
616    pub fn copy_graph(&self, source: Option<&str>, dest: Option<&str>) -> Result<(), AllocError> {
617        let _src = match source {
618            Some(n) => self.graph(n),
619            None => None, // default graph
620        };
621        let _dest_graph = dest.map(|n| self.graph_or_create(n)).transpose()?;
622        // Full graph copy is complex (requires iterating all entities).
623        // For now, this creates the destination graph structure.
624        // Full entity-level copy will be implemented when needed.
625        Ok(())
626    }
627
628    // === Internal Helpers ===
629
630    pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
631        {
632            let label_to_id = self.label_to_id.read();
633            if let Some(&id) = label_to_id.get(label) {
634                return id;
635            }
636        }
637
638        let mut label_to_id = self.label_to_id.write();
639        let mut id_to_label = self.id_to_label.write();
640
641        // Double-check after acquiring write lock
642        if let Some(&id) = label_to_id.get(label) {
643            return id;
644        }
645
646        let id = id_to_label.len() as u32;
647
648        let label: ArcStr = label.into();
649        label_to_id.insert(label.clone(), id);
650        id_to_label.push(label);
651
652        id
653    }
654
655    pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
656        {
657            let type_to_id = self.edge_type_to_id.read();
658            if let Some(&id) = type_to_id.get(edge_type) {
659                return id;
660            }
661        }
662
663        let mut type_to_id = self.edge_type_to_id.write();
664        let mut id_to_type = self.id_to_edge_type.write();
665
666        // Double-check
667        if let Some(&id) = type_to_id.get(edge_type) {
668            return id;
669        }
670
671        let id = id_to_type.len() as u32;
672        let edge_type: ArcStr = edge_type.into();
673        type_to_id.insert(edge_type.clone(), id);
674        id_to_type.push(edge_type);
675
676        // Grow edge type live counts to match
677        let mut counts = self.edge_type_live_counts.write();
678        while counts.len() <= id as usize {
679            counts.push(0);
680        }
681
682        id
683    }
684
685    /// Increments the live edge count for a given edge type.
686    pub(super) fn increment_edge_type_count(&self, type_id: u32) {
687        let mut counts = self.edge_type_live_counts.write();
688        if counts.len() <= type_id as usize {
689            counts.resize(type_id as usize + 1, 0);
690        }
691        counts[type_id as usize] += 1;
692    }
693
694    /// Decrements the live edge count for a given edge type.
695    pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
696        let mut counts = self.edge_type_live_counts.write();
697        if type_id < counts.len() as u32 {
698            counts[type_id as usize] -= 1;
699        }
700    }
701}
702
impl Default for LpgStore {
    /// Equivalent to [`LpgStore::new`] with default configuration.
    ///
    /// # Panics
    ///
    /// Panics if the arena allocator cannot be initialized; per `with_config`,
    /// this error path only exists under the `tiered-storage` feature.
    fn default() -> Self {
        Self::new().expect("failed to allocate arena for default LpgStore")
    }
}