1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35 match (a, b) {
36 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40 _ => None,
41 }
42}
43
44fn value_in_range(
46 value: &Value,
47 min: Option<&Value>,
48 max: Option<&Value>,
49 min_inclusive: bool,
50 max_inclusive: bool,
51) -> bool {
52 if let Some(min_val) = min {
54 match compare_values_for_range(value, min_val) {
55 Some(CmpOrdering::Less) => return false,
56 Some(CmpOrdering::Equal) if !min_inclusive => return false,
57 None => return false, _ => {}
59 }
60 }
61
62 if let Some(max_val) = max {
64 match compare_values_for_range(value, max_val) {
65 Some(CmpOrdering::Greater) => return false,
66 Some(CmpOrdering::Equal) if !max_inclusive => return false,
67 None => return false,
68 _ => {}
69 }
70 }
71
72 true
73}
74
75#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
/// Construction-time options for an [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, [`LpgStore::with_config`] also allocates a backward
    /// adjacency structure; when `false` the store keeps forward adjacency only.
    pub backward_edges: bool,
    /// Initial node capacity hint.
    /// NOTE(review): not read by the constructor visible in this part of the
    /// file — confirm where (or whether) it is applied.
    pub initial_node_capacity: usize,
    /// Initial edge capacity hint.
    /// NOTE(review): not read by the constructor visible in this part of the
    /// file — confirm where (or whether) it is applied.
    pub initial_edge_capacity: usize,
}
98
99impl Default for LpgStoreConfig {
100 fn default() -> Self {
101 Self {
102 backward_edges: true,
103 initial_node_capacity: 1024,
104 initial_edge_capacity: 4096,
105 }
106 }
107}
108
/// In-memory labeled property graph store.
///
/// Node and edge records are versioned per epoch. Without the
/// `tiered-storage` feature they live in version chains; with it, records are
/// placed in per-epoch arenas and located through version indexes. Labels,
/// properties, adjacency and secondary indexes are kept in separate
/// structures, each behind its own lock or concurrent container.
pub struct LpgStore {
    /// Configuration the store was created with.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node id -> version chain of node records (non-tiered builds).
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge id -> version chain of edge records (non-tiered builds).
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Provides the per-epoch arenas that newly created records are written into.
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node id -> index of that node's version references.
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge id -> index of that edge's version references.
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Source of records addressed by cold version references.
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property values keyed by node id.
    node_properties: PropertyStorage<NodeId>,

    /// Property values keyed by edge id.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name -> numeric label id.
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label names, looked up by numeric label id used as the position.
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name -> numeric type id.
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type names; presumably positioned by numeric type id like
    /// `id_to_label` — confirm against the accessor outside this view.
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Adjacency keyed by source node; each new edge is recorded as (src, dst, edge id).
    forward_adj: ChunkedAdjacency,

    /// Adjacency keyed by destination node; present only when the store was
    /// configured with backward edges.
    backward_adj: Option<ChunkedAdjacency>,

