//! grafeo_core/graph/lpg/store/mod.rs
mod edge_ops;
13mod graph_store_impl;
14mod index;
15mod node_ops;
16mod property_ops;
17mod schema;
18mod search;
19mod statistics;
20mod traversal;
21mod versioning;
22
23#[cfg(test)]
24mod tests;
25
26use super::PropertyStorage;
27#[cfg(not(feature = "tiered-storage"))]
28use super::{EdgeRecord, NodeRecord};
29use crate::index::adjacency::ChunkedAdjacency;
30use crate::statistics::Statistics;
31use arcstr::ArcStr;
32use dashmap::DashMap;
33#[cfg(not(feature = "tiered-storage"))]
34use grafeo_common::mvcc::VersionChain;
35use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, Value};
36use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
37use parking_lot::RwLock;
38use std::cmp::Ordering as CmpOrdering;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
41
42#[cfg(feature = "vector-index")]
43use crate::index::vector::HnswIndex;
44
45#[cfg(feature = "tiered-storage")]
46use crate::storage::EpochStore;
47#[cfg(feature = "tiered-storage")]
48use grafeo_common::memory::arena::ArenaAllocator;
49#[cfg(feature = "tiered-storage")]
50use grafeo_common::mvcc::VersionIndex;
51
52pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
54 match (a, b) {
55 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
56 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
57 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
58 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
59 (Value::Timestamp(a), Value::Timestamp(b)) => Some(a.cmp(b)),
60 (Value::Date(a), Value::Date(b)) => Some(a.cmp(b)),
61 (Value::Time(a), Value::Time(b)) => Some(a.cmp(b)),
62 _ => None,
63 }
64}
65
66pub(super) fn value_in_range(
68 value: &Value,
69 min: Option<&Value>,
70 max: Option<&Value>,
71 min_inclusive: bool,
72 max_inclusive: bool,
73) -> bool {
74 if let Some(min_val) = min {
76 match compare_values_for_range(value, min_val) {
77 Some(CmpOrdering::Less) => return false,
78 Some(CmpOrdering::Equal) if !min_inclusive => return false,
79 None => return false, _ => {}
81 }
82 }
83
84 if let Some(max_val) = max {
86 match compare_values_for_range(value, max_val) {
87 Some(CmpOrdering::Greater) => return false,
88 Some(CmpOrdering::Equal) if !max_inclusive => return false,
89 None => return false,
90 _ => {}
91 }
92 }
93
94 true
95}
96
/// Construction-time options for [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, a reverse (incoming-edge) adjacency index is maintained
    /// alongside the forward one.
    pub backward_edges: bool,
    /// Initial capacity hint for node storage.
    /// NOTE(review): not currently consumed by `with_config` — confirm intent.
    pub initial_node_capacity: usize,
    /// Initial capacity hint for edge storage.
    /// NOTE(review): not currently consumed by `with_config` — confirm intent.
    pub initial_edge_capacity: usize,
}
112
impl Default for LpgStoreConfig {
    /// Defaults: backward edges enabled, 1024-node / 4096-edge capacity hints.
    fn default() -> Self {
        Self {
            backward_edges: true,
            initial_node_capacity: 1024,
            initial_edge_capacity: 4096,
        }
    }
}
122
/// Labeled-property-graph store: MVCC-versioned nodes/edges, interned
/// labels and edge types, adjacency and secondary indexes, plus nested
/// named subgraphs.
pub struct LpgStore {
    /// Configuration this store was built with (kept for introspection).
    #[allow(dead_code)]
    pub(super) config: LpgStoreConfig,

    /// Per-node MVCC version chains (in-memory backend).
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Per-edge MVCC version chains (in-memory backend).
    #[cfg(not(feature = "tiered-storage"))]
    pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Arena backing record allocations (tiered backend).
    #[cfg(feature = "tiered-storage")]
    pub(super) arena_allocator: Arc<ArenaAllocator>,

    /// Per-node version indexes into the tiered store.
    #[cfg(feature = "tiered-storage")]
    pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Per-edge version indexes into the tiered store.
    #[cfg(feature = "tiered-storage")]
    pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Epoch-organized record storage (tiered backend).
    #[cfg(feature = "tiered-storage")]
    pub(super) epoch_store: Arc<EpochStore>,

    /// Columnar property storage keyed by node id.
    pub(super) node_properties: PropertyStorage<NodeId>,

