grafeo_core/graph/lpg/store/
mod.rs1mod edge_ops;
13mod graph_store_impl;
14mod index;
15mod node_ops;
16mod property_ops;
17mod schema;
18mod search;
19mod statistics;
20mod traversal;
21mod versioning;
22
23#[cfg(test)]
24mod tests;
25
26use super::PropertyStorage;
27#[cfg(not(feature = "tiered-storage"))]
28use super::{EdgeRecord, NodeRecord};
29use crate::index::adjacency::ChunkedAdjacency;
30use crate::statistics::Statistics;
31use arcstr::ArcStr;
32use dashmap::DashMap;
33#[cfg(not(feature = "tiered-storage"))]
34use grafeo_common::mvcc::VersionChain;
35use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, Value};
36use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
37use parking_lot::RwLock;
38use std::cmp::Ordering as CmpOrdering;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
41
42#[cfg(feature = "vector-index")]
43use crate::index::vector::HnswIndex;
44
45#[cfg(feature = "tiered-storage")]
46use crate::storage::EpochStore;
47#[cfg(feature = "tiered-storage")]
48use grafeo_common::memory::arena::ArenaAllocator;
49#[cfg(feature = "tiered-storage")]
50use grafeo_common::mvcc::VersionIndex;
51
52pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
54 match (a, b) {
55 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
56 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
57 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
58 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
59 _ => None,
60 }
61}
62
63pub(super) fn value_in_range(
65 value: &Value,
66 min: Option<&Value>,
67 max: Option<&Value>,
68 min_inclusive: bool,
69 max_inclusive: bool,
70) -> bool {
71 if let Some(min_val) = min {
73 match compare_values_for_range(value, min_val) {
74 Some(CmpOrdering::Less) => return false,
75 Some(CmpOrdering::Equal) if !min_inclusive => return false,
76 None => return false, _ => {}
78 }
79 }
80
81 if let Some(max_val) = max {
83 match compare_values_for_range(value, max_val) {
84 Some(CmpOrdering::Greater) => return false,
85 Some(CmpOrdering::Equal) if !max_inclusive => return false,
86 None => return false,
87 _ => {}
88 }
89 }
90
91 true
92}
93
/// Construction-time options for an [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, `LpgStore::with_config` also allocates the backward
    /// adjacency structure; when `false` the store has none.
    pub backward_edges: bool,
    /// Requested initial capacity for node storage.
    // NOTE(review): no code in this file reads this value yet — confirm intended use.
    pub initial_node_capacity: usize,
    /// Requested initial capacity for edge storage.
    // NOTE(review): no code in this file reads this value yet — confirm intended use.
    pub initial_edge_capacity: usize,
}
109
110impl Default for LpgStoreConfig {
111 fn default() -> Self {
112 Self {
113 backward_edges: true,
114 initial_node_capacity: 1024,
115 initial_edge_capacity: 4096,
116 }
117 }
118}
119
120pub struct LpgStore {
179 #[allow(dead_code)]
181 pub(super) config: LpgStoreConfig,
182
183 #[cfg(not(feature = "tiered-storage"))]
187 pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
188
189 #[cfg(not(feature = "tiered-storage"))]
193 pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
194
195 #[cfg(feature = "tiered-storage")]
209 pub(super) arena_allocator: Arc<ArenaAllocator>,
210
211 #[cfg(feature = "tiered-storage")]
215 pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
216
217 #[cfg(feature = "tiered-storage")]
221 pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
222
223 #[cfg(feature = "tiered-storage")]
226 pub(super) epoch_store: Arc<EpochStore>,
227
228 pub(super) node_properties: PropertyStorage<NodeId>,
230
231 pub(super) edge_properties: PropertyStorage<EdgeId>,
233
234 pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
237
238 pub(super) id_to_label: RwLock<Vec<ArcStr>>,
241
242 pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
245
246 pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,
249
250 pub(super) forward_adj: ChunkedAdjacency,
252
253 pub(super) backward_adj: Option<ChunkedAdjacency>,
256
257 pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
260
261 pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
265
266 pub(super) property_indexes:
272 RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
273
274 #[cfg(feature = "vector-index")]
279 pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
280
281 #[cfg(feature = "text-index")]
286 pub(super) text_indexes:
287 RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,
288
289 pub(super) next_node_id: AtomicU64,
291
292 pub(super) next_edge_id: AtomicU64,
294
295 pub(super) current_epoch: AtomicU64,
297
298 pub(super) live_node_count: AtomicI64,
301
302 pub(super) live_edge_count: AtomicI64,
305
306 pub(super) edge_type_live_counts: RwLock<Vec<i64>>,
310
311 pub(super) statistics: RwLock<Arc<Statistics>>,
314
315 pub(super) needs_stats_recompute: AtomicBool,
317}
318
319impl LpgStore {
320 #[must_use]
322 pub fn new() -> Self {
323 Self::with_config(LpgStoreConfig::default())
324 }
325
326 #[must_use]
328 pub fn with_config(config: LpgStoreConfig) -> Self {
329 let backward_adj = if config.backward_edges {
330 Some(ChunkedAdjacency::new())
331 } else {
332 None
333 };
334
335 Self {
336 #[cfg(not(feature = "tiered-storage"))]
337 nodes: RwLock::new(FxHashMap::default()),
338 #[cfg(not(feature = "tiered-storage"))]
339 edges: RwLock::new(FxHashMap::default()),
340 #[cfg(feature = "tiered-storage")]
341 arena_allocator: Arc::new(ArenaAllocator::new()),
342 #[cfg(feature = "tiered-storage")]
343 node_versions: RwLock::new(FxHashMap::default()),
344 #[cfg(feature = "tiered-storage")]
345 edge_versions: RwLock::new(FxHashMap::default()),
346 #[cfg(feature = "tiered-storage")]
347 epoch_store: Arc::new(EpochStore::new()),
348 node_properties: PropertyStorage::new(),
349 edge_properties: PropertyStorage::new(),
350 label_to_id: RwLock::new(FxHashMap::default()),
351 id_to_label: RwLock::new(Vec::new()),
352 edge_type_to_id: RwLock::new(FxHashMap::default()),
353 id_to_edge_type: RwLock::new(Vec::new()),
354 forward_adj: ChunkedAdjacency::new(),
355 backward_adj,
356 label_index: RwLock::new(Vec::new()),
357 node_labels: RwLock::new(FxHashMap::default()),
358 property_indexes: RwLock::new(FxHashMap::default()),
359 #[cfg(feature = "vector-index")]
360 vector_indexes: RwLock::new(FxHashMap::default()),
361 #[cfg(feature = "text-index")]
362 text_indexes: RwLock::new(FxHashMap::default()),
363 next_node_id: AtomicU64::new(0),
364 next_edge_id: AtomicU64::new(0),
365 current_epoch: AtomicU64::new(0),
366 live_node_count: AtomicI64::new(0),
367 live_edge_count: AtomicI64::new(0),
368 edge_type_live_counts: RwLock::new(Vec::new()),
369 statistics: RwLock::new(Arc::new(Statistics::new())),
370 needs_stats_recompute: AtomicBool::new(false),
371 config,
372 }
373 }
374
375 #[must_use]
377 pub fn current_epoch(&self) -> EpochId {
378 EpochId::new(self.current_epoch.load(Ordering::Acquire))
379 }
380
381 #[doc(hidden)]
383 pub fn new_epoch(&self) -> EpochId {
384 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
385 EpochId::new(id)
386 }
387
388 #[doc(hidden)]
393 pub fn sync_epoch(&self, epoch: EpochId) {
394 self.current_epoch
395 .fetch_max(epoch.as_u64(), Ordering::AcqRel);
396 }
397
398 pub fn clear(&self) {
403 #[cfg(not(feature = "tiered-storage"))]
405 {
406 self.nodes.write().clear();
407 self.edges.write().clear();
408 }
409 #[cfg(feature = "tiered-storage")]
410 {
411 self.node_versions.write().clear();
412 self.edge_versions.write().clear();
413 }
415
416 {
418 self.label_to_id.write().clear();
419 self.id_to_label.write().clear();
420 }
421 {
422 self.edge_type_to_id.write().clear();
423 self.id_to_edge_type.write().clear();
424 }
425
426 self.label_index.write().clear();
428 self.node_labels.write().clear();
429 self.property_indexes.write().clear();
430 #[cfg(feature = "vector-index")]
431 self.vector_indexes.write().clear();
432 #[cfg(feature = "text-index")]
433 self.text_indexes.write().clear();
434
435 self.node_properties.clear();
437 self.edge_properties.clear();
438 self.forward_adj.clear();
439 if let Some(ref backward) = self.backward_adj {
440 backward.clear();
441 }
442
443 self.next_node_id.store(0, Ordering::Release);
445 self.next_edge_id.store(0, Ordering::Release);
446 self.current_epoch.store(0, Ordering::Release);
447
448 self.live_node_count.store(0, Ordering::Release);
450 self.live_edge_count.store(0, Ordering::Release);
451 self.edge_type_live_counts.write().clear();
452 *self.statistics.write() = Arc::new(Statistics::new());
453 self.needs_stats_recompute.store(false, Ordering::Release);
454 }
455
456 #[must_use]
461 pub fn has_backward_adjacency(&self) -> bool {
462 self.backward_adj.is_some()
463 }
464
465 pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
468 {
469 let label_to_id = self.label_to_id.read();
470 if let Some(&id) = label_to_id.get(label) {
471 return id;
472 }
473 }
474
475 let mut label_to_id = self.label_to_id.write();
476 let mut id_to_label = self.id_to_label.write();
477
478 if let Some(&id) = label_to_id.get(label) {
480 return id;
481 }
482
483 let id = id_to_label.len() as u32;
484
485 let label: ArcStr = label.into();
486 label_to_id.insert(label.clone(), id);
487 id_to_label.push(label);
488
489 id
490 }
491
492 pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
493 {
494 let type_to_id = self.edge_type_to_id.read();
495 if let Some(&id) = type_to_id.get(edge_type) {
496 return id;
497 }
498 }
499
500 let mut type_to_id = self.edge_type_to_id.write();
501 let mut id_to_type = self.id_to_edge_type.write();
502
503 if let Some(&id) = type_to_id.get(edge_type) {
505 return id;
506 }
507
508 let id = id_to_type.len() as u32;
509 let edge_type: ArcStr = edge_type.into();
510 type_to_id.insert(edge_type.clone(), id);
511 id_to_type.push(edge_type);
512
513 let mut counts = self.edge_type_live_counts.write();
515 while counts.len() <= id as usize {
516 counts.push(0);
517 }
518
519 id
520 }
521
522 pub(super) fn increment_edge_type_count(&self, type_id: u32) {
524 let mut counts = self.edge_type_live_counts.write();
525 if counts.len() <= type_id as usize {
526 counts.resize(type_id as usize + 1, 0);
527 }
528 counts[type_id as usize] += 1;
529 }
530
531 pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
533 let mut counts = self.edge_type_live_counts.write();
534 if type_id < counts.len() as u32 {
535 counts[type_id as usize] -= 1;
536 }
537 }
538}
539
540impl Default for LpgStore {
541 fn default() -> Self {
542 Self::new()
543 }
544}