grafeo_core/graph/lpg/store/mod.rs
1//! The in-memory LPG graph store.
2//!
3//! This is where your nodes and edges actually live. Most users interact
4//! through [`GrafeoDB`](grafeo_engine::GrafeoDB), but algorithm implementers
5//! sometimes need the raw [`LpgStore`] for direct adjacency traversal.
6//!
7//! Key features:
8//! - MVCC versioning - concurrent readers don't block each other
9//! - Columnar properties with zone maps for fast filtering
10//! - Forward and backward adjacency indexes
11
12mod edge_ops;
13mod graph_store_impl;
14mod index;
15mod node_ops;
16mod property_ops;
17mod schema;
18mod search;
19mod statistics;
20mod traversal;
21mod versioning;
22
23#[cfg(test)]
24mod tests;
25
26use super::PropertyStorage;
27#[cfg(not(feature = "tiered-storage"))]
28use super::{EdgeRecord, NodeRecord};
29use crate::index::adjacency::ChunkedAdjacency;
30use crate::statistics::Statistics;
31use arcstr::ArcStr;
32use dashmap::DashMap;
33#[cfg(not(feature = "tiered-storage"))]
34use grafeo_common::mvcc::VersionChain;
35use grafeo_common::types::{
36 EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TransactionId, Value,
37};
38use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
39use parking_lot::RwLock;
40use std::cmp::Ordering as CmpOrdering;
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
43
44#[cfg(feature = "vector-index")]
45use crate::index::vector::HnswIndex;
46
47#[cfg(feature = "tiered-storage")]
48use crate::storage::EpochStore;
49use grafeo_common::memory::arena::AllocError;
50#[cfg(feature = "tiered-storage")]
51use grafeo_common::memory::arena::ArenaAllocator;
52#[cfg(feature = "tiered-storage")]
53use grafeo_common::mvcc::VersionIndex;
54
55/// Undo entry for a property mutation within a transaction.
56///
57/// Captures the previous state of a property so it can be restored on rollback.
58#[derive(Debug, Clone)]
59pub enum PropertyUndoEntry {
60 /// A node property was changed or added.
61 NodeProperty {
62 /// The node that was modified.
63 node_id: NodeId,
64 /// The property key that was set or removed.
65 key: PropertyKey,
66 /// The previous value, or `None` if the property did not exist before.
67 old_value: Option<Value>,
68 },
69 /// An edge property was changed or added.
70 EdgeProperty {
71 /// The edge that was modified.
72 edge_id: EdgeId,
73 /// The property key that was set or removed.
74 key: PropertyKey,
75 /// The previous value, or `None` if the property did not exist before.
76 old_value: Option<Value>,
77 },
78 /// A label was added to a node.
79 LabelAdded {
80 /// The node that had a label added.
81 node_id: NodeId,
82 /// The label string that was added.
83 label: String,
84 },
85 /// A label was removed from a node.
86 LabelRemoved {
87 /// The node that had a label removed.
88 node_id: NodeId,
89 /// The label string that was removed.
90 label: String,
91 },
92}
93
94/// Compares two values for ordering (used for range checks).
95pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
96 match (a, b) {
97 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
98 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
99 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
100 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
101 (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
102 (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
103 (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
104 _ => None,
105 }
106}
107
108/// Checks if a value is within a range.
109pub(super) fn value_in_range(
110 value: &Value,
111 min: Option<&Value>,
112 max: Option<&Value>,
113 min_inclusive: bool,
114 max_inclusive: bool,
115) -> bool {
116 // Check lower bound
117 if let Some(min_val) = min {
118 match compare_values_for_range(value, min_val) {
119 Some(CmpOrdering::Less) => return false,
120 Some(CmpOrdering::Equal) if !min_inclusive => return false,
121 None => return false, // Can't compare
122 _ => {}
123 }
124 }
125
126 // Check upper bound
127 if let Some(max_val) = max {
128 match compare_values_for_range(value, max_val) {
129 Some(CmpOrdering::Greater) => return false,
130 Some(CmpOrdering::Equal) if !max_inclusive => return false,
131 None => return false,
132 _ => {}
133 }
134 }
135
136 true
137}
138
139/// Configuration for the LPG store.
140///
141/// The defaults work well for most cases. Tune `backward_edges` if you only
142/// traverse in one direction (saves memory), or adjust capacities if you know
143/// your graph size upfront (avoids reallocations).
144#[derive(Debug, Clone)]
145pub struct LpgStoreConfig {
146 /// Maintain backward adjacency for incoming edge queries. Turn off if
147 /// you only traverse outgoing edges - saves ~50% adjacency memory.
148 pub backward_edges: bool,
149 /// Initial capacity for nodes (avoids early reallocations).
150 pub initial_node_capacity: usize,
151 /// Initial capacity for edges (avoids early reallocations).
152 pub initial_edge_capacity: usize,
153}
154
155impl Default for LpgStoreConfig {
156 fn default() -> Self {
157 Self {
158 backward_edges: true,
159 initial_node_capacity: 1024,
160 initial_edge_capacity: 4096,
161 }
162 }
163}
164
165/// The core in-memory graph storage.
166///
167/// Everything lives here: nodes, edges, properties, adjacency indexes, and
168/// version chains for MVCC. Concurrent reads never block each other.
169///
170/// Most users should go through `GrafeoDB` (from the `grafeo_engine` crate) which
171/// adds transaction management and query execution. Use `LpgStore` directly
172/// when you need raw performance for algorithm implementations.
173///
174/// # Example
175///
176/// ```
177/// use grafeo_core::graph::lpg::LpgStore;
178/// use grafeo_core::graph::Direction;
179///
180/// let store = LpgStore::new().expect("arena allocation");
181///
182/// // Create a small social network
183/// let alix = store.create_node(&["Person"]);
184/// let gus = store.create_node(&["Person"]);
185/// store.create_edge(alix, gus, "KNOWS");
186///
187/// // Traverse outgoing edges
188/// for neighbor in store.neighbors(alix, Direction::Outgoing) {
189/// println!("Alix knows node {:?}", neighbor);
190/// }
191/// ```
192///
193/// # Lock Ordering
194///
195/// `LpgStore` contains multiple `RwLock` fields that must be acquired in a
196/// consistent order to prevent deadlocks. Always acquire locks in this order:
197///
198/// ## Level 1 - Entity Storage (mutually exclusive via feature flag)
199/// 1. `nodes` / `node_versions`
200/// 2. `edges` / `edge_versions`
201///
202/// ## Level 2 - Catalogs (acquire as pairs when writing)
203/// 3. `label_to_id` + `id_to_label`
204/// 4. `edge_type_to_id` + `id_to_edge_type`
205///
206/// ## Level 3 - Indexes
207/// 5. `label_index`
208/// 6. `node_labels`
209/// 7. `property_indexes`
210///
211/// ## Level 4 - Statistics
212/// 8. `statistics`
213///
214/// ## Level 5 - Nested Locks (internal to other structs)
215/// 9. `PropertyStorage::columns` (via `node_properties`/`edge_properties`)
216/// 10. `ChunkedAdjacency::lists` (via `forward_adj`/`backward_adj`)
217///
218/// ## Rules
219/// - Catalog pairs must be acquired together when writing.
220/// - Never hold entity locks while acquiring catalog locks in a different scope.
221/// - Statistics lock is always last.
222/// - Read locks are generally safe, but avoid read-to-write upgrades.
223pub struct LpgStore {
224 /// Configuration.
225 #[allow(dead_code)]
226 pub(super) config: LpgStoreConfig,
227
228 /// Node records indexed by NodeId, with version chains for MVCC.
229 /// Used when `tiered-storage` feature is disabled.
230 /// Lock order: 1
231 #[cfg(not(feature = "tiered-storage"))]
232 pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
233
234 /// Edge records indexed by EdgeId, with version chains for MVCC.
235 /// Used when `tiered-storage` feature is disabled.
236 /// Lock order: 2
237 #[cfg(not(feature = "tiered-storage"))]
238 pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
239
240 // === Tiered Storage Fields (feature-gated) ===
241 //
242 // Lock ordering for arena access:
243 // version_lock (read/write) → arena read lock (via arena_allocator.arena())
244 //
245 // Rules:
246 // - Acquire arena read lock *after* version locks, never before.
247 // - Multiple threads may call arena.read_at() concurrently (shared refs only).
248 // - Never acquire arena write lock (alloc_new_chunk) while holding version locks.
249 // - freeze_epoch order: node_versions.read() → arena.read_at(),
250 // then edge_versions.read() → arena.read_at().
251 /// Arena allocator for hot data storage.
252 /// Data is stored in per-epoch arenas for fast allocation and bulk deallocation.
253 #[cfg(feature = "tiered-storage")]
254 pub(super) arena_allocator: Arc<ArenaAllocator>,
255
256 /// Node version indexes - store metadata and arena offsets.
257 /// The actual NodeRecord data is stored in the arena.
258 /// Lock order: 1
259 #[cfg(feature = "tiered-storage")]
260 pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
261
262 /// Edge version indexes - store metadata and arena offsets.
263 /// The actual EdgeRecord data is stored in the arena.
264 /// Lock order: 2
265 #[cfg(feature = "tiered-storage")]
266 pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
267
268 /// Cold storage for frozen epochs.
269 /// Contains compressed epoch blocks for historical data.
270 #[cfg(feature = "tiered-storage")]
271 pub(super) epoch_store: Arc<EpochStore>,
272
273 /// Property storage for nodes.
274 pub(super) node_properties: PropertyStorage<NodeId>,
275
276 /// Property storage for edges.
277 pub(super) edge_properties: PropertyStorage<EdgeId>,
278
279 /// Label name to ID mapping.
280 /// Lock order: 3 (acquire with id_to_label)
281 pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
282
283 /// Label ID to name mapping.
284 /// Lock order: 3 (acquire with label_to_id)
285 pub(super) id_to_label: RwLock<Vec<ArcStr>>,
286
287 /// Edge type name to ID mapping.
288 /// Lock order: 4 (acquire with id_to_edge_type)
289 pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
290
291 /// Edge type ID to name mapping.
292 /// Lock order: 4 (acquire with edge_type_to_id)
293 pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,
294
295 /// Forward adjacency lists (outgoing edges).
296 pub(super) forward_adj: ChunkedAdjacency,
297
298 /// Backward adjacency lists (incoming edges).
299 /// Only populated if config.backward_edges is true.
300 pub(super) backward_adj: Option<ChunkedAdjacency>,
301
302 /// Label index: label_id -> set of node IDs.
303 /// Lock order: 5
304 pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
305
306 /// Node labels: node_id -> set of label IDs.
307 /// Reverse mapping to efficiently get labels for a node.
308 /// Lock order: 6
309 pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
310
311 /// Property indexes: property_key -> (value -> set of node IDs).
312 ///
313 /// When a property is indexed, lookups by value are O(1) instead of O(n).
314 /// Use [`create_property_index`] to enable indexing for a property.
315 /// Lock order: 7
316 pub(super) property_indexes:
317 RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
318
319 /// Vector indexes: "label:property" -> HNSW index.
320 ///
321 /// Created via [`GrafeoDB::create_vector_index`](grafeo_engine::GrafeoDB::create_vector_index).
322 /// Lock order: 7 (same level as property_indexes, disjoint keys)
323 #[cfg(feature = "vector-index")]
324 pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
325
326 /// Text indexes: "label:property" -> inverted index with BM25 scoring.
327 ///
328 /// Created via [`GrafeoDB::create_text_index`](grafeo_engine::GrafeoDB::create_text_index).
329 /// Lock order: 7 (same level as property_indexes, disjoint keys)
330 #[cfg(feature = "text-index")]
331 pub(super) text_indexes:
332 RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,
333
334 /// Next node ID.
335 pub(super) next_node_id: AtomicU64,
336
337 /// Next edge ID.
338 pub(super) next_edge_id: AtomicU64,
339
340 /// Current epoch.
341 pub(super) current_epoch: AtomicU64,
342
343 /// Live (non-deleted) node count, maintained incrementally.
344 /// Avoids O(n) full scan in `compute_statistics()`.
345 pub(super) live_node_count: AtomicI64,
346
347 /// Live (non-deleted) edge count, maintained incrementally.
348 /// Avoids O(m) full scan in `compute_statistics()`.
349 pub(super) live_edge_count: AtomicI64,
350
351 /// Per-edge-type live counts, indexed by edge_type_id.
352 /// Avoids O(m) edge scan in `compute_statistics()`.
353 /// Lock order: 8 (same level as statistics)
354 pub(super) edge_type_live_counts: RwLock<Vec<i64>>,
355
356 /// Statistics for cost-based optimization.
357 /// Lock order: 8 (always last)
358 pub(super) statistics: RwLock<Arc<Statistics>>,
359
360 /// Whether statistics need full recomputation (e.g., after rollback).
361 pub(super) needs_stats_recompute: AtomicBool,
362
363 /// Named graphs, each an independent `LpgStore` partition.
364 /// Zero overhead for single-graph databases (empty HashMap).
365 /// Lock order: 9 (after statistics)
366 named_graphs: RwLock<FxHashMap<String, Arc<LpgStore>>>,
367
368 /// Undo log for property mutations within transactions.
369 ///
370 /// Maps transaction IDs to a list of undo entries that capture the
371 /// previous property values. On rollback, entries are replayed in
372 /// reverse order to restore properties. On commit, the entries are
373 /// simply discarded.
374 /// Lock order: 10 (after named_graphs, independent of other locks)
375 property_undo_log: RwLock<FxHashMap<TransactionId, Vec<PropertyUndoEntry>>>,
376}
377
378impl LpgStore {
379 /// Creates a new LPG store with default configuration.
380 ///
381 /// # Errors
382 ///
383 /// Returns [`AllocError`] if the arena allocator cannot be initialized
384 /// (only possible with the `tiered-storage` feature).
385 pub fn new() -> Result<Self, AllocError> {
386 Self::with_config(LpgStoreConfig::default())
387 }
388
389 /// Creates a new LPG store with custom configuration.
390 ///
391 /// # Errors
392 ///
393 /// Returns [`AllocError`] if the arena allocator cannot be initialized
394 /// (only possible with the `tiered-storage` feature).
395 pub fn with_config(config: LpgStoreConfig) -> Result<Self, AllocError> {
396 let backward_adj = if config.backward_edges {
397 Some(ChunkedAdjacency::new())
398 } else {
399 None
400 };
401
402 Ok(Self {
403 #[cfg(not(feature = "tiered-storage"))]
404 nodes: RwLock::new(FxHashMap::default()),
405 #[cfg(not(feature = "tiered-storage"))]
406 edges: RwLock::new(FxHashMap::default()),
407 #[cfg(feature = "tiered-storage")]
408 arena_allocator: Arc::new(ArenaAllocator::new()?),
409 #[cfg(feature = "tiered-storage")]
410 node_versions: RwLock::new(FxHashMap::default()),
411 #[cfg(feature = "tiered-storage")]
412 edge_versions: RwLock::new(FxHashMap::default()),
413 #[cfg(feature = "tiered-storage")]
414 epoch_store: Arc::new(EpochStore::new()),
415 node_properties: PropertyStorage::new(),
416 edge_properties: PropertyStorage::new(),
417 label_to_id: RwLock::new(FxHashMap::default()),
418 id_to_label: RwLock::new(Vec::new()),
419 edge_type_to_id: RwLock::new(FxHashMap::default()),
420 id_to_edge_type: RwLock::new(Vec::new()),
421 forward_adj: ChunkedAdjacency::new(),
422 backward_adj,
423 label_index: RwLock::new(Vec::new()),
424 node_labels: RwLock::new(FxHashMap::default()),
425 property_indexes: RwLock::new(FxHashMap::default()),
426 #[cfg(feature = "vector-index")]
427 vector_indexes: RwLock::new(FxHashMap::default()),
428 #[cfg(feature = "text-index")]
429 text_indexes: RwLock::new(FxHashMap::default()),
430 next_node_id: AtomicU64::new(0),
431 next_edge_id: AtomicU64::new(0),
432 current_epoch: AtomicU64::new(0),
433 live_node_count: AtomicI64::new(0),
434 live_edge_count: AtomicI64::new(0),
435 edge_type_live_counts: RwLock::new(Vec::new()),
436 statistics: RwLock::new(Arc::new(Statistics::new())),
437 needs_stats_recompute: AtomicBool::new(false),
438 named_graphs: RwLock::new(FxHashMap::default()),
439 property_undo_log: RwLock::new(FxHashMap::default()),
440 config,
441 })
442 }
443
444 /// Returns the current epoch.
445 #[must_use]
446 pub fn current_epoch(&self) -> EpochId {
447 EpochId::new(self.current_epoch.load(Ordering::Acquire))
448 }
449
450 /// Creates a new epoch.
451 #[doc(hidden)]
452 pub fn new_epoch(&self) -> EpochId {
453 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
454 EpochId::new(id)
455 }
456
457 /// Syncs the store epoch to match an external epoch counter.
458 ///
459 /// Used by the transaction manager to keep the store's epoch in step
460 /// after a transaction commit advances the global epoch.
461 #[doc(hidden)]
462 pub fn sync_epoch(&self, epoch: EpochId) {
463 self.current_epoch
464 .fetch_max(epoch.as_u64(), Ordering::AcqRel);
465 }
466
467 /// Removes all data from the store, resetting it to an empty state.
468 ///
469 /// Acquires locks in the documented ordering to prevent deadlocks.
470 /// After clearing, the store behaves as if freshly constructed.
471 pub fn clear(&self) {
472 // Level 1: Entity storage
473 #[cfg(not(feature = "tiered-storage"))]
474 {
475 self.nodes.write().clear();
476 self.edges.write().clear();
477 }
478 #[cfg(feature = "tiered-storage")]
479 {
480 self.node_versions.write().clear();
481 self.edge_versions.write().clear();
482 // Arena allocator chunks are leaked; epochs are cleared via epoch_store.
483 }
484
485 // Level 2: Catalogs (acquire as pairs)
486 {
487 self.label_to_id.write().clear();
488 self.id_to_label.write().clear();
489 }
490 {
491 self.edge_type_to_id.write().clear();
492 self.id_to_edge_type.write().clear();
493 }
494
495 // Level 3: Indexes
496 self.label_index.write().clear();
497 self.node_labels.write().clear();
498 self.property_indexes.write().clear();
499 #[cfg(feature = "vector-index")]
500 self.vector_indexes.write().clear();
501 #[cfg(feature = "text-index")]
502 self.text_indexes.write().clear();
503
504 // Nested: Properties and adjacency
505 self.node_properties.clear();
506 self.edge_properties.clear();
507 self.forward_adj.clear();
508 if let Some(ref backward) = self.backward_adj {
509 backward.clear();
510 }
511
512 // Atomics: ID counters
513 self.next_node_id.store(0, Ordering::Release);
514 self.next_edge_id.store(0, Ordering::Release);
515 self.current_epoch.store(0, Ordering::Release);
516
517 // Level 4: Statistics
518 self.live_node_count.store(0, Ordering::Release);
519 self.live_edge_count.store(0, Ordering::Release);
520 self.edge_type_live_counts.write().clear();
521 *self.statistics.write() = Arc::new(Statistics::new());
522 self.needs_stats_recompute.store(false, Ordering::Release);
523
524 // Level 5: Undo log
525 self.property_undo_log.write().clear();
526 }
527
528 /// Returns whether backward adjacency (incoming edge index) is available.
529 ///
530 /// When backward adjacency is enabled (the default), bidirectional search
531 /// algorithms can traverse from the target toward the source.
532 #[must_use]
533 pub fn has_backward_adjacency(&self) -> bool {
534 self.backward_adj.is_some()
535 }
536
537 // === Named Graph Management ===
538
539 /// Returns a named graph by name, or `None` if it does not exist.
540 #[must_use]
541 pub fn graph(&self, name: &str) -> Option<Arc<LpgStore>> {
542 self.named_graphs.read().get(name).cloned()
543 }
544
545 /// Returns a named graph, creating it if it does not exist.
546 ///
547 /// # Errors
548 ///
549 /// Returns [`AllocError`] if a new store cannot be allocated.
550 pub fn graph_or_create(&self, name: &str) -> Result<Arc<LpgStore>, AllocError> {
551 {
552 let graphs = self.named_graphs.read();
553 if let Some(g) = graphs.get(name) {
554 return Ok(Arc::clone(g));
555 }
556 }
557 let mut graphs = self.named_graphs.write();
558 // Double-check after acquiring write lock
559 if let Some(g) = graphs.get(name) {
560 return Ok(Arc::clone(g));
561 }
562 let store = Arc::new(LpgStore::new()?);
563 graphs.insert(name.to_string(), Arc::clone(&store));
564 Ok(store)
565 }
566
567 /// Creates a named graph. Returns `true` on success, `false` if it already exists.
568 ///
569 /// # Errors
570 ///
571 /// Returns [`AllocError`] if the new store cannot be allocated.
572 pub fn create_graph(&self, name: &str) -> Result<bool, AllocError> {
573 let mut graphs = self.named_graphs.write();
574 if graphs.contains_key(name) {
575 return Ok(false);
576 }
577 graphs.insert(name.to_string(), Arc::new(LpgStore::new()?));
578 Ok(true)
579 }
580
581 /// Drops a named graph. Returns `false` if it did not exist.
582 pub fn drop_graph(&self, name: &str) -> bool {
583 self.named_graphs.write().remove(name).is_some()
584 }
585
586 /// Returns all named graph names.
587 #[must_use]
588 pub fn graph_names(&self) -> Vec<String> {
589 self.named_graphs.read().keys().cloned().collect()
590 }
591
592 /// Returns the number of named graphs.
593 #[must_use]
594 pub fn graph_count(&self) -> usize {
595 self.named_graphs.read().len()
596 }
597
598 /// Clears a specific graph, or the default graph if `name` is `None`.
599 pub fn clear_graph(&self, name: Option<&str>) {
600 match name {
601 Some(n) => {
602 if let Some(g) = self.named_graphs.read().get(n) {
603 g.clear();
604 }
605 }
606 None => self.clear(),
607 }
608 }
609
610 /// Copies all data from the source graph to the destination graph.
611 /// Creates the destination graph if it does not exist.
612 ///
613 /// # Errors
614 ///
615 /// Returns [`AllocError`] if the destination store cannot be allocated.
616 pub fn copy_graph(&self, source: Option<&str>, dest: Option<&str>) -> Result<(), AllocError> {
617 let _src = match source {
618 Some(n) => self.graph(n),
619 None => None, // default graph
620 };
621 let _dest_graph = dest.map(|n| self.graph_or_create(n)).transpose()?;
622 // Full graph copy is complex (requires iterating all entities).
623 // For now, this creates the destination graph structure.
624 // Full entity-level copy will be implemented when needed.
625 Ok(())
626 }
627
628 // === Internal Helpers ===
629
630 pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
631 {
632 let label_to_id = self.label_to_id.read();
633 if let Some(&id) = label_to_id.get(label) {
634 return id;
635 }
636 }
637
638 let mut label_to_id = self.label_to_id.write();
639 let mut id_to_label = self.id_to_label.write();
640
641 // Double-check after acquiring write lock
642 if let Some(&id) = label_to_id.get(label) {
643 return id;
644 }
645
646 let id = id_to_label.len() as u32;
647
648 let label: ArcStr = label.into();
649 label_to_id.insert(label.clone(), id);
650 id_to_label.push(label);
651
652 id
653 }
654
655 pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
656 {
657 let type_to_id = self.edge_type_to_id.read();
658 if let Some(&id) = type_to_id.get(edge_type) {
659 return id;
660 }
661 }
662
663 let mut type_to_id = self.edge_type_to_id.write();
664 let mut id_to_type = self.id_to_edge_type.write();
665
666 // Double-check
667 if let Some(&id) = type_to_id.get(edge_type) {
668 return id;
669 }
670
671 let id = id_to_type.len() as u32;
672 let edge_type: ArcStr = edge_type.into();
673 type_to_id.insert(edge_type.clone(), id);
674 id_to_type.push(edge_type);
675
676 // Grow edge type live counts to match
677 let mut counts = self.edge_type_live_counts.write();
678 while counts.len() <= id as usize {
679 counts.push(0);
680 }
681
682 id
683 }
684
685 /// Increments the live edge count for a given edge type.
686 pub(super) fn increment_edge_type_count(&self, type_id: u32) {
687 let mut counts = self.edge_type_live_counts.write();
688 if counts.len() <= type_id as usize {
689 counts.resize(type_id as usize + 1, 0);
690 }
691 counts[type_id as usize] += 1;
692 }
693
694 /// Decrements the live edge count for a given edge type.
695 pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
696 let mut counts = self.edge_type_live_counts.write();
697 if type_id < counts.len() as u32 {
698 counts[type_id as usize] -= 1;
699 }
700 }
701}
702
703impl Default for LpgStore {
704 fn default() -> Self {
705 Self::new().expect("failed to allocate arena for default LpgStore")
706 }
707}