    /// Columnar property storage keyed by edge id.
    pub(super) edge_properties: PropertyStorage<EdgeId>,

    /// Label string -> dense u32 id (interning map).
    pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Dense u32 id -> label string (reverse interning table).
    pub(super) id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge-type string -> dense u32 id (interning map).
    pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Dense u32 id -> edge-type string (reverse interning table).
    pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Outgoing-edge adjacency index (always present).
    pub(super) forward_adj: ChunkedAdjacency,

    /// Incoming-edge adjacency index; built only when
    /// `config.backward_edges` is set.
    pub(super) backward_adj: Option<ChunkedAdjacency>,

    /// Per-label node sets, indexed by label id (map used as a set).
    pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node -> set of label ids attached to it.
    pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Exact-match property indexes: property key -> value -> node set.
    pub(super) property_indexes:
        RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Named HNSW vector indexes.
    #[cfg(feature = "vector-index")]
    pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Named full-text inverted indexes.
    #[cfg(feature = "text-index")]
    pub(super) text_indexes:
        RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,

    /// Next node id to allocate (monotonically increasing).
    pub(super) next_node_id: AtomicU64,

    /// Next edge id to allocate (monotonically increasing).
    pub(super) next_edge_id: AtomicU64,

    /// Current MVCC epoch counter.
    pub(super) current_epoch: AtomicU64,

    /// Count of live (non-deleted) nodes; signed so transient
    /// decrement-before-increment orderings cannot underflow.
    pub(super) live_node_count: AtomicI64,

    /// Count of live (non-deleted) edges.
    pub(super) live_edge_count: AtomicI64,

    /// Live edge count per edge-type id (indexed by type id).
    pub(super) edge_type_live_counts: RwLock<Vec<i64>>,

    /// Cached graph statistics; swapped wholesale on recompute.
    pub(super) statistics: RwLock<Arc<Statistics>>,

    /// Set when mutations have invalidated `statistics`.
    pub(super) needs_stats_recompute: AtomicBool,