    /// Per-label node membership: the entry at position `label_id` holds the
    /// ids of nodes carrying that label (the map is used as a set).
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node id -> set of label ids attached to that node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Secondary indexes: property key -> (property value -> ids of nodes
    /// holding that value). A key is present only after an index was created for it.
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes keyed by the string `"{label}:{property}"`.
    #[cfg(feature = "vector-index")]
    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Next node id to hand out; incremented on every node creation.
    next_node_id: AtomicU64,

    /// Next edge id to hand out; incremented on every edge creation.
    next_edge_id: AtomicU64,

    /// Raw value of the current epoch.
    current_epoch: AtomicU64,

    /// Statistics about the graph.
    /// NOTE(review): the code that fills these is outside this view.
    statistics: RwLock<Statistics>,

    /// Set to `true` by mutating entry points such as `create_node`,
    /// `delete_node` and `create_edge` to signal that `statistics` is stale.
    needs_stats_recompute: AtomicBool,
}
285
286impl LpgStore {
287 #[must_use]
289 pub fn new() -> Self {
290 Self::with_config(LpgStoreConfig::default())
291 }
292
293 #[must_use]
295 pub fn with_config(config: LpgStoreConfig) -> Self {
296 let backward_adj = if config.backward_edges {
297 Some(ChunkedAdjacency::new())
298 } else {
299 None
300 };
301
302 Self {
303 #[cfg(not(feature = "tiered-storage"))]
304 nodes: RwLock::new(FxHashMap::default()),
305 #[cfg(not(feature = "tiered-storage"))]
306 edges: RwLock::new(FxHashMap::default()),
307 #[cfg(feature = "tiered-storage")]
308 arena_allocator: Arc::new(ArenaAllocator::new()),
309 #[cfg(feature = "tiered-storage")]
310 node_versions: RwLock::new(FxHashMap::default()),
311 #[cfg(feature = "tiered-storage")]
312 edge_versions: RwLock::new(FxHashMap::default()),
313 #[cfg(feature = "tiered-storage")]
314 epoch_store: Arc::new(EpochStore::new()),
315 node_properties: PropertyStorage::new(),
316 edge_properties: PropertyStorage::new(),
317 label_to_id: RwLock::new(FxHashMap::default()),
318 id_to_label: RwLock::new(Vec::new()),
319 edge_type_to_id: RwLock::new(FxHashMap::default()),
320 id_to_edge_type: RwLock::new(Vec::new()),
321 forward_adj: ChunkedAdjacency::new(),
322 backward_adj,
323 label_index: RwLock::new(Vec::new()),
324 node_labels: RwLock::new(FxHashMap::default()),
325 property_indexes: RwLock::new(FxHashMap::default()),
326 #[cfg(feature = "vector-index")]
327 vector_indexes: RwLock::new(FxHashMap::default()),
328 next_node_id: AtomicU64::new(0),
329 next_edge_id: AtomicU64::new(0),
330 current_epoch: AtomicU64::new(0),
331 statistics: RwLock::new(Statistics::new()),
332 needs_stats_recompute: AtomicBool::new(true),
333 config,
334 }
335 }
336
337 #[must_use]
339 pub fn current_epoch(&self) -> EpochId {
340 EpochId::new(self.current_epoch.load(Ordering::Acquire))
341 }
342
343 pub fn new_epoch(&self) -> EpochId {
345 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346 EpochId::new(id)
347 }
348
349 pub fn create_node(&self, labels: &[&str]) -> NodeId {
355 self.needs_stats_recompute.store(true, Ordering::Relaxed);
356 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357 }
358
359 #[cfg(not(feature = "tiered-storage"))]
361 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364 let mut record = NodeRecord::new(id, epoch);
365 record.set_label_count(labels.len() as u16);
366
367 let mut node_label_set = FxHashSet::default();
369 for label in labels {
370 let label_id = self.get_or_create_label_id(*label);
371 node_label_set.insert(label_id);
372
373 let mut index = self.label_index.write();
375 while index.len() <= label_id as usize {
376 index.push(FxHashMap::default());
377 }
378 index[label_id as usize].insert(id, ());
379 }
380
381 self.node_labels.write().insert(id, node_label_set);
383
384 let chain = VersionChain::with_initial(record, epoch, tx_id);
386 self.nodes.write().insert(id, chain);
387 id
388 }
389
390 #[cfg(feature = "tiered-storage")]
393 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396 let mut record = NodeRecord::new(id, epoch);
397 record.set_label_count(labels.len() as u16);
398
399 let mut node_label_set = FxHashSet::default();
401 for label in labels {
402 let label_id = self.get_or_create_label_id(*label);
403 node_label_set.insert(label_id);
404
405 let mut index = self.label_index.write();
407 while index.len() <= label_id as usize {
408 index.push(FxHashMap::default());
409 }
410 index[label_id as usize].insert(id, ());
411 }
412
413 self.node_labels.write().insert(id, node_label_set);
415
416 let arena = self.arena_allocator.arena_or_create(epoch);
418 let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423 let mut versions = self.node_versions.write();
425 if let Some(index) = versions.get_mut(&id) {
426 index.add_hot(hot_ref);
427 } else {
428 versions.insert(id, VersionIndex::with_initial(hot_ref));
429 }
430
431 id
432 }
433
434 pub fn create_node_with_props(
436 &self,
437 labels: &[&str],
438 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439 ) -> NodeId {
440 self.create_node_with_props_versioned(
441 labels,
442 properties,
443 self.current_epoch(),
444 TxId::SYSTEM,
445 )
446 }
447
448 #[cfg(not(feature = "tiered-storage"))]
450 pub fn create_node_with_props_versioned(
451 &self,
452 labels: &[&str],
453 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454 epoch: EpochId,
455 tx_id: TxId,
456 ) -> NodeId {
457 let id = self.create_node_versioned(labels, epoch, tx_id);
458
459 for (key, value) in properties {
460 let prop_key: PropertyKey = key.into();
461 let prop_value: Value = value.into();
462 self.update_property_index_on_set(id, &prop_key, &prop_value);
464 self.node_properties.set(id, prop_key, prop_value);
465 }
466
467 let count = self.node_properties.get_all(id).len() as u16;
469 if let Some(chain) = self.nodes.write().get_mut(&id) {
470 if let Some(record) = chain.latest_mut() {
471 record.props_count = count;
472 }
473 }
474
475 id
476 }
477
478 #[cfg(feature = "tiered-storage")]
481 pub fn create_node_with_props_versioned(
482 &self,
483 labels: &[&str],
484 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485 epoch: EpochId,
486 tx_id: TxId,
487 ) -> NodeId {
488 let id = self.create_node_versioned(labels, epoch, tx_id);
489
490 for (key, value) in properties {
491 let prop_key: PropertyKey = key.into();
492 let prop_value: Value = value.into();
493 self.update_property_index_on_set(id, &prop_key, &prop_value);
495 self.node_properties.set(id, prop_key, prop_value);
496 }
497
498 id
502 }
503
504 #[must_use]
506 pub fn get_node(&self, id: NodeId) -> Option<Node> {
507 self.get_node_at_epoch(id, self.current_epoch())
508 }
509
510 #[must_use]
512 #[cfg(not(feature = "tiered-storage"))]
513 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514 let nodes = self.nodes.read();
515 let chain = nodes.get(&id)?;
516 let record = chain.visible_at(epoch)?;
517
518 if record.is_deleted() {
519 return None;
520 }
521
522 let mut node = Node::new(id);
523
524 let id_to_label = self.id_to_label.read();
526 let node_labels = self.node_labels.read();
527 if let Some(label_ids) = node_labels.get(&id) {
528 for &label_id in label_ids {
529 if let Some(label) = id_to_label.get(label_id as usize) {
530 node.labels.push(label.clone());
531 }
532 }
533 }
534
535 node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538 Some(node)
539 }
540
541 #[must_use]
544 #[cfg(feature = "tiered-storage")]
545 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546 let versions = self.node_versions.read();
547 let index = versions.get(&id)?;
548 let version_ref = index.visible_at(epoch)?;
549
550 let record = self.read_node_record(&version_ref)?;
552
553 if record.is_deleted() {
554 return None;
555 }
556
557 let mut node = Node::new(id);
558
559 let id_to_label = self.id_to_label.read();
561 let node_labels = self.node_labels.read();
562 if let Some(label_ids) = node_labels.get(&id) {
563 for &label_id in label_ids {
564 if let Some(label) = id_to_label.get(label_id as usize) {
565 node.labels.push(label.clone());
566 }
567 }
568 }
569
570 node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573 Some(node)
574 }
575
576 #[must_use]
578 #[cfg(not(feature = "tiered-storage"))]
579 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580 let nodes = self.nodes.read();
581 let chain = nodes.get(&id)?;
582 let record = chain.visible_to(epoch, tx_id)?;
583
584 if record.is_deleted() {
585 return None;
586 }
587
588 let mut node = Node::new(id);
589
590 let id_to_label = self.id_to_label.read();
592 let node_labels = self.node_labels.read();
593 if let Some(label_ids) = node_labels.get(&id) {
594 for &label_id in label_ids {
595 if let Some(label) = id_to_label.get(label_id as usize) {
596 node.labels.push(label.clone());
597 }
598 }
599 }
600
601 node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604 Some(node)
605 }
606
607 #[must_use]
610 #[cfg(feature = "tiered-storage")]
611 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612 let versions = self.node_versions.read();
613 let index = versions.get(&id)?;
614 let version_ref = index.visible_to(epoch, tx_id)?;
615
616 let record = self.read_node_record(&version_ref)?;
618
619 if record.is_deleted() {
620 return None;
621 }
622
623 let mut node = Node::new(id);
624
625 let id_to_label = self.id_to_label.read();
627 let node_labels = self.node_labels.read();
628 if let Some(label_ids) = node_labels.get(&id) {
629 for &label_id in label_ids {
630 if let Some(label) = id_to_label.get(label_id as usize) {
631 node.labels.push(label.clone());
632 }
633 }
634 }
635
636 node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639 Some(node)
640 }
641
642 #[cfg(feature = "tiered-storage")]
644 #[allow(unsafe_code)]
645 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646 match version_ref {
647 VersionRef::Hot(hot_ref) => {
648 let arena = self.arena_allocator.arena(hot_ref.epoch);
649 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651 Some(*record)
652 }
653 VersionRef::Cold(cold_ref) => {
654 self.epoch_store
656 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657 }
658 }
659 }
660
661 pub fn delete_node(&self, id: NodeId) -> bool {
663 self.needs_stats_recompute.store(true, Ordering::Relaxed);
664 self.delete_node_at_epoch(id, self.current_epoch())
665 }
666
667 #[cfg(not(feature = "tiered-storage"))]
669 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670 let mut nodes = self.nodes.write();
671 if let Some(chain) = nodes.get_mut(&id) {
672 if let Some(record) = chain.visible_at(epoch) {
674 if record.is_deleted() {
675 return false;
676 }
677 } else {
678 return false;
680 }
681
682 chain.mark_deleted(epoch);
684
685 let mut index = self.label_index.write();
687 let mut node_labels = self.node_labels.write();
688 if let Some(label_ids) = node_labels.remove(&id) {
689 for label_id in label_ids {
690 if let Some(set) = index.get_mut(label_id as usize) {
691 set.remove(&id);
692 }
693 }
694 }
695
696 drop(nodes); drop(index);
699 drop(node_labels);
700 self.node_properties.remove_all(id);
701
702 true
705 } else {
706 false
707 }
708 }
709
710 #[cfg(feature = "tiered-storage")]
713 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714 let mut versions = self.node_versions.write();
715 if let Some(index) = versions.get_mut(&id) {
716 if let Some(version_ref) = index.visible_at(epoch) {
718 if let Some(record) = self.read_node_record(&version_ref) {
719 if record.is_deleted() {
720 return false;
721 }
722 } else {
723 return false;
724 }
725 } else {
726 return false;
727 }
728
729 index.mark_deleted(epoch);
731
732 let mut label_index = self.label_index.write();
734 let mut node_labels = self.node_labels.write();
735 if let Some(label_ids) = node_labels.remove(&id) {
736 for label_id in label_ids {
737 if let Some(set) = label_index.get_mut(label_id as usize) {
738 set.remove(&id);
739 }
740 }
741 }
742
743 drop(versions);
745 drop(label_index);
746 drop(node_labels);
747 self.node_properties.remove_all(id);
748
749 true
750 } else {
751 false
752 }
753 }
754
755 #[cfg(not(feature = "tiered-storage"))]
760 pub fn delete_node_edges(&self, node_id: NodeId) {
761 let outgoing: Vec<EdgeId> = self
763 .forward_adj
764 .edges_from(node_id)
765 .into_iter()
766 .map(|(_, edge_id)| edge_id)
767 .collect();
768
769 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771 backward
772 .edges_from(node_id)
773 .into_iter()
774 .map(|(_, edge_id)| edge_id)
775 .collect()
776 } else {
777 let epoch = self.current_epoch();
779 self.edges
780 .read()
781 .iter()
782 .filter_map(|(id, chain)| {
783 chain.visible_at(epoch).and_then(|r| {
784 if !r.is_deleted() && r.dst == node_id {
785 Some(*id)
786 } else {
787 None
788 }
789 })
790 })
791 .collect()
792 };
793
794 for edge_id in outgoing.into_iter().chain(incoming) {
796 self.delete_edge(edge_id);
797 }
798 }
799
800 #[cfg(feature = "tiered-storage")]
803 pub fn delete_node_edges(&self, node_id: NodeId) {
804 let outgoing: Vec<EdgeId> = self
806 .forward_adj
807 .edges_from(node_id)
808 .into_iter()
809 .map(|(_, edge_id)| edge_id)
810 .collect();
811
812 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814 backward
815 .edges_from(node_id)
816 .into_iter()
817 .map(|(_, edge_id)| edge_id)
818 .collect()
819 } else {
820 let epoch = self.current_epoch();
822 let versions = self.edge_versions.read();
823 versions
824 .iter()
825 .filter_map(|(id, index)| {
826 index.visible_at(epoch).and_then(|vref| {
827 self.read_edge_record(&vref).and_then(|r| {
828 if !r.is_deleted() && r.dst == node_id {
829 Some(*id)
830 } else {
831 None
832 }
833 })
834 })
835 })
836 .collect()
837 };
838
839 for edge_id in outgoing.into_iter().chain(incoming) {
841 self.delete_edge(edge_id);
842 }
843 }
844
845 #[cfg(not(feature = "tiered-storage"))]
847 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848 let prop_key: PropertyKey = key.into();
849
850 self.update_property_index_on_set(id, &prop_key, &value);
852
853 self.node_properties.set(id, prop_key, value);
854
855 let count = self.node_properties.get_all(id).len() as u16;
857 if let Some(chain) = self.nodes.write().get_mut(&id) {
858 if let Some(record) = chain.latest_mut() {
859 record.props_count = count;
860 }
861 }
862 }
863
864 #[cfg(feature = "tiered-storage")]
867 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868 let prop_key: PropertyKey = key.into();
869
870 self.update_property_index_on_set(id, &prop_key, &value);
872
873 self.node_properties.set(id, prop_key, value);
874 }
878
879 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881 self.edge_properties.set(id, key.into(), value);
882 }
883
884 #[cfg(not(feature = "tiered-storage"))]
888 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889 let prop_key: PropertyKey = key.into();
890
891 self.update_property_index_on_remove(id, &prop_key);
893
894 let result = self.node_properties.remove(id, &prop_key);
895
896 let count = self.node_properties.get_all(id).len() as u16;
898 if let Some(chain) = self.nodes.write().get_mut(&id) {
899 if let Some(record) = chain.latest_mut() {
900 record.props_count = count;
901 }
902 }
903
904 result
905 }
906
907 #[cfg(feature = "tiered-storage")]
910 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911 let prop_key: PropertyKey = key.into();
912
913 self.update_property_index_on_remove(id, &prop_key);
915
916 self.node_properties.remove(id, &prop_key)
917 }
919
920 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924 self.edge_properties.remove(id, &key.into())
925 }
926
927 #[must_use]
942 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943 self.node_properties.get(id, key)
944 }
945
946 #[must_use]
950 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951 self.edge_properties.get(id, key)
952 }
953
954 #[must_use]
977 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978 self.node_properties.get_batch(ids, key)
979 }
980
981 #[must_use]
986 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987 self.node_properties.get_all_batch(ids)
988 }
989
990 #[must_use]
1018 pub fn get_nodes_properties_selective_batch(
1019 &self,
1020 ids: &[NodeId],
1021 keys: &[PropertyKey],
1022 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023 self.node_properties.get_selective_batch(ids, keys)
1024 }
1025
1026 #[must_use]
1030 pub fn get_edges_properties_selective_batch(
1031 &self,
1032 ids: &[EdgeId],
1033 keys: &[PropertyKey],
1034 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035 self.edge_properties.get_selective_batch(ids, keys)
1036 }
1037
1038 #[must_use]
1074 pub fn find_nodes_in_range(
1075 &self,
1076 property: &str,
1077 min: Option<&Value>,
1078 max: Option<&Value>,
1079 min_inclusive: bool,
1080 max_inclusive: bool,
1081 ) -> Vec<NodeId> {
1082 let key = PropertyKey::new(property);
1083
1084 if !self
1086 .node_properties
1087 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088 {
1089 return Vec::new();
1090 }
1091
1092 self.node_ids()
1094 .into_iter()
1095 .filter(|&node_id| {
1096 self.node_properties
1097 .get(node_id, &key)
1098 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099 })
1100 .collect()
1101 }
1102
1103 #[must_use]
1128 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129 if conditions.is_empty() {
1130 return self.node_ids();
1131 }
1132
1133 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136 let indexes = self.property_indexes.read();
1137
1138 for (i, (prop, value)) in conditions.iter().enumerate() {
1139 let key = PropertyKey::new(*prop);
1140 let hv = HashableValue::new(value.clone());
1141
1142 if let Some(index) = indexes.get(&key) {
1143 let matches: Vec<NodeId> = index
1144 .get(&hv)
1145 .map(|nodes| nodes.iter().copied().collect())
1146 .unwrap_or_default();
1147
1148 if matches.is_empty() {
1150 return Vec::new();
1151 }
1152
1153 if best_start
1155 .as_ref()
1156 .is_none_or(|(_, best)| matches.len() < best.len())
1157 {
1158 best_start = Some((i, matches));
1159 }
1160 }
1161 }
1162 drop(indexes);
1163
1164 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166 let (prop, value) = &conditions[0];
1168 (0, self.find_nodes_by_property(prop, value))
1169 });
1170
1171 for (i, (prop, value)) in conditions.iter().enumerate() {
1173 if i == start_idx {
1174 continue;
1175 }
1176
1177 let key = PropertyKey::new(*prop);
1178 candidates.retain(|&node_id| {
1179 self.node_properties
1180 .get(node_id, &key)
1181 .is_some_and(|v| v == *value)
1182 });
1183
1184 if candidates.is_empty() {
1186 return Vec::new();
1187 }
1188 }
1189
1190 candidates
1191 }
1192
1193 pub fn create_property_index(&self, property: &str) {
1221 let key = PropertyKey::new(property);
1222
1223 let mut indexes = self.property_indexes.write();
1224 if indexes.contains_key(&key) {
1225 return; }
1227
1228 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231 for node_id in self.node_ids() {
1233 if let Some(value) = self.node_properties.get(node_id, &key) {
1234 let hv = HashableValue::new(value);
1235 index.entry(hv).or_default().insert(node_id);
1236 }
1237 }
1238
1239 indexes.insert(key, index);
1240 }
1241
1242 pub fn drop_property_index(&self, property: &str) -> bool {
1246 let key = PropertyKey::new(property);
1247 self.property_indexes.write().remove(&key).is_some()
1248 }
1249
1250 #[must_use]
1252 pub fn has_property_index(&self, property: &str) -> bool {
1253 let key = PropertyKey::new(property);
1254 self.property_indexes.read().contains_key(&key)
1255 }
1256
1257 #[cfg(feature = "vector-index")]
1259 pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1260 let key = format!("{label}:{property}");
1261 self.vector_indexes.write().insert(key, index);
1262 }
1263
1264 #[cfg(feature = "vector-index")]
1266 #[must_use]
1267 pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1268 let key = format!("{label}:{property}");
1269 self.vector_indexes.read().get(&key).cloned()
1270 }
1271
1272 #[must_use]
1295 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1296 let key = PropertyKey::new(property);
1297 let hv = HashableValue::new(value.clone());
1298
1299 let indexes = self.property_indexes.read();
1301 if let Some(index) = indexes.get(&key) {
1302 if let Some(nodes) = index.get(&hv) {
1303 return nodes.iter().copied().collect();
1304 }
1305 return Vec::new();
1306 }
1307 drop(indexes);
1308
1309 self.node_ids()
1311 .into_iter()
1312 .filter(|&node_id| {
1313 self.node_properties
1314 .get(node_id, &key)
1315 .is_some_and(|v| v == *value)
1316 })
1317 .collect()
1318 }
1319
1320 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1322 let indexes = self.property_indexes.read();
1323 if let Some(index) = indexes.get(key) {
1324 if let Some(old_value) = self.node_properties.get(node_id, key) {
1326 let old_hv = HashableValue::new(old_value);
1327 if let Some(mut nodes) = index.get_mut(&old_hv) {
1328 nodes.remove(&node_id);
1329 if nodes.is_empty() {
1330 drop(nodes);
1331 index.remove(&old_hv);
1332 }
1333 }
1334 }
1335
1336 let new_hv = HashableValue::new(new_value.clone());
1338 index
1339 .entry(new_hv)
1340 .or_insert_with(FxHashSet::default)
1341 .insert(node_id);
1342 }
1343 }
1344
1345 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1347 let indexes = self.property_indexes.read();
1348 if let Some(index) = indexes.get(key) {
1349 if let Some(old_value) = self.node_properties.get(node_id, key) {
1351 let old_hv = HashableValue::new(old_value);
1352 if let Some(mut nodes) = index.get_mut(&old_hv) {
1353 nodes.remove(&node_id);
1354 if nodes.is_empty() {
1355 drop(nodes);
1356 index.remove(&old_hv);
1357 }
1358 }
1359 }
1360 }
1361 }
1362
1363 #[cfg(not(feature = "tiered-storage"))]
1368 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1369 let epoch = self.current_epoch();
1370
1371 let nodes = self.nodes.read();
1373 if let Some(chain) = nodes.get(&node_id) {
1374 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1375 return false;
1376 }
1377 } else {
1378 return false;
1379 }
1380 drop(nodes);
1381
1382 let label_id = self.get_or_create_label_id(label);
1384
1385 let mut node_labels = self.node_labels.write();
1387 let label_set = node_labels
1388 .entry(node_id)
1389 .or_insert_with(FxHashSet::default);
1390
1391 if label_set.contains(&label_id) {
1392 return false; }
1394
1395 label_set.insert(label_id);
1396 drop(node_labels);
1397
1398 let mut index = self.label_index.write();
1400 if (label_id as usize) >= index.len() {
1401 index.resize(label_id as usize + 1, FxHashMap::default());
1402 }
1403 index[label_id as usize].insert(node_id, ());
1404
1405 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1407 if let Some(record) = chain.latest_mut() {
1408 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1409 record.set_label_count(count as u16);
1410 }
1411 }
1412
1413 true
1414 }
1415
1416 #[cfg(feature = "tiered-storage")]
1419 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1420 let epoch = self.current_epoch();
1421
1422 let versions = self.node_versions.read();
1424 if let Some(index) = versions.get(&node_id) {
1425 if let Some(vref) = index.visible_at(epoch) {
1426 if let Some(record) = self.read_node_record(&vref) {
1427 if record.is_deleted() {
1428 return false;
1429 }
1430 } else {
1431 return false;
1432 }
1433 } else {
1434 return false;
1435 }
1436 } else {
1437 return false;
1438 }
1439 drop(versions);
1440
1441 let label_id = self.get_or_create_label_id(label);
1443
1444 let mut node_labels = self.node_labels.write();
1446 let label_set = node_labels.entry(node_id).or_default();
1447
1448 if label_set.contains(&label_id) {
1449 return false; }
1451
1452 label_set.insert(label_id);
1453 drop(node_labels);
1454
1455 let mut index = self.label_index.write();
1457 if (label_id as usize) >= index.len() {
1458 index.resize(label_id as usize + 1, FxHashMap::default());
1459 }
1460 index[label_id as usize].insert(node_id, ());
1461
1462 true
1466 }
1467
1468 #[cfg(not(feature = "tiered-storage"))]
1473 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1474 let epoch = self.current_epoch();
1475
1476 let nodes = self.nodes.read();
1478 if let Some(chain) = nodes.get(&node_id) {
1479 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1480 return false;
1481 }
1482 } else {
1483 return false;
1484 }
1485 drop(nodes);
1486
1487 let label_id = {
1489 let label_ids = self.label_to_id.read();
1490 match label_ids.get(label) {
1491 Some(&id) => id,
1492 None => return false, }
1494 };
1495
1496 let mut node_labels = self.node_labels.write();
1498 if let Some(label_set) = node_labels.get_mut(&node_id) {
1499 if !label_set.remove(&label_id) {
1500 return false; }
1502 } else {
1503 return false;
1504 }
1505 drop(node_labels);
1506
1507 let mut index = self.label_index.write();
1509 if (label_id as usize) < index.len() {
1510 index[label_id as usize].remove(&node_id);
1511 }
1512
1513 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1515 if let Some(record) = chain.latest_mut() {
1516 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1517 record.set_label_count(count as u16);
1518 }
1519 }
1520
1521 true
1522 }
1523
1524 #[cfg(feature = "tiered-storage")]
1527 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1528 let epoch = self.current_epoch();
1529
1530 let versions = self.node_versions.read();
1532 if let Some(index) = versions.get(&node_id) {
1533 if let Some(vref) = index.visible_at(epoch) {
1534 if let Some(record) = self.read_node_record(&vref) {
1535 if record.is_deleted() {
1536 return false;
1537 }
1538 } else {
1539 return false;
1540 }
1541 } else {
1542 return false;
1543 }
1544 } else {
1545 return false;
1546 }
1547 drop(versions);
1548
1549 let label_id = {
1551 let label_ids = self.label_to_id.read();
1552 match label_ids.get(label) {
1553 Some(&id) => id,
1554 None => return false, }
1556 };
1557
1558 let mut node_labels = self.node_labels.write();
1560 if let Some(label_set) = node_labels.get_mut(&node_id) {
1561 if !label_set.remove(&label_id) {
1562 return false; }
1564 } else {
1565 return false;
1566 }
1567 drop(node_labels);
1568
1569 let mut index = self.label_index.write();
1571 if (label_id as usize) < index.len() {
1572 index[label_id as usize].remove(&node_id);
1573 }
1574
1575 true
1578 }
1579
1580 #[must_use]
1582 #[cfg(not(feature = "tiered-storage"))]
1583 pub fn node_count(&self) -> usize {
1584 let epoch = self.current_epoch();
1585 self.nodes
1586 .read()
1587 .values()
1588 .filter_map(|chain| chain.visible_at(epoch))
1589 .filter(|r| !r.is_deleted())
1590 .count()
1591 }
1592
1593 #[must_use]
1596 #[cfg(feature = "tiered-storage")]
1597 pub fn node_count(&self) -> usize {
1598 let epoch = self.current_epoch();
1599 let versions = self.node_versions.read();
1600 versions
1601 .iter()
1602 .filter(|(_, index)| {
1603 index.visible_at(epoch).map_or(false, |vref| {
1604 self.read_node_record(&vref)
1605 .map_or(false, |r| !r.is_deleted())
1606 })
1607 })
1608 .count()
1609 }
1610
1611 #[must_use]
1617 #[cfg(not(feature = "tiered-storage"))]
1618 pub fn node_ids(&self) -> Vec<NodeId> {
1619 let epoch = self.current_epoch();
1620 let mut ids: Vec<NodeId> = self
1621 .nodes
1622 .read()
1623 .iter()
1624 .filter_map(|(id, chain)| {
1625 chain
1626 .visible_at(epoch)
1627 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1628 })
1629 .collect();
1630 ids.sort_unstable();
1631 ids
1632 }
1633
/// Returns the IDs of all live nodes at the current epoch, in ascending
/// order (tiered storage).
///
/// A node is included when its visible version reference resolves to a
/// record that is not flagged as deleted.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn node_ids(&self) -> Vec<NodeId> {
    let epoch = self.current_epoch();
    let versions = self.node_versions.read();

    let mut live: Vec<NodeId> = Vec::new();
    for (node_id, index) in versions.iter() {
        let record = index
            .visible_at(epoch)
            .and_then(|vref| self.read_node_record(&vref));
        if record.is_some_and(|r| !r.is_deleted()) {
            live.push(*node_id);
        }
    }

    live.sort_unstable();
    live
}
1653
1654 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1658 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1659 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1660 }
1661
1662 #[cfg(not(feature = "tiered-storage"))]
1664 pub fn create_edge_versioned(
1665 &self,
1666 src: NodeId,
1667 dst: NodeId,
1668 edge_type: &str,
1669 epoch: EpochId,
1670 tx_id: TxId,
1671 ) -> EdgeId {
1672 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1673 let type_id = self.get_or_create_edge_type_id(edge_type);
1674
1675 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1676 let chain = VersionChain::with_initial(record, epoch, tx_id);
1677 self.edges.write().insert(id, chain);
1678
1679 self.forward_adj.add_edge(src, dst, id);
1681 if let Some(ref backward) = self.backward_adj {
1682 backward.add_edge(dst, src, id);
1683 }
1684
1685 id
1686 }
1687
/// Creates an edge with explicit versioning information and returns its ID
/// (tiered storage).
///
/// The record is allocated in the arena for `epoch`, and a hot version
/// reference to it is added to the edge's version index (creating the index
/// if none exists). Adjacency indexes are updated while the
/// `edge_versions` write lock is still held.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_versioned(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
    epoch: EpochId,
    tx_id: TxId,
) -> EdgeId {
    let raw_id = self.next_edge_id.fetch_add(1, Ordering::Relaxed);
    let id = EdgeId::new(raw_id);
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let new_record = EdgeRecord::new(id, src, dst, type_id, epoch);

    // Only the offset is needed; the stored reference is discarded.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(new_record);
    let hot_ref = HotVersionRef::new(epoch, offset, tx_id);

    let mut versions = self.edge_versions.write();
    match versions.get_mut(&id) {
        Some(index) => {
            index.add_hot(hot_ref);
        }
        None => {
            versions.insert(id, VersionIndex::with_initial(hot_ref));
        }
    }

    self.forward_adj.add_edge(src, dst, id);
    if let Some(backward) = self.backward_adj.as_ref() {
        // The backward index is keyed by destination.
        backward.add_edge(dst, src, id);
    }

    id
}
1727
1728 pub fn create_edge_with_props(
1730 &self,
1731 src: NodeId,
1732 dst: NodeId,
1733 edge_type: &str,
1734 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1735 ) -> EdgeId {
1736 let id = self.create_edge(src, dst, edge_type);
1737
1738 for (key, value) in properties {
1739 self.edge_properties.set(id, key.into(), value.into());
1740 }
1741
1742 id
1743 }
1744
1745 #[must_use]
1747 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1748 self.get_edge_at_epoch(id, self.current_epoch())
1749 }
1750
1751 #[must_use]
1753 #[cfg(not(feature = "tiered-storage"))]
1754 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1755 let edges = self.edges.read();
1756 let chain = edges.get(&id)?;
1757 let record = chain.visible_at(epoch)?;
1758
1759 if record.is_deleted() {
1760 return None;
1761 }
1762
1763 let edge_type = {
1764 let id_to_type = self.id_to_edge_type.read();
1765 id_to_type.get(record.type_id as usize)?.clone()
1766 };
1767
1768 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1769
1770 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1772
1773 Some(edge)
1774 }
1775
/// Looks up an edge as seen at `epoch` (tiered storage).
///
/// Returns `None` when the edge is unknown, has no version visible at
/// `epoch`, its record cannot be read, it is flagged as deleted, or its
/// type ID has no registered name. The returned edge carries every property
/// currently stored for `id`.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let visible = versions.get(&id)?.visible_at(epoch)?;

    let record = self
        .read_edge_record(&visible)
        .filter(|rec| !rec.is_deleted())?;

    // The type-name lock is held only for this statement.
    let type_name = self
        .id_to_edge_type
        .read()
        .get(record.type_id as usize)
        .cloned()?;

    let mut found = Edge::new(id, record.src, record.dst, type_name);
    found.properties = self.edge_properties.get_all(id).into_iter().collect();

    Some(found)
}
1803
1804 #[must_use]
1806 #[cfg(not(feature = "tiered-storage"))]
1807 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1808 let edges = self.edges.read();
1809 let chain = edges.get(&id)?;
1810 let record = chain.visible_to(epoch, tx_id)?;
1811
1812 if record.is_deleted() {
1813 return None;
1814 }
1815
1816 let edge_type = {
1817 let id_to_type = self.id_to_edge_type.read();
1818 id_to_type.get(record.type_id as usize)?.clone()
1819 };
1820
1821 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1822
1823 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1825
1826 Some(edge)
1827 }
1828
/// Looks up an edge as visible to transaction `tx_id` at `epoch`
/// (tiered storage).
///
/// Visibility is decided by the index's `visible_to(epoch, tx_id)`. Returns
/// `None` when the edge is unknown, not visible, unreadable, flagged as
/// deleted, or its type ID has no registered name.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let visible = versions.get(&id)?.visible_to(epoch, tx_id)?;

    let record = self
        .read_edge_record(&visible)
        .filter(|rec| !rec.is_deleted())?;

    // The type-name lock is held only for this statement.
    let type_name = self
        .id_to_edge_type
        .read()
        .get(record.type_id as usize)
        .cloned()?;

    let mut found = Edge::new(id, record.src, record.dst, type_name);
    found.properties = self.edge_properties.get_all(id).into_iter().collect();

    Some(found)
}
1856
/// Resolves a version reference to a copy of the edge record it points at.
///
/// Cold references are fetched from the epoch store; hot references are
/// read directly out of the arena belonging to the reference's epoch.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
    match version_ref {
        VersionRef::Cold(cold) => {
            self.epoch_store
                .get_edge(cold.epoch, cold.block_offset, cold.length)
        }
        VersionRef::Hot(hot) => {
            let arena = self.arena_allocator.arena(hot.epoch);
            // SAFETY: NOTE(review) — relies on `arena_offset` having been
            // produced by `alloc_value_with_offset` for an `EdgeRecord` in
            // this same arena; confirm against the arena's contract.
            let stored: &EdgeRecord = unsafe { arena.read_at(hot.arena_offset) };
            Some(*stored)
        }
    }
}
1875
1876 pub fn delete_edge(&self, id: EdgeId) -> bool {
1878 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1879 self.delete_edge_at_epoch(id, self.current_epoch())
1880 }
1881
1882 #[cfg(not(feature = "tiered-storage"))]
1884 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1885 let mut edges = self.edges.write();
1886 if let Some(chain) = edges.get_mut(&id) {
1887 let (src, dst) = {
1889 match chain.visible_at(epoch) {
1890 Some(record) => {
1891 if record.is_deleted() {
1892 return false;
1893 }
1894 (record.src, record.dst)
1895 }
1896 None => return false, }
1898 };
1899
1900 chain.mark_deleted(epoch);
1902
1903 drop(edges); self.forward_adj.mark_deleted(src, id);
1907 if let Some(ref backward) = self.backward_adj {
1908 backward.mark_deleted(dst, id);
1909 }
1910
1911 self.edge_properties.remove_all(id);
1913
1914 true
1915 } else {
1916 false
1917 }
1918 }
1919
/// Marks an edge as deleted at `epoch` (tiered storage).
///
/// Returns `false` when the edge is unknown, has no version visible at
/// `epoch`, its record cannot be read, or it is already flagged as deleted.
/// On success the edge is also marked deleted in the adjacency indexes and
/// all of its properties are removed.
#[cfg(feature = "tiered-storage")]
pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
    let mut versions = self.edge_versions.write();
    let Some(index) = versions.get_mut(&id) else {
        return false;
    };

    // Capture the endpoints before the index is modified.
    let endpoints = index
        .visible_at(epoch)
        .and_then(|vref| self.read_edge_record(&vref))
        .filter(|record| !record.is_deleted())
        .map(|record| (record.src, record.dst));
    let Some((src, dst)) = endpoints else {
        return false;
    };

    index.mark_deleted(epoch);

    // Release the version map before touching the other indexes.
    drop(versions);

    self.forward_adj.mark_deleted(src, id);
    if let Some(backward) = self.backward_adj.as_ref() {
        backward.mark_deleted(dst, id);
    }

    self.edge_properties.remove_all(id);

    true
}
1962
1963 #[must_use]
1965 #[cfg(not(feature = "tiered-storage"))]
1966 pub fn edge_count(&self) -> usize {
1967 let epoch = self.current_epoch();
1968 self.edges
1969 .read()
1970 .values()
1971 .filter_map(|chain| chain.visible_at(epoch))
1972 .filter(|r| !r.is_deleted())
1973 .count()
1974 }
1975
/// Returns how many edges are live at the current epoch (tiered storage).
///
/// An edge is counted when its visible version reference resolves to a
/// record that is not flagged as deleted.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_count(&self) -> usize {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();

    let mut live = 0usize;
    for index in versions.values() {
        let Some(vref) = index.visible_at(epoch) else {
            continue;
        };
        match self.read_edge_record(&vref) {
            Some(record) if !record.is_deleted() => live += 1,
            _ => {}
        }
    }
    live
}
1993
1994 #[cfg(not(feature = "tiered-storage"))]
1999 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2000 {
2002 let mut nodes = self.nodes.write();
2003 for chain in nodes.values_mut() {
2004 chain.remove_versions_by(tx_id);
2005 }
2006 nodes.retain(|_, chain| !chain.is_empty());
2008 }
2009
2010 {
2012 let mut edges = self.edges.write();
2013 for chain in edges.values_mut() {
2014 chain.remove_versions_by(tx_id);
2015 }
2016 edges.retain(|_, chain| !chain.is_empty());
2018 }
2019 }
2020
/// Drops every node and edge version created by transaction `tx_id`
/// (tiered storage).
///
/// Each version index has the transaction's versions removed; indexes left
/// empty afterwards are removed from their map. Nodes are processed first,
/// then edges, each under its own write lock.
#[cfg(feature = "tiered-storage")]
pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
    {
        let mut node_versions = self.node_versions.write();
        node_versions.retain(|_, index| {
            index.remove_versions_by(tx_id);
            !index.is_empty()
        });
    }

    {
        let mut edge_versions = self.edge_versions.write();
        edge_versions.retain(|_, index| {
            index.remove_versions_by(tx_id);
            !index.is_empty()
        });
    }
}
2045
/// Moves the hot versions belonging to `epoch` into the epoch store and
/// returns how many records were frozen.
///
/// Steps:
/// 1. Under read locks, copy every node and edge record referenced by a hot
///    version of `epoch` out of its arena.
/// 2. Return `0` immediately if nothing was collected.
/// 3. Hand the collected records to the epoch store's `freeze_epoch`.
/// 4. Under write locks, build a cold reference from each returned
///    `(offset, length)` entry, carrying over `created_by` and
///    `deleted_epoch` from the hot reference, and pass it to the owning
///    index's `freeze_epoch`.
///
/// The read locks from step 1 are released before step 4 reacquires write
/// locks.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
    // Step 1a: snapshot node records and remember their hot references.
    let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
    let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
    {
        let node_versions = self.node_versions.read();
        for (node_id, index) in node_versions.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: NOTE(review) — relies on `arena_offset` pointing
                // at a `NodeRecord` allocated in this arena; confirm against
                // the arena's contract.
                let stored: &NodeRecord = unsafe { arena.read_at(hot.arena_offset) };
                node_records.push((node_id.as_u64(), *stored));
                node_hot_refs.push((*node_id, *hot));
            }
        }
    }

    // Step 1b: same for edges.
    let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
    let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
    {
        let edge_versions = self.edge_versions.read();
        for (edge_id, index) in edge_versions.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: NOTE(review) — relies on `arena_offset` pointing
                // at an `EdgeRecord` allocated in this arena; confirm
                // against the arena's contract.
                let stored: &EdgeRecord = unsafe { arena.read_at(hot.arena_offset) };
                edge_records.push((edge_id.as_u64(), *stored));
                edge_hot_refs.push((*edge_id, *hot));
            }
        }
    }

    // Step 2: nothing to do for this epoch.
    let total_frozen = node_records.len() + edge_records.len();
    if total_frozen == 0 {
        return 0;
    }

    // Step 3: write the records to cold storage.
    let (node_entries, edge_entries) =
        self.epoch_store
            .freeze_epoch(epoch, node_records, edge_records);

    // Entity ID -> (offset, length) in the frozen block.
    let node_locations: FxHashMap<u64, _> = node_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();
    let edge_locations: FxHashMap<u64, _> = edge_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();

    // Step 4a: hand cold references to the node indexes.
    {
        let mut node_versions = self.node_versions.write();
        for (node_id, hot) in &node_hot_refs {
            let Some(index) = node_versions.get_mut(node_id) else {
                continue;
            };
            let Some(&(offset, length)) = node_locations.get(&node_id.as_u64()) else {
                continue;
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset: offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    // Step 4b: same for the edge indexes.
    {
        let mut edge_versions = self.edge_versions.write();
        for (edge_id, hot) in &edge_hot_refs {
            let Some(index) = edge_versions.get_mut(edge_id) else {
                continue;
            };
            let Some(&(offset, length)) = edge_locations.get(&edge_id.as_u64()) else {
                continue;
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset: offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    total_frozen
}
2161
/// Borrows the epoch store owned by this graph store.
#[cfg(feature = "tiered-storage")]
#[must_use]
pub fn epoch_store(&self) -> &EpochStore {
    let store: &EpochStore = &self.epoch_store;
    store
}
2168
2169 #[must_use]
2171 pub fn label_count(&self) -> usize {
2172 self.id_to_label.read().len()
2173 }
2174
2175 #[must_use]
2179 pub fn property_key_count(&self) -> usize {
2180 let node_keys = self.node_properties.column_count();
2181 let edge_keys = self.edge_properties.column_count();
2182 node_keys + edge_keys
2186 }
2187
2188 #[must_use]
2190 pub fn edge_type_count(&self) -> usize {
2191 self.id_to_edge_type.read().len()
2192 }
2193
2194 pub fn neighbors(
2201 &self,
2202 node: NodeId,
2203 direction: Direction,
2204 ) -> impl Iterator<Item = NodeId> + '_ {
2205 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2206 Direction::Outgoing | Direction::Both => {
2207 Box::new(self.forward_adj.neighbors(node).into_iter())
2208 }
2209 Direction::Incoming => Box::new(std::iter::empty()),
2210 };
2211
2212 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2213 Direction::Incoming | Direction::Both => {
2214 if let Some(ref adj) = self.backward_adj {
2215 Box::new(adj.neighbors(node).into_iter())
2216 } else {
2217 Box::new(std::iter::empty())
2218 }
2219 }
2220 Direction::Outgoing => Box::new(std::iter::empty()),
2221 };
2222
2223 forward.chain(backward)
2224 }
2225
2226 pub fn edges_from(
2230 &self,
2231 node: NodeId,
2232 direction: Direction,
2233 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2234 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2235 Direction::Outgoing | Direction::Both => {
2236 Box::new(self.forward_adj.edges_from(node).into_iter())
2237 }
2238 Direction::Incoming => Box::new(std::iter::empty()),
2239 };
2240
2241 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2242 Direction::Incoming | Direction::Both => {
2243 if let Some(ref adj) = self.backward_adj {
2244 Box::new(adj.edges_from(node).into_iter())
2245 } else {
2246 Box::new(std::iter::empty())
2247 }
2248 }
2249 Direction::Outgoing => Box::new(std::iter::empty()),
2250 };
2251
2252 forward.chain(backward)
2253 }
2254
2255 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2268 if let Some(ref backward) = self.backward_adj {
2269 backward.edges_from(node)
2270 } else {
2271 self.all_edges()
2273 .filter_map(|edge| {
2274 if edge.dst == node {
2275 Some((edge.src, edge.id))
2276 } else {
2277 None
2278 }
2279 })
2280 .collect()
2281 }
2282 }
2283
2284 #[must_use]
2288 pub fn out_degree(&self, node: NodeId) -> usize {
2289 self.forward_adj.out_degree(node)
2290 }
2291
2292 #[must_use]
2297 pub fn in_degree(&self, node: NodeId) -> usize {
2298 if let Some(ref backward) = self.backward_adj {
2299 backward.in_degree(node)
2300 } else {
2301 self.all_edges().filter(|edge| edge.dst == node).count()
2303 }
2304 }
2305
2306 #[must_use]
2308 #[cfg(not(feature = "tiered-storage"))]
2309 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2310 let edges = self.edges.read();
2311 let chain = edges.get(&id)?;
2312 let epoch = self.current_epoch();
2313 let record = chain.visible_at(epoch)?;
2314 let id_to_type = self.id_to_edge_type.read();
2315 id_to_type.get(record.type_id as usize).cloned()
2316 }
2317
/// Returns the type name of the edge version visible at the current epoch
/// (tiered storage).
///
/// Returns `None` when the edge is unknown, has no visible version, its
/// record cannot be read, or its type ID has no registered name. Unlike
/// `get_edge`, this does not check the record's deleted flag.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
    let versions = self.edge_versions.read();
    let index = versions.get(&id)?;
    let visible = index.visible_at(self.current_epoch())?;
    let type_index = self.read_edge_record(&visible)?.type_id as usize;

    let type_names = self.id_to_edge_type.read();
    type_names.get(type_index).cloned()
}
2331
2332 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2338 let label_to_id = self.label_to_id.read();
2339 if let Some(&label_id) = label_to_id.get(label) {
2340 let index = self.label_index.read();
2341 if let Some(set) = index.get(label_id as usize) {
2342 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2343 ids.sort_unstable();
2344 return ids;
2345 }
2346 }
2347 Vec::new()
2348 }
2349
2350 #[cfg(not(feature = "tiered-storage"))]
2357 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2358 let epoch = self.current_epoch();
2359 let node_ids: Vec<NodeId> = self
2360 .nodes
2361 .read()
2362 .iter()
2363 .filter_map(|(id, chain)| {
2364 chain
2365 .visible_at(epoch)
2366 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2367 })
2368 .collect();
2369
2370 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2371 }
2372
/// Iterates over every node that is live at the current epoch
/// (tiered storage).
///
/// IDs come from `node_ids()`; each node is then fetched lazily with
/// `get_node`, and IDs that no longer resolve are skipped.
#[cfg(feature = "tiered-storage")]
pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
    let live_ids = self.node_ids();
    live_ids
        .into_iter()
        .filter_map(move |node_id| self.get_node(node_id))
}
2380
2381 #[cfg(not(feature = "tiered-storage"))]
2386 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2387 let epoch = self.current_epoch();
2388 let edge_ids: Vec<EdgeId> = self
2389 .edges
2390 .read()
2391 .iter()
2392 .filter_map(|(id, chain)| {
2393 chain
2394 .visible_at(epoch)
2395 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2396 })
2397 .collect();
2398
2399 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2400 }
2401
/// Iterates over every edge that is live at the current epoch
/// (tiered storage).
///
/// Live IDs are collected up front under the `edge_versions` read lock;
/// each edge is then fetched lazily with `get_edge`, and IDs that no longer
/// resolve are skipped.
#[cfg(feature = "tiered-storage")]
pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();

    let mut live_ids: Vec<EdgeId> = Vec::new();
    for (edge_id, index) in versions.iter() {
        let record = index
            .visible_at(epoch)
            .and_then(|vref| self.read_edge_record(&vref));
        if record.is_some_and(|r| !r.is_deleted()) {
            live_ids.push(*edge_id);
        }
    }

    live_ids
        .into_iter()
        .filter_map(move |edge_id| self.get_edge(edge_id))
}
2420
2421 pub fn all_labels(&self) -> Vec<String> {
2423 self.id_to_label
2424 .read()
2425 .iter()
2426 .map(|s| s.to_string())
2427 .collect()
2428 }
2429
2430 pub fn all_edge_types(&self) -> Vec<String> {
2432 self.id_to_edge_type
2433 .read()
2434 .iter()
2435 .map(|s| s.to_string())
2436 .collect()
2437 }
2438
2439 pub fn all_property_keys(&self) -> Vec<String> {
2441 let mut keys = std::collections::HashSet::new();
2442 for key in self.node_properties.keys() {
2443 keys.insert(key.to_string());
2444 }
2445 for key in self.edge_properties.keys() {
2446 keys.insert(key.to_string());
2447 }
2448 keys.into_iter().collect()
2449 }
2450
2451 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2453 let node_ids = self.nodes_by_label(label);
2454 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2455 }
2456
2457 #[cfg(not(feature = "tiered-storage"))]
2459 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2460 let epoch = self.current_epoch();
2461 let type_to_id = self.edge_type_to_id.read();
2462
2463 if let Some(&type_id) = type_to_id.get(edge_type) {
2464 let edge_ids: Vec<EdgeId> = self
2465 .edges
2466 .read()
2467 .iter()
2468 .filter_map(|(id, chain)| {
2469 chain.visible_at(epoch).and_then(|r| {
2470 if !r.is_deleted() && r.type_id == type_id {
2471 Some(*id)
2472 } else {
2473 None
2474 }
2475 })
2476 })
2477 .collect();
2478
2479 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2481 as Box<dyn Iterator<Item = Edge> + 'a>
2482 } else {
2483 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2485 }
2486 }
2487
/// Iterates over the live edges whose type is `edge_type` (tiered storage).
///
/// Yields nothing when the type name is not registered. Matching IDs are
/// collected up front at the current epoch; each edge is then fetched
/// lazily with `get_edge`.
#[cfg(feature = "tiered-storage")]
pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
    let epoch = self.current_epoch();
    let type_to_id = self.edge_type_to_id.read();

    let Some(&wanted_type) = type_to_id.get(edge_type) else {
        return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>;
    };

    let versions = self.edge_versions.read();
    let mut matching: Vec<EdgeId> = Vec::new();
    for (edge_id, index) in versions.iter() {
        let Some(vref) = index.visible_at(epoch) else {
            continue;
        };
        match self.read_edge_record(&vref) {
            Some(record) if !record.is_deleted() && record.type_id == wanted_type => {
                matching.push(*edge_id);
            }
            _ => {}
        }
    }

    Box::new(
        matching
            .into_iter()
            .filter_map(move |edge_id| self.get_edge(edge_id)),
    ) as Box<dyn Iterator<Item = Edge> + 'a>
}
2518
2519 #[must_use]
2526 pub fn node_property_might_match(
2527 &self,
2528 property: &PropertyKey,
2529 op: CompareOp,
2530 value: &Value,
2531 ) -> bool {
2532 self.node_properties.might_match(property, op, value)
2533 }
2534
2535 #[must_use]
2537 pub fn edge_property_might_match(
2538 &self,
2539 property: &PropertyKey,
2540 op: CompareOp,
2541 value: &Value,
2542 ) -> bool {
2543 self.edge_properties.might_match(property, op, value)
2544 }
2545
2546 #[must_use]
2548 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2549 self.node_properties.zone_map(property)
2550 }
2551
2552 #[must_use]
2554 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2555 self.edge_properties.zone_map(property)
2556 }
2557
2558 pub fn rebuild_zone_maps(&self) {
2560 self.node_properties.rebuild_zone_maps();
2561 self.edge_properties.rebuild_zone_maps();
2562 }
2563
2564 #[must_use]
2568 pub fn statistics(&self) -> Statistics {
2569 self.statistics.read().clone()
2570 }
2571
2572 pub fn ensure_statistics_fresh(&self) {
2577 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2578 self.compute_statistics();
2579 }
2580 }
2581
2582 #[cfg(not(feature = "tiered-storage"))]
2587 pub fn compute_statistics(&self) {
2588 let mut stats = Statistics::new();
2589
2590 stats.total_nodes = self.node_count() as u64;
2592 stats.total_edges = self.edge_count() as u64;
2593
2594 let id_to_label = self.id_to_label.read();
2596 let label_index = self.label_index.read();
2597
2598 for (label_id, label_name) in id_to_label.iter().enumerate() {
2599 let node_count = label_index
2600 .get(label_id)
2601 .map(|set| set.len() as u64)
2602 .unwrap_or(0);
2603
2604 if node_count > 0 {
2605 let avg_out_degree = if stats.total_nodes > 0 {
2607 stats.total_edges as f64 / stats.total_nodes as f64
2608 } else {
2609 0.0
2610 };
2611
2612 let label_stats =
2613 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2614
2615 stats.update_label(label_name.as_ref(), label_stats);
2616 }
2617 }
2618
2619 let id_to_edge_type = self.id_to_edge_type.read();
2621 let edges = self.edges.read();
2622 let epoch = self.current_epoch();
2623
2624 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2625 for chain in edges.values() {
2626 if let Some(record) = chain.visible_at(epoch) {
2627 if !record.is_deleted() {
2628 *edge_type_counts.entry(record.type_id).or_default() += 1;
2629 }
2630 }
2631 }
2632
2633 for (type_id, count) in edge_type_counts {
2634 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2635 let avg_degree = if stats.total_nodes > 0 {
2636 count as f64 / stats.total_nodes as f64
2637 } else {
2638 0.0
2639 };
2640
2641 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2642 stats.update_edge_type(type_name.as_ref(), edge_stats);
2643 }
2644 }
2645
2646 *self.statistics.write() = stats;
2647 }
2648
/// Recomputes the stored statistics from the current store contents
/// (tiered storage).
///
/// Totals come from `node_count()` and `edge_count()`. Each label with a
/// non-empty index slot gets a `LabelStatistics` whose in/out degrees are
/// both the global `total_edges / total_nodes` ratio. Each edge type with
/// live edges gets an `EdgeTypeStatistics` whose degrees are both
/// `count / total_nodes`. The result replaces the previous statistics.
#[cfg(feature = "tiered-storage")]
pub fn compute_statistics(&self) {
    let mut stats = Statistics::new();

    stats.total_nodes = self.node_count() as u64;
    stats.total_edges = self.edge_count() as u64;

    let id_to_label = self.id_to_label.read();
    let label_index = self.label_index.read();

    for (label_id, label_name) in id_to_label.iter().enumerate() {
        let node_count = match label_index.get(label_id) {
            Some(members) => members.len() as u64,
            None => 0,
        };
        if node_count == 0 {
            continue;
        }

        // Per-label degree is approximated by the global average.
        let avg_out_degree = match stats.total_nodes {
            0 => 0.0,
            total => stats.total_edges as f64 / total as f64,
        };

        let label_stats =
            LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
        stats.update_label(label_name.as_ref(), label_stats);
    }

    let id_to_edge_type = self.id_to_edge_type.read();
    let versions = self.edge_versions.read();
    let epoch = self.current_epoch();

    // Live edge count per type ID.
    let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
    for index in versions.values() {
        let Some(vref) = index.visible_at(epoch) else {
            continue;
        };
        let Some(record) = self.read_edge_record(&vref) else {
            continue;
        };
        if record.is_deleted() {
            continue;
        }
        *edge_type_counts.entry(record.type_id).or_default() += 1;
    }

    for (type_id, count) in edge_type_counts {
        let Some(type_name) = id_to_edge_type.get(type_id as usize) else {
            continue;
        };

        let avg_degree = match stats.total_nodes {
            0 => 0.0,
            total => count as f64 / total as f64,
        };

        let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
        stats.update_edge_type(type_name.as_ref(), edge_stats);
    }

    *self.statistics.write() = stats;
}
2710
2711 #[must_use]
2713 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2714 self.statistics.read().estimate_label_cardinality(label)
2715 }
2716
2717 #[must_use]
2719 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2720 self.statistics
2721 .read()
2722 .estimate_avg_degree(edge_type, outgoing)
2723 }
2724
2725 fn get_or_create_label_id(&self, label: &str) -> u32 {
2728 {
2729 let label_to_id = self.label_to_id.read();
2730 if let Some(&id) = label_to_id.get(label) {
2731 return id;
2732 }
2733 }
2734
2735 let mut label_to_id = self.label_to_id.write();
2736 let mut id_to_label = self.id_to_label.write();
2737
2738 if let Some(&id) = label_to_id.get(label) {
2740 return id;
2741 }
2742
2743 let id = id_to_label.len() as u32;
2744
2745 let label: ArcStr = label.into();
2746 label_to_id.insert(label.clone(), id);
2747 id_to_label.push(label);
2748
2749 id
2750 }
2751
2752 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2753 {
2754 let type_to_id = self.edge_type_to_id.read();
2755 if let Some(&id) = type_to_id.get(edge_type) {
2756 return id;
2757 }
2758 }
2759
2760 let mut type_to_id = self.edge_type_to_id.write();
2761 let mut id_to_type = self.id_to_edge_type.write();
2762
2763 if let Some(&id) = type_to_id.get(edge_type) {
2765 return id;
2766 }
2767
2768 let id = id_to_type.len() as u32;
2769 let edge_type: ArcStr = edge_type.into();
2770 type_to_id.insert(edge_type.clone(), id);
2771 id_to_type.push(edge_type);
2772
2773 id
2774 }
2775
2776 #[cfg(not(feature = "tiered-storage"))]
2783 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2784 let epoch = self.current_epoch();
2785 let mut record = NodeRecord::new(id, epoch);
2786 record.set_label_count(labels.len() as u16);
2787
2788 let mut node_label_set = FxHashSet::default();
2790 for label in labels {
2791 let label_id = self.get_or_create_label_id(*label);
2792 node_label_set.insert(label_id);
2793
2794 let mut index = self.label_index.write();
2796 while index.len() <= label_id as usize {
2797 index.push(FxHashMap::default());
2798 }
2799 index[label_id as usize].insert(id, ());
2800 }
2801
2802 self.node_labels.write().insert(id, node_label_set);
2804
2805 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2807 self.nodes.write().insert(id, chain);
2808
2809 let id_val = id.as_u64();
2811 let _ = self
2812 .next_node_id
2813 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2814 if id_val >= current {
2815 Some(id_val + 1)
2816 } else {
2817 None
2818 }
2819 });
2820 }
2821
/// Creates a node under a caller-chosen `id` at the current epoch, on
/// behalf of `TxId::SYSTEM` (tiered storage).
///
/// Every label is interned and the node is added to the label index and to
/// `node_labels`. The record is allocated in the arena for the current
/// epoch and a fresh version index replaces any existing entry for `id`.
/// Finally `next_node_id` is advanced past `id` so later allocations do not
/// reuse it.
///
/// If `id` is `u64::MAX`, no successor exists and the counter is left
/// untouched. The previous `id + 1` would panic in debug builds and wrap to
/// zero in release builds, resetting the allocator.
///
/// Note: the record's label count is `labels.len()` truncated to `u16`.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
    let epoch = self.current_epoch();
    let mut record = NodeRecord::new(id, epoch);
    record.set_label_count(labels.len() as u16);

    let mut node_label_set = FxHashSet::default();
    for label in labels {
        let label_id = self.get_or_create_label_id(*label);
        node_label_set.insert(label_id);

        // Grow the index so that `label_id` has a slot.
        let mut index = self.label_index.write();
        while index.len() <= label_id as usize {
            index.push(FxHashMap::default());
        }
        index[label_id as usize].insert(id, ());
    }

    self.node_labels.write().insert(id, node_label_set);

    // Only the offset is needed; the stored reference is discarded.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut versions = self.node_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    // Advance the allocator only when `id` is at or beyond it; the result
    // is ignored because "no update needed" is not an error.
    let id_val = id.as_u64();
    let _ = self
        .next_node_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            if id_val >= current {
                id_val.checked_add(1)
            } else {
                None
            }
        });
}
2868
2869 #[cfg(not(feature = "tiered-storage"))]
2873 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2874 let epoch = self.current_epoch();
2875 let type_id = self.get_or_create_edge_type_id(edge_type);
2876
2877 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2878 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2879 self.edges.write().insert(id, chain);
2880
2881 self.forward_adj.add_edge(src, dst, id);
2883 if let Some(ref backward) = self.backward_adj {
2884 backward.add_edge(dst, src, id);
2885 }
2886
2887 let id_val = id.as_u64();
2889 let _ = self
2890 .next_edge_id
2891 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2892 if id_val >= current {
2893 Some(id_val + 1)
2894 } else {
2895 None
2896 }
2897 });
2898 }
2899
/// Creates an edge under a caller-chosen `id` at the current epoch, on
/// behalf of `TxId::SYSTEM` (tiered storage).
///
/// The edge type is interned, the record is allocated in the arena for the
/// current epoch, and a fresh version index replaces any existing entry for
/// `id`. The edge is registered in the forward adjacency and, when
/// configured, the backward adjacency. Finally `next_edge_id` is advanced
/// past `id` so later allocations do not reuse it.
///
/// If `id` is `u64::MAX`, no successor exists and the counter is left
/// untouched. The previous `id + 1` would panic in debug builds and wrap to
/// zero in release builds, resetting the allocator.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
    let epoch = self.current_epoch();
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let record = EdgeRecord::new(id, src, dst, type_id, epoch);

    // Only the offset is needed; the stored reference is discarded.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut versions = self.edge_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    self.forward_adj.add_edge(src, dst, id);
    if let Some(ref backward) = self.backward_adj {
        // The backward index is keyed by destination.
        backward.add_edge(dst, src, id);
    }

    // Advance the allocator only when `id` is at or beyond it; the result
    // is ignored because "no update needed" is not an error.
    let id_val = id.as_u64();
    let _ = self
        .next_edge_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            if id_val >= current {
                id_val.checked_add(1)
            } else {
                None
            }
        });
}
2936
2937 pub fn set_epoch(&self, epoch: EpochId) {
2939 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2940 }
2941}
2942
2943impl Default for LpgStore {
2944 fn default() -> Self {
2945 Self::new()
2946 }
2947}
2948
2949#[cfg(test)]
2950mod tests {
2951 use super::*;
2952
/// A freshly created node has a valid ID and exactly the labels it was
/// given.
#[test]
fn test_create_node() {
    let store = LpgStore::new();

    let person_id = store.create_node(&["Person"]);
    assert!(person_id.is_valid());

    let person = store.get_node(person_id).unwrap();
    let is_person = person.has_label("Person");
    let is_animal = person.has_label("Animal");
    assert!(is_person);
    assert!(!is_animal);
}
2964
/// Properties passed at creation time can be read back from the node.
#[test]
fn test_create_node_with_props() {
    let store = LpgStore::new();

    let initial_props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
    let person_id = store.create_node_with_props(&["Person"], initial_props);

    let person = store.get_node(person_id).unwrap();

    let name = person.get_property("name").and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));

    let age = person.get_property("age").and_then(|v| v.as_int64());
    assert_eq!(age, Some(30));
}
2984
2985 #[test]
2986 fn test_delete_node() {
2987 let store = LpgStore::new();
2988
2989 let id = store.create_node(&["Person"]);
2990 assert_eq!(store.node_count(), 1);
2991
2992 assert!(store.delete_node(id));
2993 assert_eq!(store.node_count(), 0);
2994 assert!(store.get_node(id).is_none());
2995
2996 assert!(!store.delete_node(id));
2998 }
2999
3000 #[test]
3001 fn test_create_edge() {
3002 let store = LpgStore::new();
3003
3004 let alice = store.create_node(&["Person"]);
3005 let bob = store.create_node(&["Person"]);
3006
3007 let edge_id = store.create_edge(alice, bob, "KNOWS");
3008 assert!(edge_id.is_valid());
3009
3010 let edge = store.get_edge(edge_id).unwrap();
3011 assert_eq!(edge.src, alice);
3012 assert_eq!(edge.dst, bob);
3013 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3014 }
3015
3016 #[test]
3017 fn test_neighbors() {
3018 let store = LpgStore::new();
3019
3020 let a = store.create_node(&["Person"]);
3021 let b = store.create_node(&["Person"]);
3022 let c = store.create_node(&["Person"]);
3023
3024 store.create_edge(a, b, "KNOWS");
3025 store.create_edge(a, c, "KNOWS");
3026
3027 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3028 assert_eq!(outgoing.len(), 2);
3029 assert!(outgoing.contains(&b));
3030 assert!(outgoing.contains(&c));
3031
3032 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3033 assert_eq!(incoming.len(), 1);
3034 assert!(incoming.contains(&a));
3035 }
3036
3037 #[test]
3038 fn test_nodes_by_label() {
3039 let store = LpgStore::new();
3040
3041 let p1 = store.create_node(&["Person"]);
3042 let p2 = store.create_node(&["Person"]);
3043 let _a = store.create_node(&["Animal"]);
3044
3045 let persons = store.nodes_by_label("Person");
3046 assert_eq!(persons.len(), 2);
3047 assert!(persons.contains(&p1));
3048 assert!(persons.contains(&p2));
3049
3050 let animals = store.nodes_by_label("Animal");
3051 assert_eq!(animals.len(), 1);
3052 }
3053
3054 #[test]
3055 fn test_delete_edge() {
3056 let store = LpgStore::new();
3057
3058 let a = store.create_node(&["Person"]);
3059 let b = store.create_node(&["Person"]);
3060 let edge_id = store.create_edge(a, b, "KNOWS");
3061
3062 assert_eq!(store.edge_count(), 1);
3063
3064 assert!(store.delete_edge(edge_id));
3065 assert_eq!(store.edge_count(), 0);
3066 assert!(store.get_edge(edge_id).is_none());
3067 }
3068
3069 #[test]
3072 fn test_lpg_store_config() {
3073 let config = LpgStoreConfig {
3075 backward_edges: false,
3076 initial_node_capacity: 100,
3077 initial_edge_capacity: 200,
3078 };
3079 let store = LpgStore::with_config(config);
3080
3081 let a = store.create_node(&["Person"]);
3083 let b = store.create_node(&["Person"]);
3084 store.create_edge(a, b, "KNOWS");
3085
3086 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3088 assert_eq!(outgoing.len(), 1);
3089
3090 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3092 assert_eq!(incoming.len(), 0);
3093 }
3094
3095 #[test]
3096 fn test_epoch_management() {
3097 let store = LpgStore::new();
3098
3099 let epoch0 = store.current_epoch();
3100 assert_eq!(epoch0.as_u64(), 0);
3101
3102 let epoch1 = store.new_epoch();
3103 assert_eq!(epoch1.as_u64(), 1);
3104
3105 let current = store.current_epoch();
3106 assert_eq!(current.as_u64(), 1);
3107 }
3108
3109 #[test]
3110 fn test_node_properties() {
3111 let store = LpgStore::new();
3112 let id = store.create_node(&["Person"]);
3113
3114 store.set_node_property(id, "name", Value::from("Alice"));
3116 let name = store.get_node_property(id, &"name".into());
3117 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3118
3119 store.set_node_property(id, "name", Value::from("Bob"));
3121 let name = store.get_node_property(id, &"name".into());
3122 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3123
3124 let old = store.remove_node_property(id, "name");
3126 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3127
3128 let name = store.get_node_property(id, &"name".into());
3130 assert!(name.is_none());
3131
3132 let none = store.remove_node_property(id, "nonexistent");
3134 assert!(none.is_none());
3135 }
3136
3137 #[test]
3138 fn test_edge_properties() {
3139 let store = LpgStore::new();
3140 let a = store.create_node(&["Person"]);
3141 let b = store.create_node(&["Person"]);
3142 let edge_id = store.create_edge(a, b, "KNOWS");
3143
3144 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3146 let since = store.get_edge_property(edge_id, &"since".into());
3147 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3148
3149 let old = store.remove_edge_property(edge_id, "since");
3151 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3152
3153 let since = store.get_edge_property(edge_id, &"since".into());
3154 assert!(since.is_none());
3155 }
3156
3157 #[test]
3158 fn test_add_remove_label() {
3159 let store = LpgStore::new();
3160 let id = store.create_node(&["Person"]);
3161
3162 assert!(store.add_label(id, "Employee"));
3164
3165 let node = store.get_node(id).unwrap();
3166 assert!(node.has_label("Person"));
3167 assert!(node.has_label("Employee"));
3168
3169 assert!(!store.add_label(id, "Employee"));
3171
3172 assert!(store.remove_label(id, "Employee"));
3174
3175 let node = store.get_node(id).unwrap();
3176 assert!(node.has_label("Person"));
3177 assert!(!node.has_label("Employee"));
3178
3179 assert!(!store.remove_label(id, "Employee"));
3181 assert!(!store.remove_label(id, "NonExistent"));
3182 }
3183
3184 #[test]
3185 fn test_add_label_to_nonexistent_node() {
3186 let store = LpgStore::new();
3187 let fake_id = NodeId::new(999);
3188 assert!(!store.add_label(fake_id, "Label"));
3189 }
3190
3191 #[test]
3192 fn test_remove_label_from_nonexistent_node() {
3193 let store = LpgStore::new();
3194 let fake_id = NodeId::new(999);
3195 assert!(!store.remove_label(fake_id, "Label"));
3196 }
3197
3198 #[test]
3199 fn test_node_ids() {
3200 let store = LpgStore::new();
3201
3202 let n1 = store.create_node(&["Person"]);
3203 let n2 = store.create_node(&["Person"]);
3204 let n3 = store.create_node(&["Person"]);
3205
3206 let ids = store.node_ids();
3207 assert_eq!(ids.len(), 3);
3208 assert!(ids.contains(&n1));
3209 assert!(ids.contains(&n2));
3210 assert!(ids.contains(&n3));
3211
3212 store.delete_node(n2);
3214 let ids = store.node_ids();
3215 assert_eq!(ids.len(), 2);
3216 assert!(!ids.contains(&n2));
3217 }
3218
3219 #[test]
3220 fn test_delete_node_nonexistent() {
3221 let store = LpgStore::new();
3222 let fake_id = NodeId::new(999);
3223 assert!(!store.delete_node(fake_id));
3224 }
3225
3226 #[test]
3227 fn test_delete_edge_nonexistent() {
3228 let store = LpgStore::new();
3229 let fake_id = EdgeId::new(999);
3230 assert!(!store.delete_edge(fake_id));
3231 }
3232
3233 #[test]
3234 fn test_delete_edge_double() {
3235 let store = LpgStore::new();
3236 let a = store.create_node(&["Person"]);
3237 let b = store.create_node(&["Person"]);
3238 let edge_id = store.create_edge(a, b, "KNOWS");
3239
3240 assert!(store.delete_edge(edge_id));
3241 assert!(!store.delete_edge(edge_id)); }
3243
3244 #[test]
3245 fn test_create_edge_with_props() {
3246 let store = LpgStore::new();
3247 let a = store.create_node(&["Person"]);
3248 let b = store.create_node(&["Person"]);
3249
3250 let edge_id = store.create_edge_with_props(
3251 a,
3252 b,
3253 "KNOWS",
3254 [
3255 ("since", Value::from(2020i64)),
3256 ("weight", Value::from(1.0)),
3257 ],
3258 );
3259
3260 let edge = store.get_edge(edge_id).unwrap();
3261 assert_eq!(
3262 edge.get_property("since").and_then(|v| v.as_int64()),
3263 Some(2020)
3264 );
3265 assert_eq!(
3266 edge.get_property("weight").and_then(|v| v.as_float64()),
3267 Some(1.0)
3268 );
3269 }
3270
3271 #[test]
3272 fn test_delete_node_edges() {
3273 let store = LpgStore::new();
3274
3275 let a = store.create_node(&["Person"]);
3276 let b = store.create_node(&["Person"]);
3277 let c = store.create_node(&["Person"]);
3278
3279 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3283
3284 store.delete_node_edges(a);
3286
3287 assert_eq!(store.edge_count(), 0);
3288 }
3289
3290 #[test]
3291 fn test_neighbors_both_directions() {
3292 let store = LpgStore::new();
3293
3294 let a = store.create_node(&["Person"]);
3295 let b = store.create_node(&["Person"]);
3296 let c = store.create_node(&["Person"]);
3297
3298 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3303 assert_eq!(neighbors.len(), 2);
3304 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3307
3308 #[test]
3309 fn test_edges_from() {
3310 let store = LpgStore::new();
3311
3312 let a = store.create_node(&["Person"]);
3313 let b = store.create_node(&["Person"]);
3314 let c = store.create_node(&["Person"]);
3315
3316 let e1 = store.create_edge(a, b, "KNOWS");
3317 let e2 = store.create_edge(a, c, "KNOWS");
3318
3319 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3320 assert_eq!(edges.len(), 2);
3321 assert!(edges.iter().any(|(_, e)| *e == e1));
3322 assert!(edges.iter().any(|(_, e)| *e == e2));
3323
3324 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3326 assert_eq!(incoming.len(), 1);
3327 assert_eq!(incoming[0].1, e1);
3328 }
3329
3330 #[test]
3331 fn test_edges_to() {
3332 let store = LpgStore::new();
3333
3334 let a = store.create_node(&["Person"]);
3335 let b = store.create_node(&["Person"]);
3336 let c = store.create_node(&["Person"]);
3337
3338 let e1 = store.create_edge(a, b, "KNOWS");
3339 let e2 = store.create_edge(c, b, "KNOWS");
3340
3341 let to_b = store.edges_to(b);
3343 assert_eq!(to_b.len(), 2);
3344 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3345 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3346 }
3347
3348 #[test]
3349 fn test_out_degree_in_degree() {
3350 let store = LpgStore::new();
3351
3352 let a = store.create_node(&["Person"]);
3353 let b = store.create_node(&["Person"]);
3354 let c = store.create_node(&["Person"]);
3355
3356 store.create_edge(a, b, "KNOWS");
3357 store.create_edge(a, c, "KNOWS");
3358 store.create_edge(c, b, "KNOWS");
3359
3360 assert_eq!(store.out_degree(a), 2);
3361 assert_eq!(store.out_degree(b), 0);
3362 assert_eq!(store.out_degree(c), 1);
3363
3364 assert_eq!(store.in_degree(a), 0);
3365 assert_eq!(store.in_degree(b), 2);
3366 assert_eq!(store.in_degree(c), 1);
3367 }
3368
3369 #[test]
3370 fn test_edge_type() {
3371 let store = LpgStore::new();
3372
3373 let a = store.create_node(&["Person"]);
3374 let b = store.create_node(&["Person"]);
3375 let edge_id = store.create_edge(a, b, "KNOWS");
3376
3377 let edge_type = store.edge_type(edge_id);
3378 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3379
3380 let fake_id = EdgeId::new(999);
3382 assert!(store.edge_type(fake_id).is_none());
3383 }
3384
3385 #[test]
3386 fn test_count_methods() {
3387 let store = LpgStore::new();
3388
3389 assert_eq!(store.label_count(), 0);
3390 assert_eq!(store.edge_type_count(), 0);
3391 assert_eq!(store.property_key_count(), 0);
3392
3393 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3394 let b = store.create_node(&["Company"]);
3395 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3396
3397 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3401
3402 #[test]
3403 fn test_all_nodes_and_edges() {
3404 let store = LpgStore::new();
3405
3406 let a = store.create_node(&["Person"]);
3407 let b = store.create_node(&["Person"]);
3408 store.create_edge(a, b, "KNOWS");
3409
3410 let nodes: Vec<_> = store.all_nodes().collect();
3411 assert_eq!(nodes.len(), 2);
3412
3413 let edges: Vec<_> = store.all_edges().collect();
3414 assert_eq!(edges.len(), 1);
3415 }
3416
3417 #[test]
3418 fn test_all_labels_and_edge_types() {
3419 let store = LpgStore::new();
3420
3421 store.create_node(&["Person"]);
3422 store.create_node(&["Company"]);
3423 let a = store.create_node(&["Animal"]);
3424 let b = store.create_node(&["Animal"]);
3425 store.create_edge(a, b, "EATS");
3426
3427 let labels = store.all_labels();
3428 assert_eq!(labels.len(), 3);
3429 assert!(labels.contains(&"Person".to_string()));
3430 assert!(labels.contains(&"Company".to_string()));
3431 assert!(labels.contains(&"Animal".to_string()));
3432
3433 let edge_types = store.all_edge_types();
3434 assert_eq!(edge_types.len(), 1);
3435 assert!(edge_types.contains(&"EATS".to_string()));
3436 }
3437
3438 #[test]
3439 fn test_all_property_keys() {
3440 let store = LpgStore::new();
3441
3442 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3443 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3444 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3445
3446 let keys = store.all_property_keys();
3447 assert!(keys.contains(&"name".to_string()));
3448 assert!(keys.contains(&"age".to_string()));
3449 assert!(keys.contains(&"since".to_string()));
3450 }
3451
3452 #[test]
3453 fn test_nodes_with_label() {
3454 let store = LpgStore::new();
3455
3456 store.create_node(&["Person"]);
3457 store.create_node(&["Person"]);
3458 store.create_node(&["Company"]);
3459
3460 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3461 assert_eq!(persons.len(), 2);
3462
3463 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3464 assert_eq!(companies.len(), 1);
3465
3466 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3467 assert_eq!(none.len(), 0);
3468 }
3469
3470 #[test]
3471 fn test_edges_with_type() {
3472 let store = LpgStore::new();
3473
3474 let a = store.create_node(&["Person"]);
3475 let b = store.create_node(&["Person"]);
3476 let c = store.create_node(&["Company"]);
3477
3478 store.create_edge(a, b, "KNOWS");
3479 store.create_edge(a, c, "WORKS_AT");
3480
3481 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3482 assert_eq!(knows.len(), 1);
3483
3484 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3485 assert_eq!(works_at.len(), 1);
3486
3487 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3488 assert_eq!(none.len(), 0);
3489 }
3490
3491 #[test]
3492 fn test_nodes_by_label_nonexistent() {
3493 let store = LpgStore::new();
3494 store.create_node(&["Person"]);
3495
3496 let empty = store.nodes_by_label("NonExistent");
3497 assert!(empty.is_empty());
3498 }
3499
3500 #[test]
3501 fn test_statistics() {
3502 let store = LpgStore::new();
3503
3504 let a = store.create_node(&["Person"]);
3505 let b = store.create_node(&["Person"]);
3506 let c = store.create_node(&["Company"]);
3507
3508 store.create_edge(a, b, "KNOWS");
3509 store.create_edge(a, c, "WORKS_AT");
3510
3511 store.compute_statistics();
3512 let stats = store.statistics();
3513
3514 assert_eq!(stats.total_nodes, 3);
3515 assert_eq!(stats.total_edges, 2);
3516
3517 let person_card = store.estimate_label_cardinality("Person");
3519 assert!(person_card > 0.0);
3520
3521 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3522 assert!(avg_degree >= 0.0);
3523 }
3524
3525 #[test]
3526 fn test_zone_maps() {
3527 let store = LpgStore::new();
3528
3529 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3530 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3531
3532 let might_match =
3534 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3535 assert!(might_match);
3537
3538 let zone = store.node_property_zone_map(&"age".into());
3539 assert!(zone.is_some());
3540
3541 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3543 assert!(no_zone.is_none());
3544
3545 let a = store.create_node(&["A"]);
3547 let b = store.create_node(&["B"]);
3548 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3549
3550 let edge_zone = store.edge_property_zone_map(&"weight".into());
3551 assert!(edge_zone.is_some());
3552 }
3553
3554 #[test]
3555 fn test_rebuild_zone_maps() {
3556 let store = LpgStore::new();
3557 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3558
3559 store.rebuild_zone_maps();
3561 }
3562
3563 #[test]
3564 fn test_create_node_with_id() {
3565 let store = LpgStore::new();
3566
3567 let specific_id = NodeId::new(100);
3568 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3569
3570 let node = store.get_node(specific_id).unwrap();
3571 assert!(node.has_label("Person"));
3572 assert!(node.has_label("Employee"));
3573
3574 let next = store.create_node(&["Other"]);
3576 assert!(next.as_u64() > 100);
3577 }
3578
3579 #[test]
3580 fn test_create_edge_with_id() {
3581 let store = LpgStore::new();
3582
3583 let a = store.create_node(&["A"]);
3584 let b = store.create_node(&["B"]);
3585
3586 let specific_id = EdgeId::new(500);
3587 store.create_edge_with_id(specific_id, a, b, "REL");
3588
3589 let edge = store.get_edge(specific_id).unwrap();
3590 assert_eq!(edge.src, a);
3591 assert_eq!(edge.dst, b);
3592 assert_eq!(edge.edge_type.as_str(), "REL");
3593
3594 let next = store.create_edge(a, b, "OTHER");
3596 assert!(next.as_u64() > 500);
3597 }
3598
3599 #[test]
3600 fn test_set_epoch() {
3601 let store = LpgStore::new();
3602
3603 assert_eq!(store.current_epoch().as_u64(), 0);
3604
3605 store.set_epoch(EpochId::new(42));
3606 assert_eq!(store.current_epoch().as_u64(), 42);
3607 }
3608
3609 #[test]
3610 fn test_get_node_nonexistent() {
3611 let store = LpgStore::new();
3612 let fake_id = NodeId::new(999);
3613 assert!(store.get_node(fake_id).is_none());
3614 }
3615
3616 #[test]
3617 fn test_get_edge_nonexistent() {
3618 let store = LpgStore::new();
3619 let fake_id = EdgeId::new(999);
3620 assert!(store.get_edge(fake_id).is_none());
3621 }
3622
3623 #[test]
3624 fn test_multiple_labels() {
3625 let store = LpgStore::new();
3626
3627 let id = store.create_node(&["Person", "Employee", "Manager"]);
3628 let node = store.get_node(id).unwrap();
3629
3630 assert!(node.has_label("Person"));
3631 assert!(node.has_label("Employee"));
3632 assert!(node.has_label("Manager"));
3633 assert!(!node.has_label("Other"));
3634 }
3635
3636 #[test]
3637 fn test_default_impl() {
3638 let store: LpgStore = Default::default();
3639 assert_eq!(store.node_count(), 0);
3640 assert_eq!(store.edge_count(), 0);
3641 }
3642
3643 #[test]
3644 fn test_edges_from_both_directions() {
3645 let store = LpgStore::new();
3646
3647 let a = store.create_node(&["A"]);
3648 let b = store.create_node(&["B"]);
3649 let c = store.create_node(&["C"]);
3650
3651 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3656 assert_eq!(edges.len(), 2);
3657 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3660
3661 #[test]
3662 fn test_no_backward_adj_in_degree() {
3663 let config = LpgStoreConfig {
3664 backward_edges: false,
3665 initial_node_capacity: 10,
3666 initial_edge_capacity: 10,
3667 };
3668 let store = LpgStore::with_config(config);
3669
3670 let a = store.create_node(&["A"]);
3671 let b = store.create_node(&["B"]);
3672 store.create_edge(a, b, "R");
3673
3674 let degree = store.in_degree(b);
3676 assert_eq!(degree, 1);
3677 }
3678
3679 #[test]
3680 fn test_no_backward_adj_edges_to() {
3681 let config = LpgStoreConfig {
3682 backward_edges: false,
3683 initial_node_capacity: 10,
3684 initial_edge_capacity: 10,
3685 };
3686 let store = LpgStore::with_config(config);
3687
3688 let a = store.create_node(&["A"]);
3689 let b = store.create_node(&["B"]);
3690 let e = store.create_edge(a, b, "R");
3691
3692 let edges = store.edges_to(b);
3694 assert_eq!(edges.len(), 1);
3695 assert_eq!(edges[0].1, e);
3696 }
3697
3698 #[test]
3699 fn test_node_versioned_creation() {
3700 let store = LpgStore::new();
3701
3702 let epoch = store.new_epoch();
3703 let tx_id = TxId::new(1);
3704
3705 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3706 assert!(store.get_node(id).is_some());
3707 }
3708
3709 #[test]
3710 fn test_edge_versioned_creation() {
3711 let store = LpgStore::new();
3712
3713 let a = store.create_node(&["A"]);
3714 let b = store.create_node(&["B"]);
3715
3716 let epoch = store.new_epoch();
3717 let tx_id = TxId::new(1);
3718
3719 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3720 assert!(store.get_edge(edge_id).is_some());
3721 }
3722
3723 #[test]
3724 fn test_node_with_props_versioned() {
3725 let store = LpgStore::new();
3726
3727 let epoch = store.new_epoch();
3728 let tx_id = TxId::new(1);
3729
3730 let id = store.create_node_with_props_versioned(
3731 &["Person"],
3732 [("name", Value::from("Alice"))],
3733 epoch,
3734 tx_id,
3735 );
3736
3737 let node = store.get_node(id).unwrap();
3738 assert_eq!(
3739 node.get_property("name").and_then(|v| v.as_str()),
3740 Some("Alice")
3741 );
3742 }
3743
3744 #[test]
3745 fn test_discard_uncommitted_versions() {
3746 let store = LpgStore::new();
3747
3748 let epoch = store.new_epoch();
3749 let tx_id = TxId::new(42);
3750
3751 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3753 assert!(store.get_node(node_id).is_some());
3754
3755 store.discard_uncommitted_versions(tx_id);
3757
3758 assert!(store.get_node(node_id).is_none());
3760 }
3761
3762 #[test]
3765 fn test_property_index_create_and_lookup() {
3766 let store = LpgStore::new();
3767
3768 let alice = store.create_node(&["Person"]);
3770 let bob = store.create_node(&["Person"]);
3771 let charlie = store.create_node(&["Person"]);
3772
3773 store.set_node_property(alice, "city", Value::from("NYC"));
3774 store.set_node_property(bob, "city", Value::from("NYC"));
3775 store.set_node_property(charlie, "city", Value::from("LA"));
3776
3777 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3779 assert_eq!(nyc_people.len(), 2);
3780
3781 store.create_property_index("city");
3783 assert!(store.has_property_index("city"));
3784
3785 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3787 assert_eq!(nyc_people.len(), 2);
3788 assert!(nyc_people.contains(&alice));
3789 assert!(nyc_people.contains(&bob));
3790
3791 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3792 assert_eq!(la_people.len(), 1);
3793 assert!(la_people.contains(&charlie));
3794 }
3795
3796 #[test]
3797 fn test_property_index_maintained_on_update() {
3798 let store = LpgStore::new();
3799
3800 store.create_property_index("status");
3802
3803 let node = store.create_node(&["Task"]);
3804 store.set_node_property(node, "status", Value::from("pending"));
3805
3806 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3808 assert_eq!(pending.len(), 1);
3809 assert!(pending.contains(&node));
3810
3811 store.set_node_property(node, "status", Value::from("done"));
3813
3814 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3816 assert!(pending.is_empty());
3817
3818 let done = store.find_nodes_by_property("status", &Value::from("done"));
3820 assert_eq!(done.len(), 1);
3821 assert!(done.contains(&node));
3822 }
3823
3824 #[test]
3825 fn test_property_index_maintained_on_remove() {
3826 let store = LpgStore::new();
3827
3828 store.create_property_index("tag");
3829
3830 let node = store.create_node(&["Item"]);
3831 store.set_node_property(node, "tag", Value::from("important"));
3832
3833 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3835 assert_eq!(found.len(), 1);
3836
3837 store.remove_node_property(node, "tag");
3839
3840 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3842 assert!(found.is_empty());
3843 }
3844
3845 #[test]
3846 fn test_property_index_drop() {
3847 let store = LpgStore::new();
3848
3849 store.create_property_index("key");
3850 assert!(store.has_property_index("key"));
3851
3852 assert!(store.drop_property_index("key"));
3853 assert!(!store.has_property_index("key"));
3854
3855 assert!(!store.drop_property_index("key"));
3857 }
3858
3859 #[test]
3860 fn test_property_index_multiple_values() {
3861 let store = LpgStore::new();
3862
3863 store.create_property_index("age");
3864
3865 let n1 = store.create_node(&["Person"]);
3867 let n2 = store.create_node(&["Person"]);
3868 let n3 = store.create_node(&["Person"]);
3869 let n4 = store.create_node(&["Person"]);
3870
3871 store.set_node_property(n1, "age", Value::from(25i64));
3872 store.set_node_property(n2, "age", Value::from(25i64));
3873 store.set_node_property(n3, "age", Value::from(30i64));
3874 store.set_node_property(n4, "age", Value::from(25i64));
3875
3876 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3877 assert_eq!(age_25.len(), 3);
3878
3879 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3880 assert_eq!(age_30.len(), 1);
3881
3882 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3883 assert!(age_40.is_empty());
3884 }
3885
3886 #[test]
3887 fn test_property_index_builds_from_existing_data() {
3888 let store = LpgStore::new();
3889
3890 let n1 = store.create_node(&["Person"]);
3892 let n2 = store.create_node(&["Person"]);
3893 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3894 store.set_node_property(n2, "email", Value::from("bob@example.com"));
3895
3896 store.create_property_index("email");
3898
3899 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3901 assert_eq!(alice.len(), 1);
3902 assert!(alice.contains(&n1));
3903
3904 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3905 assert_eq!(bob.len(), 1);
3906 assert!(bob.contains(&n2));
3907 }
3908
3909 #[test]
3910 fn test_get_node_property_batch() {
3911 let store = LpgStore::new();
3912
3913 let n1 = store.create_node(&["Person"]);
3914 let n2 = store.create_node(&["Person"]);
3915 let n3 = store.create_node(&["Person"]);
3916
3917 store.set_node_property(n1, "age", Value::from(25i64));
3918 store.set_node_property(n2, "age", Value::from(30i64));
3919 let age_key = PropertyKey::new("age");
3922 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3923
3924 assert_eq!(values.len(), 3);
3925 assert_eq!(values[0], Some(Value::from(25i64)));
3926 assert_eq!(values[1], Some(Value::from(30i64)));
3927 assert_eq!(values[2], None);
3928 }
3929
3930 #[test]
3931 fn test_get_node_property_batch_empty() {
3932 let store = LpgStore::new();
3933 let key = PropertyKey::new("any");
3934
3935 let values = store.get_node_property_batch(&[], &key);
3936 assert!(values.is_empty());
3937 }
3938
3939 #[test]
3940 fn test_get_nodes_properties_batch() {
3941 let store = LpgStore::new();
3942
3943 let n1 = store.create_node(&["Person"]);
3944 let n2 = store.create_node(&["Person"]);
3945 let n3 = store.create_node(&["Person"]);
3946
3947 store.set_node_property(n1, "name", Value::from("Alice"));
3948 store.set_node_property(n1, "age", Value::from(25i64));
3949 store.set_node_property(n2, "name", Value::from("Bob"));
3950 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3953
3954 assert_eq!(all_props.len(), 3);
3955 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
3960 all_props[0].get(&PropertyKey::new("name")),
3961 Some(&Value::from("Alice"))
3962 );
3963 assert_eq!(
3964 all_props[1].get(&PropertyKey::new("name")),
3965 Some(&Value::from("Bob"))
3966 );
3967 }
3968
3969 #[test]
3970 fn test_get_nodes_properties_batch_empty() {
3971 let store = LpgStore::new();
3972
3973 let all_props = store.get_nodes_properties_batch(&[]);
3974 assert!(all_props.is_empty());
3975 }
3976
3977 #[test]
3978 fn test_get_nodes_properties_selective_batch() {
3979 let store = LpgStore::new();
3980
3981 let n1 = store.create_node(&["Person"]);
3982 let n2 = store.create_node(&["Person"]);
3983
3984 store.set_node_property(n1, "name", Value::from("Alice"));
3986 store.set_node_property(n1, "age", Value::from(25i64));
3987 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3988 store.set_node_property(n2, "name", Value::from("Bob"));
3989 store.set_node_property(n2, "age", Value::from(30i64));
3990 store.set_node_property(n2, "city", Value::from("NYC"));
3991
3992 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
3994 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
3995
3996 assert_eq!(props.len(), 2);
3997
3998 assert_eq!(props[0].len(), 2);
4000 assert_eq!(
4001 props[0].get(&PropertyKey::new("name")),
4002 Some(&Value::from("Alice"))
4003 );
4004 assert_eq!(
4005 props[0].get(&PropertyKey::new("age")),
4006 Some(&Value::from(25i64))
4007 );
4008 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4009
4010 assert_eq!(props[1].len(), 2);
4012 assert_eq!(
4013 props[1].get(&PropertyKey::new("name")),
4014 Some(&Value::from("Bob"))
4015 );
4016 assert_eq!(
4017 props[1].get(&PropertyKey::new("age")),
4018 Some(&Value::from(30i64))
4019 );
4020 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4021 }
4022
4023 #[test]
4024 fn test_get_nodes_properties_selective_batch_empty_keys() {
4025 let store = LpgStore::new();
4026
4027 let n1 = store.create_node(&["Person"]);
4028 store.set_node_property(n1, "name", Value::from("Alice"));
4029
4030 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4032
4033 assert_eq!(props.len(), 1);
4034 assert!(props[0].is_empty()); }
4036
4037 #[test]
4038 fn test_get_nodes_properties_selective_batch_missing_keys() {
4039 let store = LpgStore::new();
4040
4041 let n1 = store.create_node(&["Person"]);
4042 store.set_node_property(n1, "name", Value::from("Alice"));
4043
4044 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4046 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4047
4048 assert_eq!(props.len(), 1);
4049 assert_eq!(props[0].len(), 1); assert_eq!(
4051 props[0].get(&PropertyKey::new("name")),
4052 Some(&Value::from("Alice"))
4053 );
4054 }
4055
4056 #[test]
4059 fn test_find_nodes_in_range_inclusive() {
4060 let store = LpgStore::new();
4061
4062 let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4063 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4064 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4065 let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4066
4067 let result = store.find_nodes_in_range(
4069 "age",
4070 Some(&Value::from(20i64)),
4071 Some(&Value::from(40i64)),
4072 true,
4073 true,
4074 );
4075 assert_eq!(result.len(), 3);
4076 assert!(result.contains(&n1));
4077 assert!(result.contains(&n2));
4078 assert!(result.contains(&n3));
4079 }
4080
/// Checks that a range query with both bounds exclusive drops the nodes
/// whose value equals a bound.
#[test]
fn test_find_nodes_in_range_exclusive() {
    let store = LpgStore::new();

    // Ages 20 and 40 coincide with the bounds; only 30 is strictly inside.
    let [_, strictly_inside, _] = [20i64, 30, 40]
        .map(|age| store.create_node_with_props(&["Person"], [("age", Value::from(age))]));

    let lower = Value::from(20i64);
    let upper = Value::from(40i64);
    let matched = store.find_nodes_in_range("age", Some(&lower), Some(&upper), false, false);

    assert_eq!(matched.len(), 1);
    assert!(matched.contains(&strictly_inside));
}
4100
/// Checks range queries where only one of the two bounds is supplied.
#[test]
fn test_find_nodes_in_range_open_ended() {
    let store = LpgStore::new();

    let person_aged = |age: i64| store.create_node_with_props(&["Person"], [("age", Value::from(age))]);
    person_aged(20);
    person_aged(30);
    let age_40 = person_aged(40);
    let age_50 = person_aged(50);

    // Lower bound only: everything from 35 upwards.
    let floor = Value::from(35i64);
    let at_least_35 = store.find_nodes_in_range("age", Some(&floor), None, true, true);
    assert_eq!(at_least_35.len(), 2);
    for expected in [age_40, age_50] {
        assert!(at_least_35.contains(&expected));
    }

    // Upper bound only: everything up to 25.
    let ceiling = Value::from(25i64);
    let at_most_25 = store.find_nodes_in_range("age", None, Some(&ceiling), true, true);
    assert_eq!(at_most_25.len(), 1);
}
4120
/// Checks that a range containing no stored value produces an empty result.
#[test]
fn test_find_nodes_in_range_empty_result() {
    let store = LpgStore::new();
    store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);

    // The only stored age (20) is below the queried window.
    let lower = Value::from(100i64);
    let upper = Value::from(200i64);
    let matched = store.find_nodes_in_range("age", Some(&lower), Some(&upper), true, true);

    assert!(matched.is_empty());
}
4137
/// Checks that a range query on a property no node carries returns nothing.
#[test]
fn test_find_nodes_in_range_nonexistent_property() {
    let store = LpgStore::new();
    store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);

    // "weight" is never set on any node in this store.
    let lower = Value::from(50i64);
    let upper = Value::from(100i64);
    let matched = store.find_nodes_in_range("weight", Some(&lower), Some(&upper), true, true);

    assert!(matched.is_empty());
}
4153
/// Checks that multiple property conditions are combined so that only a node
/// satisfying all of them is returned.
#[test]
fn test_find_nodes_by_properties_multiple_conditions() {
    let store = LpgStore::new();

    let person = |name: &str, city: &str| {
        store.create_node_with_props(
            &["Person"],
            [("name", Value::from(name)), ("city", Value::from(city))],
        )
    };

    // Only the first node matches on both name and city.
    let alice_in_nyc = person("Alice", "NYC");
    person("Bob", "NYC");
    person("Alice", "LA");

    let conditions = [
        ("name", Value::from("Alice")),
        ("city", Value::from("NYC")),
    ];
    let matched = store.find_nodes_by_properties(&conditions);

    assert_eq!(matched.len(), 1);
    assert!(matched.contains(&alice_in_nyc));
}
4181
/// Checks that an empty condition list matches every node in the store.
#[test]
fn test_find_nodes_by_properties_empty_conditions() {
    let store = LpgStore::new();

    for _ in 0..2 {
        store.create_node(&["Person"]);
    }

    // No conditions were given, so both nodes are expected back.
    let matched = store.find_nodes_by_properties(&[]);
    assert_eq!(matched.len(), 2);
}
4193
/// Checks that a condition no node satisfies produces an empty result.
#[test]
fn test_find_nodes_by_properties_no_match() {
    let store = LpgStore::new();
    store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);

    // The only stored name is "Alice", so "Nobody" cannot match.
    let conditions = [("name", Value::from("Nobody"))];
    let matched = store.find_nodes_by_properties(&conditions);

    assert!(matched.is_empty());
}
4203
/// Checks a multi-condition lookup when one of the queried properties
/// ("name") has a property index created before the nodes are inserted.
#[test]
fn test_find_nodes_by_properties_with_index() {
    let store = LpgStore::new();

    // The index is created first, then the nodes are added.
    store.create_property_index("name");

    let person_aged_30 = |name: &str| {
        store.create_node_with_props(
            &["Person"],
            [("name", Value::from(name)), ("age", Value::from(30i64))],
        )
    };
    let alice = person_aged_30("Alice");
    person_aged_30("Bob");

    // Both nodes share the age; only the name tells them apart.
    let conditions = [
        ("name", Value::from("Alice")),
        ("age", Value::from(30i64)),
    ];
    let matched = store.find_nodes_by_properties(&conditions);

    assert_eq!(matched.len(), 1);
    assert!(matched.contains(&alice));
}
4228
/// Checks that label cardinality estimates are at least 1 for labels that
/// have nodes and non-negative for a label that was never used.
#[test]
fn test_estimate_label_cardinality() {
    let store = LpgStore::new();

    // Two "Person" nodes and one "Animal" node.
    for label in ["Person", "Person", "Animal"] {
        store.create_node(&[label]);
    }

    // Refresh statistics before asking for estimates.
    store.ensure_statistics_fresh();

    let [person_est, animal_est, unknown_est] =
        ["Person", "Animal", "Unknown"].map(|label| store.estimate_label_cardinality(label));

    assert!(
        person_est >= 1.0,
        "Person should have cardinality >= 1, got {person_est}"
    );
    assert!(
        animal_est >= 1.0,
        "Animal should have cardinality >= 1, got {animal_est}"
    );
    // No exact value is pinned for an unused label, only the sign.
    assert!(unknown_est >= 0.0);
}
4256
/// Checks that the average-degree estimate for an edge type with edges is
/// positive for both values of the boolean direction argument.
#[test]
fn test_estimate_avg_degree() {
    let store = LpgStore::new();

    let a = store.create_node(&["Person"]);
    let b = store.create_node(&["Person"]);
    let c = store.create_node(&["Person"]);

    // Three "KNOWS" edges: a -> b, a -> c, b -> c.
    for (src, dst) in [(a, b), (a, c), (b, c)] {
        store.create_edge(src, dst, "KNOWS");
    }

    // Refresh statistics before asking for estimates.
    store.ensure_statistics_fresh();

    let outgoing = store.estimate_avg_degree("KNOWS", true);
    let incoming = store.estimate_avg_degree("KNOWS", false);

    assert!(
        outgoing > 0.0,
        "Outgoing degree should be > 0, got {outgoing}"
    );
    assert!(
        incoming > 0.0,
        "Incoming degree should be > 0, got {incoming}"
    );
}
4283
/// Checks that `delete_node` removes the node itself but leaves an edge
/// attached to it retrievable through `get_edge`.
#[test]
fn test_delete_node_does_not_cascade() {
    let store = LpgStore::new();

    let source = store.create_node(&["A"]);
    let target = store.create_node(&["B"]);
    let edge = store.create_edge(source, target, "KNOWS");

    // Deleting the source node succeeds and makes it unreadable.
    let deleted = store.delete_node(source);
    assert!(deleted);
    assert!(store.get_node(source).is_none());

    // The edge record is still expected to be there afterwards.
    let surviving_edge = store.get_edge(edge);
    assert!(
        surviving_edge.is_some(),
        "Edge should survive non-detach node delete"
    );
}
4303
/// Checks that deleting the same node twice reports success only the first time.
#[test]
fn test_delete_already_deleted_node() {
    let store = LpgStore::new();
    let node = store.create_node(&["A"]);

    let first_attempt = store.delete_node(node);
    assert!(first_attempt);

    // The node is already gone, so the second call must return false.
    let second_attempt = store.delete_node(node);
    assert!(!second_attempt);
}
4313
/// Checks that deleting an id that was never created in the store returns false.
#[test]
fn test_delete_nonexistent_node() {
    let store = LpgStore::new();

    // The store is empty, so id 999 does not refer to any node.
    let missing = NodeId::new(999);
    let deleted = store.delete_node(missing);
    assert!(!deleted);
}
4319}