    /// Nested named subgraphs, each an independent `LpgStore`.
    named_graphs: RwLock<FxHashMap<String, Arc<LpgStore>>>,
}
326
impl LpgStore {
    /// Creates an empty store with [`LpgStoreConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::with_config(LpgStoreConfig::default())
    }

    /// Creates an empty store with the given configuration.
    ///
    /// The backward (incoming-edge) adjacency index is allocated only when
    /// `config.backward_edges` is set.
    #[must_use]
    pub fn with_config(config: LpgStoreConfig) -> Self {
        let backward_adj = if config.backward_edges {
            Some(ChunkedAdjacency::new())
        } else {
            None
        };

        Self {
            #[cfg(not(feature = "tiered-storage"))]
            nodes: RwLock::new(FxHashMap::default()),
            #[cfg(not(feature = "tiered-storage"))]
            edges: RwLock::new(FxHashMap::default()),
            #[cfg(feature = "tiered-storage")]
            arena_allocator: Arc::new(ArenaAllocator::new()),
            #[cfg(feature = "tiered-storage")]
            node_versions: RwLock::new(FxHashMap::default()),
            #[cfg(feature = "tiered-storage")]
            edge_versions: RwLock::new(FxHashMap::default()),
            #[cfg(feature = "tiered-storage")]
            epoch_store: Arc::new(EpochStore::new()),
            node_properties: PropertyStorage::new(),
            edge_properties: PropertyStorage::new(),
            label_to_id: RwLock::new(FxHashMap::default()),
            id_to_label: RwLock::new(Vec::new()),
            edge_type_to_id: RwLock::new(FxHashMap::default()),
            id_to_edge_type: RwLock::new(Vec::new()),
            forward_adj: ChunkedAdjacency::new(),
            backward_adj,
            label_index: RwLock::new(Vec::new()),
            node_labels: RwLock::new(FxHashMap::default()),
            property_indexes: RwLock::new(FxHashMap::default()),
            #[cfg(feature = "vector-index")]
            vector_indexes: RwLock::new(FxHashMap::default()),
            #[cfg(feature = "text-index")]
            text_indexes: RwLock::new(FxHashMap::default()),
            next_node_id: AtomicU64::new(0),
            next_edge_id: AtomicU64::new(0),
            current_epoch: AtomicU64::new(0),
            live_node_count: AtomicI64::new(0),
            live_edge_count: AtomicI64::new(0),
            edge_type_live_counts: RwLock::new(Vec::new()),
            statistics: RwLock::new(Arc::new(Statistics::new())),
            needs_stats_recompute: AtomicBool::new(false),
            named_graphs: RwLock::new(FxHashMap::default()),
            config,
        }
    }

    /// Returns the store's current epoch.
    #[must_use]
    pub fn current_epoch(&self) -> EpochId {
        EpochId::new(self.current_epoch.load(Ordering::Acquire))
    }

    /// Atomically allocates and returns a fresh epoch.
    #[doc(hidden)]
    pub fn new_epoch(&self) -> EpochId {
        // fetch_add returns the PREVIOUS value; +1 yields the epoch that
        // was just allocated to this caller.
        let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
        EpochId::new(id)
    }

    /// Advances the epoch counter to at least `epoch`.
    ///
    /// Uses `fetch_max`, so the counter is monotonic and never moves
    /// backwards even under concurrent callers.
    #[doc(hidden)]
    pub fn sync_epoch(&self, epoch: EpochId) {
        self.current_epoch
            .fetch_max(epoch.as_u64(), Ordering::AcqRel);
    }

    /// Resets the store to its freshly-constructed state: records, interning
    /// tables, indexes, properties, adjacency, counters, and statistics.
    pub fn clear(&self) {
        // Primary record storage; which fields exist depends on the backend.
        #[cfg(not(feature = "tiered-storage"))]
        {
            self.nodes.write().clear();
            self.edges.write().clear();
        }
        #[cfg(feature = "tiered-storage")]
        {
            self.node_versions.write().clear();
            self.edge_versions.write().clear();
            // NOTE(review): arena_allocator and epoch_store are NOT reset
            // here — confirm whether their contents should survive clear().
        }

        // Label interning tables (map and reverse vec cleared together).
        {
            self.label_to_id.write().clear();
            self.id_to_label.write().clear();
        }
        // Edge-type interning tables.
        {
            self.edge_type_to_id.write().clear();
            self.id_to_edge_type.write().clear();
        }

        // Secondary indexes.
        self.label_index.write().clear();
        self.node_labels.write().clear();
        self.property_indexes.write().clear();
        #[cfg(feature = "vector-index")]
        self.vector_indexes.write().clear();
        #[cfg(feature = "text-index")]
        self.text_indexes.write().clear();

        // Properties and adjacency.
        self.node_properties.clear();
        self.edge_properties.clear();
        self.forward_adj.clear();
        if let Some(ref backward) = self.backward_adj {
            backward.clear();
        }

        // Id allocation and epoch restart from zero.
        self.next_node_id.store(0, Ordering::Release);
        self.next_edge_id.store(0, Ordering::Release);
        self.current_epoch.store(0, Ordering::Release);

        // Live counters and cached statistics.
        self.live_node_count.store(0, Ordering::Release);
        self.live_edge_count.store(0, Ordering::Release);
        self.edge_type_live_counts.write().clear();
        *self.statistics.write() = Arc::new(Statistics::new());
        self.needs_stats_recompute.store(false, Ordering::Release);
        // NOTE(review): named_graphs is left untouched, so nested graphs
        // survive clear() — confirm this is the intended contract.
    }

    /// Returns `true` when this store maintains a backward adjacency index.
    #[must_use]
    pub fn has_backward_adjacency(&self) -> bool {
        self.backward_adj.is_some()
    }

    /// Looks up a named subgraph, if it exists.
    #[must_use]
    pub fn graph(&self, name: &str) -> Option<Arc<LpgStore>> {
        self.named_graphs.read().get(name).cloned()
    }

    /// Returns the named subgraph, creating a fresh default-configured one
    /// if it does not exist yet.
    pub fn graph_or_create(&self, name: &str) -> Arc<LpgStore> {
        // Fast path under the read lock.
        {
            let graphs = self.named_graphs.read();
            if let Some(g) = graphs.get(name) {
                return Arc::clone(g);
            }
        }
        // Slow path: the entry API re-checks under the write lock, so a
        // racing creator between the two locks is handled correctly.
        let mut graphs = self.named_graphs.write();
        graphs
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(LpgStore::new()))
            .clone()
    }

    /// Creates an empty named subgraph; returns `false` if the name is taken.
    pub fn create_graph(&self, name: &str) -> bool {
        let mut graphs = self.named_graphs.write();
        if graphs.contains_key(name) {
            return false;
        }
        graphs.insert(name.to_string(), Arc::new(LpgStore::new()));
        true
    }

    /// Removes a named subgraph; returns `false` if it did not exist.
    /// Outstanding `Arc` handles keep the dropped graph's data alive.
    pub fn drop_graph(&self, name: &str) -> bool {
        self.named_graphs.write().remove(name).is_some()
    }

    /// Returns the names of all nested subgraphs (unordered).
    #[must_use]
    pub fn graph_names(&self) -> Vec<String> {
        self.named_graphs.read().keys().cloned().collect()
    }

    /// Returns the number of nested subgraphs.
    #[must_use]
    pub fn graph_count(&self) -> usize {
        self.named_graphs.read().len()
    }

    /// Clears the named subgraph, or this (default) store when `name` is
    /// `None`. Unknown names are silently ignored.
    pub fn clear_graph(&self, name: Option<&str>) {
        match name {
            Some(n) => {
                if let Some(g) = self.named_graphs.read().get(n) {
                    g.clear();
                }
            }
            None => self.clear(),
        }
    }

    /// Copies one graph into another.
    ///
    /// NOTE(review): this is currently a stub — it only resolves the source
    /// and ensures the destination exists; no nodes/edges are copied.
    /// `source`/`dest` of `None` refer to the default graph, which is not
    /// materialized here either.
    pub fn copy_graph(&self, source: Option<&str>, dest: Option<&str>) {
        let _src = match source {
            Some(n) => self.graph(n),
            None => None,
        };
        let _dest_graph = match dest {
            Some(n) => Some(self.graph_or_create(n)),
            None => None,
        };
    }

    /// Interns `label`, returning its dense `u32` id.
    pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
        // Fast path: already interned, read lock only.
        {
            let label_to_id = self.label_to_id.read();
            if let Some(&id) = label_to_id.get(label) {
                return id;
            }
        }

        // Slow path: take both write locks (map before vec), then re-check —
        // another writer may have interned the label between the read lock
        // release and the write lock acquisition.
        let mut label_to_id = self.label_to_id.write();
        let mut id_to_label = self.id_to_label.write();

        if let Some(&id) = label_to_id.get(label) {
            return id;
        }

        // New id is the current vec length, keeping ids dense and
        // usable as direct indexes into id_to_label.
        let id = id_to_label.len() as u32;

        let label: ArcStr = label.into();
        label_to_id.insert(label.clone(), id);
        id_to_label.push(label);

        id
    }

    /// Interns `edge_type`, returning its dense `u32` id, and ensures the
    /// per-type live-count vector covers the new id.
    pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
        // Fast path: already interned.
        {
            let type_to_id = self.edge_type_to_id.read();
            if let Some(&id) = type_to_id.get(edge_type) {
                return id;
            }
        }

        // Slow path with double-check, mirroring get_or_create_label_id.
        let mut type_to_id = self.edge_type_to_id.write();
        let mut id_to_type = self.id_to_edge_type.write();

        if let Some(&id) = type_to_id.get(edge_type) {
            return id;
        }

        let id = id_to_type.len() as u32;
        let edge_type: ArcStr = edge_type.into();
        type_to_id.insert(edge_type.clone(), id);
        id_to_type.push(edge_type);

        // Extend the live-count vector so counts[id] is always indexable.
        let mut counts = self.edge_type_live_counts.write();
        while counts.len() <= id as usize {
            counts.push(0);
        }

        id
    }

    /// Increments the live-edge count for `type_id`, growing the vector
    /// on demand so unseen ids are handled.
    pub(super) fn increment_edge_type_count(&self, type_id: u32) {
        let mut counts = self.edge_type_live_counts.write();
        if counts.len() <= type_id as usize {
            counts.resize(type_id as usize + 1, 0);
        }
        counts[type_id as usize] += 1;
    }

    /// Decrements the live-edge count for `type_id`.
    /// Out-of-range ids are silently ignored (nothing to decrement).
    pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
        let mut counts = self.edge_type_live_counts.write();
        if type_id < counts.len() as u32 {
            counts[type_id as usize] -= 1;
        }
    }
}
627
impl Default for LpgStore {
    /// Equivalent to [`LpgStore::new`].
    fn default() -> Self {
        Self::new()
    }
}