1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(
27 feature = "tiered-storage",
28 feature = "vector-index",
29 feature = "text-index"
30))]
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34#[cfg(feature = "vector-index")]
35use crate::index::vector::HnswIndex;
36
37fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
39 match (a, b) {
40 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
41 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
42 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
43 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
44 _ => None,
45 }
46}
47
48fn value_in_range(
50 value: &Value,
51 min: Option<&Value>,
52 max: Option<&Value>,
53 min_inclusive: bool,
54 max_inclusive: bool,
55) -> bool {
56 if let Some(min_val) = min {
58 match compare_values_for_range(value, min_val) {
59 Some(CmpOrdering::Less) => return false,
60 Some(CmpOrdering::Equal) if !min_inclusive => return false,
61 None => return false, _ => {}
63 }
64 }
65
66 if let Some(max_val) = max {
68 match compare_values_for_range(value, max_val) {
69 Some(CmpOrdering::Greater) => return false,
70 Some(CmpOrdering::Equal) if !max_inclusive => return false,
71 None => return false,
72 _ => {}
73 }
74 }
75
76 true
77}
78
79#[cfg(feature = "tiered-storage")]
81use crate::storage::EpochStore;
82#[cfg(feature = "tiered-storage")]
83use grafeo_common::memory::arena::ArenaAllocator;
84#[cfg(feature = "tiered-storage")]
85use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
86
87#[derive(Debug, Clone)]
93pub struct LpgStoreConfig {
94 pub backward_edges: bool,
97 pub initial_node_capacity: usize,
99 pub initial_edge_capacity: usize,
101}
102
103impl Default for LpgStoreConfig {
104 fn default() -> Self {
105 Self {
106 backward_edges: true,
107 initial_node_capacity: 1024,
108 initial_edge_capacity: 4096,
109 }
110 }
111}
112
113pub struct LpgStore {
172 #[allow(dead_code)]
174 config: LpgStoreConfig,
175
176 #[cfg(not(feature = "tiered-storage"))]
180 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
181
182 #[cfg(not(feature = "tiered-storage"))]
186 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
187
188 #[cfg(feature = "tiered-storage")]
202 arena_allocator: Arc<ArenaAllocator>,
203
204 #[cfg(feature = "tiered-storage")]
208 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
209
210 #[cfg(feature = "tiered-storage")]
214 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
215
216 #[cfg(feature = "tiered-storage")]
219 epoch_store: Arc<EpochStore>,
220
221 node_properties: PropertyStorage<NodeId>,
223
224 edge_properties: PropertyStorage<EdgeId>,
226
227 label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
230
231 id_to_label: RwLock<Vec<ArcStr>>,
234
235 edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
238
239 id_to_edge_type: RwLock<Vec<ArcStr>>,
242
243 forward_adj: ChunkedAdjacency,
245
246 backward_adj: Option<ChunkedAdjacency>,
249
250 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
253
254 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
258
259 property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
265
266 #[cfg(feature = "vector-index")]
271 vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
272
273 #[cfg(feature = "text-index")]
278 text_indexes: RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,
279
280 next_node_id: AtomicU64,
282
283 next_edge_id: AtomicU64,
285
286 current_epoch: AtomicU64,
288
289 statistics: RwLock<Statistics>,
292
293 needs_stats_recompute: AtomicBool,
295}
296
297impl LpgStore {
298 #[must_use]
300 pub fn new() -> Self {
301 Self::with_config(LpgStoreConfig::default())
302 }
303
304 #[must_use]
306 pub fn with_config(config: LpgStoreConfig) -> Self {
307 let backward_adj = if config.backward_edges {
308 Some(ChunkedAdjacency::new())
309 } else {
310 None
311 };
312
313 Self {
314 #[cfg(not(feature = "tiered-storage"))]
315 nodes: RwLock::new(FxHashMap::default()),
316 #[cfg(not(feature = "tiered-storage"))]
317 edges: RwLock::new(FxHashMap::default()),
318 #[cfg(feature = "tiered-storage")]
319 arena_allocator: Arc::new(ArenaAllocator::new()),
320 #[cfg(feature = "tiered-storage")]
321 node_versions: RwLock::new(FxHashMap::default()),
322 #[cfg(feature = "tiered-storage")]
323 edge_versions: RwLock::new(FxHashMap::default()),
324 #[cfg(feature = "tiered-storage")]
325 epoch_store: Arc::new(EpochStore::new()),
326 node_properties: PropertyStorage::new(),
327 edge_properties: PropertyStorage::new(),
328 label_to_id: RwLock::new(FxHashMap::default()),
329 id_to_label: RwLock::new(Vec::new()),
330 edge_type_to_id: RwLock::new(FxHashMap::default()),
331 id_to_edge_type: RwLock::new(Vec::new()),
332 forward_adj: ChunkedAdjacency::new(),
333 backward_adj,
334 label_index: RwLock::new(Vec::new()),
335 node_labels: RwLock::new(FxHashMap::default()),
336 property_indexes: RwLock::new(FxHashMap::default()),
337 #[cfg(feature = "vector-index")]
338 vector_indexes: RwLock::new(FxHashMap::default()),
339 #[cfg(feature = "text-index")]
340 text_indexes: RwLock::new(FxHashMap::default()),
341 next_node_id: AtomicU64::new(0),
342 next_edge_id: AtomicU64::new(0),
343 current_epoch: AtomicU64::new(0),
344 statistics: RwLock::new(Statistics::new()),
345 needs_stats_recompute: AtomicBool::new(true),
346 config,
347 }
348 }
349
350 #[must_use]
352 pub fn current_epoch(&self) -> EpochId {
353 EpochId::new(self.current_epoch.load(Ordering::Acquire))
354 }
355
356 pub fn new_epoch(&self) -> EpochId {
358 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
359 EpochId::new(id)
360 }
361
362 pub fn create_node(&self, labels: &[&str]) -> NodeId {
368 self.needs_stats_recompute.store(true, Ordering::Relaxed);
369 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
370 }
371
372 #[cfg(not(feature = "tiered-storage"))]
374 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
375 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
376
377 let mut record = NodeRecord::new(id, epoch);
378 record.set_label_count(labels.len() as u16);
379
380 let mut node_label_set = FxHashSet::default();
382 for label in labels {
383 let label_id = self.get_or_create_label_id(*label);
384 node_label_set.insert(label_id);
385
386 let mut index = self.label_index.write();
388 while index.len() <= label_id as usize {
389 index.push(FxHashMap::default());
390 }
391 index[label_id as usize].insert(id, ());
392 }
393
394 self.node_labels.write().insert(id, node_label_set);
396
397 let chain = VersionChain::with_initial(record, epoch, tx_id);
399 self.nodes.write().insert(id, chain);
400 id
401 }
402
403 #[cfg(feature = "tiered-storage")]
406 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
407 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
408
409 let mut record = NodeRecord::new(id, epoch);
410 record.set_label_count(labels.len() as u16);
411
412 let mut node_label_set = FxHashSet::default();
414 for label in labels {
415 let label_id = self.get_or_create_label_id(*label);
416 node_label_set.insert(label_id);
417
418 let mut index = self.label_index.write();
420 while index.len() <= label_id as usize {
421 index.push(FxHashMap::default());
422 }
423 index[label_id as usize].insert(id, ());
424 }
425
426 self.node_labels.write().insert(id, node_label_set);
428
429 let arena = self.arena_allocator.arena_or_create(epoch);
431 let (offset, _stored) = arena.alloc_value_with_offset(record);
432
433 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
435
436 let mut versions = self.node_versions.write();
438 if let Some(index) = versions.get_mut(&id) {
439 index.add_hot(hot_ref);
440 } else {
441 versions.insert(id, VersionIndex::with_initial(hot_ref));
442 }
443
444 id
445 }
446
447 pub fn create_node_with_props(
449 &self,
450 labels: &[&str],
451 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
452 ) -> NodeId {
453 self.create_node_with_props_versioned(
454 labels,
455 properties,
456 self.current_epoch(),
457 TxId::SYSTEM,
458 )
459 }
460
461 #[cfg(not(feature = "tiered-storage"))]
463 pub fn create_node_with_props_versioned(
464 &self,
465 labels: &[&str],
466 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
467 epoch: EpochId,
468 tx_id: TxId,
469 ) -> NodeId {
470 let id = self.create_node_versioned(labels, epoch, tx_id);
471
472 for (key, value) in properties {
473 let prop_key: PropertyKey = key.into();
474 let prop_value: Value = value.into();
475 self.update_property_index_on_set(id, &prop_key, &prop_value);
477 self.node_properties.set(id, prop_key, prop_value);
478 }
479
480 let count = self.node_properties.get_all(id).len() as u16;
482 if let Some(chain) = self.nodes.write().get_mut(&id)
483 && let Some(record) = chain.latest_mut()
484 {
485 record.props_count = count;
486 }
487
488 id
489 }
490
491 #[cfg(feature = "tiered-storage")]
494 pub fn create_node_with_props_versioned(
495 &self,
496 labels: &[&str],
497 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
498 epoch: EpochId,
499 tx_id: TxId,
500 ) -> NodeId {
501 let id = self.create_node_versioned(labels, epoch, tx_id);
502
503 for (key, value) in properties {
504 let prop_key: PropertyKey = key.into();
505 let prop_value: Value = value.into();
506 self.update_property_index_on_set(id, &prop_key, &prop_value);
508 self.node_properties.set(id, prop_key, prop_value);
509 }
510
511 id
515 }
516
517 #[must_use]
519 pub fn get_node(&self, id: NodeId) -> Option<Node> {
520 self.get_node_at_epoch(id, self.current_epoch())
521 }
522
523 #[must_use]
525 #[cfg(not(feature = "tiered-storage"))]
526 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
527 let nodes = self.nodes.read();
528 let chain = nodes.get(&id)?;
529 let record = chain.visible_at(epoch)?;
530
531 if record.is_deleted() {
532 return None;
533 }
534
535 let mut node = Node::new(id);
536
537 let id_to_label = self.id_to_label.read();
539 let node_labels = self.node_labels.read();
540 if let Some(label_ids) = node_labels.get(&id) {
541 for &label_id in label_ids {
542 if let Some(label) = id_to_label.get(label_id as usize) {
543 node.labels.push(label.clone());
544 }
545 }
546 }
547
548 node.properties = self.node_properties.get_all(id).into_iter().collect();
550
551 Some(node)
552 }
553
554 #[must_use]
557 #[cfg(feature = "tiered-storage")]
558 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
559 let versions = self.node_versions.read();
560 let index = versions.get(&id)?;
561 let version_ref = index.visible_at(epoch)?;
562
563 let record = self.read_node_record(&version_ref)?;
565
566 if record.is_deleted() {
567 return None;
568 }
569
570 let mut node = Node::new(id);
571
572 let id_to_label = self.id_to_label.read();
574 let node_labels = self.node_labels.read();
575 if let Some(label_ids) = node_labels.get(&id) {
576 for &label_id in label_ids {
577 if let Some(label) = id_to_label.get(label_id as usize) {
578 node.labels.push(label.clone());
579 }
580 }
581 }
582
583 node.properties = self.node_properties.get_all(id).into_iter().collect();
585
586 Some(node)
587 }
588
589 #[must_use]
591 #[cfg(not(feature = "tiered-storage"))]
592 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
593 let nodes = self.nodes.read();
594 let chain = nodes.get(&id)?;
595 let record = chain.visible_to(epoch, tx_id)?;
596
597 if record.is_deleted() {
598 return None;
599 }
600
601 let mut node = Node::new(id);
602
603 let id_to_label = self.id_to_label.read();
605 let node_labels = self.node_labels.read();
606 if let Some(label_ids) = node_labels.get(&id) {
607 for &label_id in label_ids {
608 if let Some(label) = id_to_label.get(label_id as usize) {
609 node.labels.push(label.clone());
610 }
611 }
612 }
613
614 node.properties = self.node_properties.get_all(id).into_iter().collect();
616
617 Some(node)
618 }
619
620 #[must_use]
623 #[cfg(feature = "tiered-storage")]
624 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
625 let versions = self.node_versions.read();
626 let index = versions.get(&id)?;
627 let version_ref = index.visible_to(epoch, tx_id)?;
628
629 let record = self.read_node_record(&version_ref)?;
631
632 if record.is_deleted() {
633 return None;
634 }
635
636 let mut node = Node::new(id);
637
638 let id_to_label = self.id_to_label.read();
640 let node_labels = self.node_labels.read();
641 if let Some(label_ids) = node_labels.get(&id) {
642 for &label_id in label_ids {
643 if let Some(label) = id_to_label.get(label_id as usize) {
644 node.labels.push(label.clone());
645 }
646 }
647 }
648
649 node.properties = self.node_properties.get_all(id).into_iter().collect();
651
652 Some(node)
653 }
654
655 #[cfg(feature = "tiered-storage")]
657 #[allow(unsafe_code)]
658 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
659 match version_ref {
660 VersionRef::Hot(hot_ref) => {
661 let arena = self.arena_allocator.arena(hot_ref.epoch);
662 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
664 Some(*record)
665 }
666 VersionRef::Cold(cold_ref) => {
667 self.epoch_store
669 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
670 }
671 }
672 }
673
674 pub fn delete_node(&self, id: NodeId) -> bool {
676 self.needs_stats_recompute.store(true, Ordering::Relaxed);
677 self.delete_node_at_epoch(id, self.current_epoch())
678 }
679
680 #[cfg(not(feature = "tiered-storage"))]
682 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
683 let mut nodes = self.nodes.write();
684 if let Some(chain) = nodes.get_mut(&id) {
685 if let Some(record) = chain.visible_at(epoch) {
687 if record.is_deleted() {
688 return false;
689 }
690 } else {
691 return false;
693 }
694
695 chain.mark_deleted(epoch);
697
698 let mut index = self.label_index.write();
700 let mut node_labels = self.node_labels.write();
701 if let Some(label_ids) = node_labels.remove(&id) {
702 for label_id in label_ids {
703 if let Some(set) = index.get_mut(label_id as usize) {
704 set.remove(&id);
705 }
706 }
707 }
708
709 drop(nodes); drop(index);
712 drop(node_labels);
713 self.node_properties.remove_all(id);
714
715 true
718 } else {
719 false
720 }
721 }
722
723 #[cfg(feature = "tiered-storage")]
726 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
727 let mut versions = self.node_versions.write();
728 if let Some(index) = versions.get_mut(&id) {
729 if let Some(version_ref) = index.visible_at(epoch) {
731 if let Some(record) = self.read_node_record(&version_ref) {
732 if record.is_deleted() {
733 return false;
734 }
735 } else {
736 return false;
737 }
738 } else {
739 return false;
740 }
741
742 index.mark_deleted(epoch);
744
745 let mut label_index = self.label_index.write();
747 let mut node_labels = self.node_labels.write();
748 if let Some(label_ids) = node_labels.remove(&id) {
749 for label_id in label_ids {
750 if let Some(set) = label_index.get_mut(label_id as usize) {
751 set.remove(&id);
752 }
753 }
754 }
755
756 drop(versions);
758 drop(label_index);
759 drop(node_labels);
760 self.node_properties.remove_all(id);
761
762 true
763 } else {
764 false
765 }
766 }
767
768 #[cfg(not(feature = "tiered-storage"))]
773 pub fn delete_node_edges(&self, node_id: NodeId) {
774 let outgoing: Vec<EdgeId> = self
776 .forward_adj
777 .edges_from(node_id)
778 .into_iter()
779 .map(|(_, edge_id)| edge_id)
780 .collect();
781
782 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
784 backward
785 .edges_from(node_id)
786 .into_iter()
787 .map(|(_, edge_id)| edge_id)
788 .collect()
789 } else {
790 let epoch = self.current_epoch();
792 self.edges
793 .read()
794 .iter()
795 .filter_map(|(id, chain)| {
796 chain.visible_at(epoch).and_then(|r| {
797 if !r.is_deleted() && r.dst == node_id {
798 Some(*id)
799 } else {
800 None
801 }
802 })
803 })
804 .collect()
805 };
806
807 for edge_id in outgoing.into_iter().chain(incoming) {
809 self.delete_edge(edge_id);
810 }
811 }
812
813 #[cfg(feature = "tiered-storage")]
816 pub fn delete_node_edges(&self, node_id: NodeId) {
817 let outgoing: Vec<EdgeId> = self
819 .forward_adj
820 .edges_from(node_id)
821 .into_iter()
822 .map(|(_, edge_id)| edge_id)
823 .collect();
824
825 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
827 backward
828 .edges_from(node_id)
829 .into_iter()
830 .map(|(_, edge_id)| edge_id)
831 .collect()
832 } else {
833 let epoch = self.current_epoch();
835 let versions = self.edge_versions.read();
836 versions
837 .iter()
838 .filter_map(|(id, index)| {
839 index.visible_at(epoch).and_then(|vref| {
840 self.read_edge_record(&vref).and_then(|r| {
841 if !r.is_deleted() && r.dst == node_id {
842 Some(*id)
843 } else {
844 None
845 }
846 })
847 })
848 })
849 .collect()
850 };
851
852 for edge_id in outgoing.into_iter().chain(incoming) {
854 self.delete_edge(edge_id);
855 }
856 }
857
858 #[cfg(not(feature = "tiered-storage"))]
860 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
861 let prop_key: PropertyKey = key.into();
862
863 self.update_property_index_on_set(id, &prop_key, &value);
865
866 self.node_properties.set(id, prop_key, value);
867
868 let count = self.node_properties.get_all(id).len() as u16;
870 if let Some(chain) = self.nodes.write().get_mut(&id)
871 && let Some(record) = chain.latest_mut()
872 {
873 record.props_count = count;
874 }
875 }
876
877 #[cfg(feature = "tiered-storage")]
880 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
881 let prop_key: PropertyKey = key.into();
882
883 self.update_property_index_on_set(id, &prop_key, &value);
885
886 self.node_properties.set(id, prop_key, value);
887 }
891
892 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
894 self.edge_properties.set(id, key.into(), value);
895 }
896
897 #[cfg(not(feature = "tiered-storage"))]
901 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
902 let prop_key: PropertyKey = key.into();
903
904 self.update_property_index_on_remove(id, &prop_key);
906
907 let result = self.node_properties.remove(id, &prop_key);
908
909 let count = self.node_properties.get_all(id).len() as u16;
911 if let Some(chain) = self.nodes.write().get_mut(&id)
912 && let Some(record) = chain.latest_mut()
913 {
914 record.props_count = count;
915 }
916
917 result
918 }
919
920 #[cfg(feature = "tiered-storage")]
923 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
924 let prop_key: PropertyKey = key.into();
925
926 self.update_property_index_on_remove(id, &prop_key);
928
929 self.node_properties.remove(id, &prop_key)
930 }
932
933 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
937 self.edge_properties.remove(id, &key.into())
938 }
939
940 #[must_use]
955 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
956 self.node_properties.get(id, key)
957 }
958
959 #[must_use]
963 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
964 self.edge_properties.get(id, key)
965 }
966
967 #[must_use]
990 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
991 self.node_properties.get_batch(ids, key)
992 }
993
994 #[must_use]
999 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
1000 self.node_properties.get_all_batch(ids)
1001 }
1002
1003 #[must_use]
1031 pub fn get_nodes_properties_selective_batch(
1032 &self,
1033 ids: &[NodeId],
1034 keys: &[PropertyKey],
1035 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1036 self.node_properties.get_selective_batch(ids, keys)
1037 }
1038
1039 #[must_use]
1043 pub fn get_edges_properties_selective_batch(
1044 &self,
1045 ids: &[EdgeId],
1046 keys: &[PropertyKey],
1047 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1048 self.edge_properties.get_selective_batch(ids, keys)
1049 }
1050
1051 #[must_use]
1087 pub fn find_nodes_in_range(
1088 &self,
1089 property: &str,
1090 min: Option<&Value>,
1091 max: Option<&Value>,
1092 min_inclusive: bool,
1093 max_inclusive: bool,
1094 ) -> Vec<NodeId> {
1095 let key = PropertyKey::new(property);
1096
1097 if !self
1099 .node_properties
1100 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1101 {
1102 return Vec::new();
1103 }
1104
1105 self.node_ids()
1107 .into_iter()
1108 .filter(|&node_id| {
1109 self.node_properties
1110 .get(node_id, &key)
1111 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1112 })
1113 .collect()
1114 }
1115
1116 #[must_use]
1141 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1142 if conditions.is_empty() {
1143 return self.node_ids();
1144 }
1145
1146 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1149 let indexes = self.property_indexes.read();
1150
1151 for (i, (prop, value)) in conditions.iter().enumerate() {
1152 let key = PropertyKey::new(*prop);
1153 let hv = HashableValue::new(value.clone());
1154
1155 if let Some(index) = indexes.get(&key) {
1156 let matches: Vec<NodeId> = index
1157 .get(&hv)
1158 .map(|nodes| nodes.iter().copied().collect())
1159 .unwrap_or_default();
1160
1161 if matches.is_empty() {
1163 return Vec::new();
1164 }
1165
1166 if best_start
1168 .as_ref()
1169 .is_none_or(|(_, best)| matches.len() < best.len())
1170 {
1171 best_start = Some((i, matches));
1172 }
1173 }
1174 }
1175 drop(indexes);
1176
1177 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1179 let (prop, value) = &conditions[0];
1181 (0, self.find_nodes_by_property(prop, value))
1182 });
1183
1184 for (i, (prop, value)) in conditions.iter().enumerate() {
1186 if i == start_idx {
1187 continue;
1188 }
1189
1190 let key = PropertyKey::new(*prop);
1191 candidates.retain(|&node_id| {
1192 self.node_properties
1193 .get(node_id, &key)
1194 .is_some_and(|v| v == *value)
1195 });
1196
1197 if candidates.is_empty() {
1199 return Vec::new();
1200 }
1201 }
1202
1203 candidates
1204 }
1205
1206 pub fn create_property_index(&self, property: &str) {
1234 let key = PropertyKey::new(property);
1235
1236 let mut indexes = self.property_indexes.write();
1237 if indexes.contains_key(&key) {
1238 return; }
1240
1241 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1243
1244 for node_id in self.node_ids() {
1246 if let Some(value) = self.node_properties.get(node_id, &key) {
1247 let hv = HashableValue::new(value);
1248 index.entry(hv).or_default().insert(node_id);
1249 }
1250 }
1251
1252 indexes.insert(key, index);
1253 }
1254
1255 pub fn drop_property_index(&self, property: &str) -> bool {
1259 let key = PropertyKey::new(property);
1260 self.property_indexes.write().remove(&key).is_some()
1261 }
1262
1263 #[must_use]
1265 pub fn has_property_index(&self, property: &str) -> bool {
1266 let key = PropertyKey::new(property);
1267 self.property_indexes.read().contains_key(&key)
1268 }
1269
1270 #[cfg(feature = "vector-index")]
1272 pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1273 let key = format!("{label}:{property}");
1274 self.vector_indexes.write().insert(key, index);
1275 }
1276
1277 #[cfg(feature = "vector-index")]
1279 #[must_use]
1280 pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1281 let key = format!("{label}:{property}");
1282 self.vector_indexes.read().get(&key).cloned()
1283 }
1284
1285 #[cfg(feature = "vector-index")]
1289 pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1290 let key = format!("{label}:{property}");
1291 self.vector_indexes.write().remove(&key).is_some()
1292 }
1293
1294 #[cfg(feature = "vector-index")]
1298 #[must_use]
1299 pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1300 self.vector_indexes
1301 .read()
1302 .iter()
1303 .map(|(k, v)| (k.clone(), v.clone()))
1304 .collect()
1305 }
1306
1307 #[cfg(feature = "text-index")]
1309 pub fn add_text_index(
1310 &self,
1311 label: &str,
1312 property: &str,
1313 index: Arc<RwLock<crate::index::text::InvertedIndex>>,
1314 ) {
1315 let key = format!("{label}:{property}");
1316 self.text_indexes.write().insert(key, index);
1317 }
1318
1319 #[cfg(feature = "text-index")]
1321 #[must_use]
1322 pub fn get_text_index(
1323 &self,
1324 label: &str,
1325 property: &str,
1326 ) -> Option<Arc<RwLock<crate::index::text::InvertedIndex>>> {
1327 let key = format!("{label}:{property}");
1328 self.text_indexes.read().get(&key).cloned()
1329 }
1330
1331 #[cfg(feature = "text-index")]
1335 pub fn remove_text_index(&self, label: &str, property: &str) -> bool {
1336 let key = format!("{label}:{property}");
1337 self.text_indexes.write().remove(&key).is_some()
1338 }
1339
1340 #[cfg(feature = "text-index")]
1344 pub fn text_index_entries(
1345 &self,
1346 ) -> Vec<(String, Arc<RwLock<crate::index::text::InvertedIndex>>)> {
1347 self.text_indexes
1348 .read()
1349 .iter()
1350 .map(|(k, v)| (k.clone(), v.clone()))
1351 .collect()
1352 }
1353
1354 #[must_use]
1377 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1378 let key = PropertyKey::new(property);
1379 let hv = HashableValue::new(value.clone());
1380
1381 let indexes = self.property_indexes.read();
1383 if let Some(index) = indexes.get(&key) {
1384 if let Some(nodes) = index.get(&hv) {
1385 return nodes.iter().copied().collect();
1386 }
1387 return Vec::new();
1388 }
1389 drop(indexes);
1390
1391 self.node_ids()
1393 .into_iter()
1394 .filter(|&node_id| {
1395 self.node_properties
1396 .get(node_id, &key)
1397 .is_some_and(|v| v == *value)
1398 })
1399 .collect()
1400 }
1401
1402 pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
1408 let key = PropertyKey::new(property);
1409 self.node_ids()
1410 .into_iter()
1411 .filter(|&node_id| {
1412 self.node_properties
1413 .get(node_id, &key)
1414 .is_some_and(|v| Self::matches_filter(&v, filter_value))
1415 })
1416 .collect()
1417 }
1418
1419 fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
1424 match filter_value {
1425 Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
1426 ops.iter().all(|(op_key, op_val)| {
1427 match op_key.as_str() {
1428 "$gt" => {
1429 Self::compare_values(node_value, op_val)
1430 == Some(std::cmp::Ordering::Greater)
1431 }
1432 "$gte" => matches!(
1433 Self::compare_values(node_value, op_val),
1434 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
1435 ),
1436 "$lt" => {
1437 Self::compare_values(node_value, op_val)
1438 == Some(std::cmp::Ordering::Less)
1439 }
1440 "$lte" => matches!(
1441 Self::compare_values(node_value, op_val),
1442 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
1443 ),
1444 "$ne" => node_value != op_val,
1445 "$in" => match op_val {
1446 Value::List(items) => items.iter().any(|v| v == node_value),
1447 _ => false,
1448 },
1449 "$nin" => match op_val {
1450 Value::List(items) => !items.iter().any(|v| v == node_value),
1451 _ => true,
1452 },
1453 "$contains" => match (node_value, op_val) {
1454 (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
1455 _ => false,
1456 },
1457 _ => false, }
1459 })
1460 }
1461 _ => node_value == filter_value, }
1463 }
1464
1465 fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1467 match (a, b) {
1468 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1469 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1470 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1471 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1472 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1473 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1474 _ => None,
1475 }
1476 }
1477
1478 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1480 let indexes = self.property_indexes.read();
1481 if let Some(index) = indexes.get(key) {
1482 if let Some(old_value) = self.node_properties.get(node_id, key) {
1484 let old_hv = HashableValue::new(old_value);
1485 if let Some(mut nodes) = index.get_mut(&old_hv) {
1486 nodes.remove(&node_id);
1487 if nodes.is_empty() {
1488 drop(nodes);
1489 index.remove(&old_hv);
1490 }
1491 }
1492 }
1493
1494 let new_hv = HashableValue::new(new_value.clone());
1496 index
1497 .entry(new_hv)
1498 .or_insert_with(FxHashSet::default)
1499 .insert(node_id);
1500 }
1501 }
1502
1503 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1505 let indexes = self.property_indexes.read();
1506 if let Some(index) = indexes.get(key) {
1507 if let Some(old_value) = self.node_properties.get(node_id, key) {
1509 let old_hv = HashableValue::new(old_value);
1510 if let Some(mut nodes) = index.get_mut(&old_hv) {
1511 nodes.remove(&node_id);
1512 if nodes.is_empty() {
1513 drop(nodes);
1514 index.remove(&old_hv);
1515 }
1516 }
1517 }
1518 }
1519 }
1520
1521 #[cfg(not(feature = "tiered-storage"))]
1526 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1527 let epoch = self.current_epoch();
1528
1529 let nodes = self.nodes.read();
1531 if let Some(chain) = nodes.get(&node_id) {
1532 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1533 return false;
1534 }
1535 } else {
1536 return false;
1537 }
1538 drop(nodes);
1539
1540 let label_id = self.get_or_create_label_id(label);
1542
1543 let mut node_labels = self.node_labels.write();
1545 let label_set = node_labels.entry(node_id).or_default();
1546
1547 if label_set.contains(&label_id) {
1548 return false; }
1550
1551 label_set.insert(label_id);
1552 drop(node_labels);
1553
1554 let mut index = self.label_index.write();
1556 if (label_id as usize) >= index.len() {
1557 index.resize(label_id as usize + 1, FxHashMap::default());
1558 }
1559 index[label_id as usize].insert(node_id, ());
1560
1561 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1563 && let Some(record) = chain.latest_mut()
1564 {
1565 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1566 record.set_label_count(count as u16);
1567 }
1568
1569 true
1570 }
1571
1572 #[cfg(feature = "tiered-storage")]
1575 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1576 let epoch = self.current_epoch();
1577
1578 let versions = self.node_versions.read();
1580 if let Some(index) = versions.get(&node_id) {
1581 if let Some(vref) = index.visible_at(epoch) {
1582 if let Some(record) = self.read_node_record(&vref) {
1583 if record.is_deleted() {
1584 return false;
1585 }
1586 } else {
1587 return false;
1588 }
1589 } else {
1590 return false;
1591 }
1592 } else {
1593 return false;
1594 }
1595 drop(versions);
1596
1597 let label_id = self.get_or_create_label_id(label);
1599
1600 let mut node_labels = self.node_labels.write();
1602 let label_set = node_labels.entry(node_id).or_default();
1603
1604 if label_set.contains(&label_id) {
1605 return false; }
1607
1608 label_set.insert(label_id);
1609 drop(node_labels);
1610
1611 let mut index = self.label_index.write();
1613 if (label_id as usize) >= index.len() {
1614 index.resize(label_id as usize + 1, FxHashMap::default());
1615 }
1616 index[label_id as usize].insert(node_id, ());
1617
1618 true
1622 }
1623
1624 #[cfg(not(feature = "tiered-storage"))]
1629 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1630 let epoch = self.current_epoch();
1631
1632 let nodes = self.nodes.read();
1634 if let Some(chain) = nodes.get(&node_id) {
1635 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1636 return false;
1637 }
1638 } else {
1639 return false;
1640 }
1641 drop(nodes);
1642
1643 let label_id = {
1645 let label_ids = self.label_to_id.read();
1646 match label_ids.get(label) {
1647 Some(&id) => id,
1648 None => return false, }
1650 };
1651
1652 let mut node_labels = self.node_labels.write();
1654 if let Some(label_set) = node_labels.get_mut(&node_id) {
1655 if !label_set.remove(&label_id) {
1656 return false; }
1658 } else {
1659 return false;
1660 }
1661 drop(node_labels);
1662
1663 let mut index = self.label_index.write();
1665 if (label_id as usize) < index.len() {
1666 index[label_id as usize].remove(&node_id);
1667 }
1668
1669 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1671 && let Some(record) = chain.latest_mut()
1672 {
1673 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1674 record.set_label_count(count as u16);
1675 }
1676
1677 true
1678 }
1679
1680 #[cfg(feature = "tiered-storage")]
1683 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1684 let epoch = self.current_epoch();
1685
1686 let versions = self.node_versions.read();
1688 if let Some(index) = versions.get(&node_id) {
1689 if let Some(vref) = index.visible_at(epoch) {
1690 if let Some(record) = self.read_node_record(&vref) {
1691 if record.is_deleted() {
1692 return false;
1693 }
1694 } else {
1695 return false;
1696 }
1697 } else {
1698 return false;
1699 }
1700 } else {
1701 return false;
1702 }
1703 drop(versions);
1704
1705 let label_id = {
1707 let label_ids = self.label_to_id.read();
1708 match label_ids.get(label) {
1709 Some(&id) => id,
1710 None => return false, }
1712 };
1713
1714 let mut node_labels = self.node_labels.write();
1716 if let Some(label_set) = node_labels.get_mut(&node_id) {
1717 if !label_set.remove(&label_id) {
1718 return false; }
1720 } else {
1721 return false;
1722 }
1723 drop(node_labels);
1724
1725 let mut index = self.label_index.write();
1727 if (label_id as usize) < index.len() {
1728 index[label_id as usize].remove(&node_id);
1729 }
1730
1731 true
1734 }
1735
1736 #[must_use]
1738 #[cfg(not(feature = "tiered-storage"))]
1739 pub fn node_count(&self) -> usize {
1740 let epoch = self.current_epoch();
1741 self.nodes
1742 .read()
1743 .values()
1744 .filter_map(|chain| chain.visible_at(epoch))
1745 .filter(|r| !r.is_deleted())
1746 .count()
1747 }
1748
1749 #[must_use]
1752 #[cfg(feature = "tiered-storage")]
1753 pub fn node_count(&self) -> usize {
1754 let epoch = self.current_epoch();
1755 let versions = self.node_versions.read();
1756 versions
1757 .iter()
1758 .filter(|(_, index)| {
1759 index.visible_at(epoch).map_or(false, |vref| {
1760 self.read_node_record(&vref)
1761 .map_or(false, |r| !r.is_deleted())
1762 })
1763 })
1764 .count()
1765 }
1766
1767 #[must_use]
1773 #[cfg(not(feature = "tiered-storage"))]
1774 pub fn node_ids(&self) -> Vec<NodeId> {
1775 let epoch = self.current_epoch();
1776 let mut ids: Vec<NodeId> = self
1777 .nodes
1778 .read()
1779 .iter()
1780 .filter_map(|(id, chain)| {
1781 chain
1782 .visible_at(epoch)
1783 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1784 })
1785 .collect();
1786 ids.sort_unstable();
1787 ids
1788 }
1789
1790 #[must_use]
1793 #[cfg(feature = "tiered-storage")]
1794 pub fn node_ids(&self) -> Vec<NodeId> {
1795 let epoch = self.current_epoch();
1796 let versions = self.node_versions.read();
1797 let mut ids: Vec<NodeId> = versions
1798 .iter()
1799 .filter_map(|(id, index)| {
1800 index.visible_at(epoch).and_then(|vref| {
1801 self.read_node_record(&vref)
1802 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1803 })
1804 })
1805 .collect();
1806 ids.sort_unstable();
1807 ids
1808 }
1809
1810 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1814 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1815 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1816 }
1817
1818 #[cfg(not(feature = "tiered-storage"))]
1820 pub fn create_edge_versioned(
1821 &self,
1822 src: NodeId,
1823 dst: NodeId,
1824 edge_type: &str,
1825 epoch: EpochId,
1826 tx_id: TxId,
1827 ) -> EdgeId {
1828 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1829 let type_id = self.get_or_create_edge_type_id(edge_type);
1830
1831 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1832 let chain = VersionChain::with_initial(record, epoch, tx_id);
1833 self.edges.write().insert(id, chain);
1834
1835 self.forward_adj.add_edge(src, dst, id);
1837 if let Some(ref backward) = self.backward_adj {
1838 backward.add_edge(dst, src, id);
1839 }
1840
1841 id
1842 }
1843
1844 #[cfg(feature = "tiered-storage")]
1847 pub fn create_edge_versioned(
1848 &self,
1849 src: NodeId,
1850 dst: NodeId,
1851 edge_type: &str,
1852 epoch: EpochId,
1853 tx_id: TxId,
1854 ) -> EdgeId {
1855 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1856 let type_id = self.get_or_create_edge_type_id(edge_type);
1857
1858 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1859
1860 let arena = self.arena_allocator.arena_or_create(epoch);
1862 let (offset, _stored) = arena.alloc_value_with_offset(record);
1863
1864 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1866
1867 let mut versions = self.edge_versions.write();
1869 if let Some(index) = versions.get_mut(&id) {
1870 index.add_hot(hot_ref);
1871 } else {
1872 versions.insert(id, VersionIndex::with_initial(hot_ref));
1873 }
1874
1875 self.forward_adj.add_edge(src, dst, id);
1877 if let Some(ref backward) = self.backward_adj {
1878 backward.add_edge(dst, src, id);
1879 }
1880
1881 id
1882 }
1883
1884 pub fn create_edge_with_props(
1886 &self,
1887 src: NodeId,
1888 dst: NodeId,
1889 edge_type: &str,
1890 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1891 ) -> EdgeId {
1892 let id = self.create_edge(src, dst, edge_type);
1893
1894 for (key, value) in properties {
1895 self.edge_properties.set(id, key.into(), value.into());
1896 }
1897
1898 id
1899 }
1900
1901 #[must_use]
1903 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1904 self.get_edge_at_epoch(id, self.current_epoch())
1905 }
1906
1907 #[must_use]
1909 #[cfg(not(feature = "tiered-storage"))]
1910 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1911 let edges = self.edges.read();
1912 let chain = edges.get(&id)?;
1913 let record = chain.visible_at(epoch)?;
1914
1915 if record.is_deleted() {
1916 return None;
1917 }
1918
1919 let edge_type = {
1920 let id_to_type = self.id_to_edge_type.read();
1921 id_to_type.get(record.type_id as usize)?.clone()
1922 };
1923
1924 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1925
1926 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1928
1929 Some(edge)
1930 }
1931
1932 #[must_use]
1935 #[cfg(feature = "tiered-storage")]
1936 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1937 let versions = self.edge_versions.read();
1938 let index = versions.get(&id)?;
1939 let version_ref = index.visible_at(epoch)?;
1940
1941 let record = self.read_edge_record(&version_ref)?;
1942
1943 if record.is_deleted() {
1944 return None;
1945 }
1946
1947 let edge_type = {
1948 let id_to_type = self.id_to_edge_type.read();
1949 id_to_type.get(record.type_id as usize)?.clone()
1950 };
1951
1952 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1953
1954 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1956
1957 Some(edge)
1958 }
1959
1960 #[must_use]
1962 #[cfg(not(feature = "tiered-storage"))]
1963 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1964 let edges = self.edges.read();
1965 let chain = edges.get(&id)?;
1966 let record = chain.visible_to(epoch, tx_id)?;
1967
1968 if record.is_deleted() {
1969 return None;
1970 }
1971
1972 let edge_type = {
1973 let id_to_type = self.id_to_edge_type.read();
1974 id_to_type.get(record.type_id as usize)?.clone()
1975 };
1976
1977 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1978
1979 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1981
1982 Some(edge)
1983 }
1984
1985 #[must_use]
1988 #[cfg(feature = "tiered-storage")]
1989 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1990 let versions = self.edge_versions.read();
1991 let index = versions.get(&id)?;
1992 let version_ref = index.visible_to(epoch, tx_id)?;
1993
1994 let record = self.read_edge_record(&version_ref)?;
1995
1996 if record.is_deleted() {
1997 return None;
1998 }
1999
2000 let edge_type = {
2001 let id_to_type = self.id_to_edge_type.read();
2002 id_to_type.get(record.type_id as usize)?.clone()
2003 };
2004
2005 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2006
2007 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2009
2010 Some(edge)
2011 }
2012
2013 #[cfg(feature = "tiered-storage")]
2015 #[allow(unsafe_code)]
2016 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
2017 match version_ref {
2018 VersionRef::Hot(hot_ref) => {
2019 let arena = self.arena_allocator.arena(hot_ref.epoch);
2020 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2022 Some(*record)
2023 }
2024 VersionRef::Cold(cold_ref) => {
2025 self.epoch_store
2027 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
2028 }
2029 }
2030 }
2031
2032 pub fn delete_edge(&self, id: EdgeId) -> bool {
2034 self.needs_stats_recompute.store(true, Ordering::Relaxed);
2035 self.delete_edge_at_epoch(id, self.current_epoch())
2036 }
2037
2038 #[cfg(not(feature = "tiered-storage"))]
2040 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2041 let mut edges = self.edges.write();
2042 if let Some(chain) = edges.get_mut(&id) {
2043 let (src, dst) = {
2045 match chain.visible_at(epoch) {
2046 Some(record) => {
2047 if record.is_deleted() {
2048 return false;
2049 }
2050 (record.src, record.dst)
2051 }
2052 None => return false, }
2054 };
2055
2056 chain.mark_deleted(epoch);
2058
2059 drop(edges); self.forward_adj.mark_deleted(src, id);
2063 if let Some(ref backward) = self.backward_adj {
2064 backward.mark_deleted(dst, id);
2065 }
2066
2067 self.edge_properties.remove_all(id);
2069
2070 true
2071 } else {
2072 false
2073 }
2074 }
2075
2076 #[cfg(feature = "tiered-storage")]
2079 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2080 let mut versions = self.edge_versions.write();
2081 if let Some(index) = versions.get_mut(&id) {
2082 let (src, dst) = {
2084 match index.visible_at(epoch) {
2085 Some(version_ref) => {
2086 if let Some(record) = self.read_edge_record(&version_ref) {
2087 if record.is_deleted() {
2088 return false;
2089 }
2090 (record.src, record.dst)
2091 } else {
2092 return false;
2093 }
2094 }
2095 None => return false,
2096 }
2097 };
2098
2099 index.mark_deleted(epoch);
2101
2102 drop(versions); self.forward_adj.mark_deleted(src, id);
2106 if let Some(ref backward) = self.backward_adj {
2107 backward.mark_deleted(dst, id);
2108 }
2109
2110 self.edge_properties.remove_all(id);
2112
2113 true
2114 } else {
2115 false
2116 }
2117 }
2118
2119 #[must_use]
2121 #[cfg(not(feature = "tiered-storage"))]
2122 pub fn edge_count(&self) -> usize {
2123 let epoch = self.current_epoch();
2124 self.edges
2125 .read()
2126 .values()
2127 .filter_map(|chain| chain.visible_at(epoch))
2128 .filter(|r| !r.is_deleted())
2129 .count()
2130 }
2131
2132 #[must_use]
2135 #[cfg(feature = "tiered-storage")]
2136 pub fn edge_count(&self) -> usize {
2137 let epoch = self.current_epoch();
2138 let versions = self.edge_versions.read();
2139 versions
2140 .iter()
2141 .filter(|(_, index)| {
2142 index.visible_at(epoch).map_or(false, |vref| {
2143 self.read_edge_record(&vref)
2144 .map_or(false, |r| !r.is_deleted())
2145 })
2146 })
2147 .count()
2148 }
2149
2150 #[cfg(not(feature = "tiered-storage"))]
2155 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2156 {
2158 let mut nodes = self.nodes.write();
2159 for chain in nodes.values_mut() {
2160 chain.remove_versions_by(tx_id);
2161 }
2162 nodes.retain(|_, chain| !chain.is_empty());
2164 }
2165
2166 {
2168 let mut edges = self.edges.write();
2169 for chain in edges.values_mut() {
2170 chain.remove_versions_by(tx_id);
2171 }
2172 edges.retain(|_, chain| !chain.is_empty());
2174 }
2175 }
2176
2177 #[cfg(feature = "tiered-storage")]
2180 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2181 {
2183 let mut versions = self.node_versions.write();
2184 for index in versions.values_mut() {
2185 index.remove_versions_by(tx_id);
2186 }
2187 versions.retain(|_, index| !index.is_empty());
2189 }
2190
2191 {
2193 let mut versions = self.edge_versions.write();
2194 for index in versions.values_mut() {
2195 index.remove_versions_by(tx_id);
2196 }
2197 versions.retain(|_, index| !index.is_empty());
2199 }
2200 }
2201
2202 #[cfg(not(feature = "tiered-storage"))]
2207 pub fn gc_versions(&self, min_epoch: EpochId) {
2208 {
2209 let mut nodes = self.nodes.write();
2210 for chain in nodes.values_mut() {
2211 chain.gc(min_epoch);
2212 }
2213 nodes.retain(|_, chain| !chain.is_empty());
2214 }
2215 {
2216 let mut edges = self.edges.write();
2217 for chain in edges.values_mut() {
2218 chain.gc(min_epoch);
2219 }
2220 edges.retain(|_, chain| !chain.is_empty());
2221 }
2222 }
2223
2224 #[cfg(feature = "tiered-storage")]
2226 pub fn gc_versions(&self, min_epoch: EpochId) {
2227 {
2228 let mut versions = self.node_versions.write();
2229 for index in versions.values_mut() {
2230 index.gc(min_epoch);
2231 }
2232 versions.retain(|_, index| !index.is_empty());
2233 }
2234 {
2235 let mut versions = self.edge_versions.write();
2236 for index in versions.values_mut() {
2237 index.gc(min_epoch);
2238 }
2239 versions.retain(|_, index| !index.is_empty());
2240 }
2241 }
2242
2243 #[cfg(feature = "tiered-storage")]
2262 #[allow(unsafe_code)]
2263 pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2264 let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2266 let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2267
2268 {
2269 let versions = self.node_versions.read();
2270 for (node_id, index) in versions.iter() {
2271 for hot_ref in index.hot_refs_for_epoch(epoch) {
2272 let arena = self.arena_allocator.arena(hot_ref.epoch);
2273 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2275 node_records.push((node_id.as_u64(), *record));
2276 node_hot_refs.push((*node_id, *hot_ref));
2277 }
2278 }
2279 }
2280
2281 let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2283 let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2284
2285 {
2286 let versions = self.edge_versions.read();
2287 for (edge_id, index) in versions.iter() {
2288 for hot_ref in index.hot_refs_for_epoch(epoch) {
2289 let arena = self.arena_allocator.arena(hot_ref.epoch);
2290 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2292 edge_records.push((edge_id.as_u64(), *record));
2293 edge_hot_refs.push((*edge_id, *hot_ref));
2294 }
2295 }
2296 }
2297
2298 let total_frozen = node_records.len() + edge_records.len();
2299
2300 if total_frozen == 0 {
2301 return 0;
2302 }
2303
2304 let (node_entries, edge_entries) =
2306 self.epoch_store
2307 .freeze_epoch(epoch, node_records, edge_records);
2308
2309 let node_entry_map: FxHashMap<u64, _> = node_entries
2311 .iter()
2312 .map(|e| (e.entity_id, (e.offset, e.length)))
2313 .collect();
2314 let edge_entry_map: FxHashMap<u64, _> = edge_entries
2315 .iter()
2316 .map(|e| (e.entity_id, (e.offset, e.length)))
2317 .collect();
2318
2319 {
2321 let mut versions = self.node_versions.write();
2322 for (node_id, hot_ref) in &node_hot_refs {
2323 if let Some(index) = versions.get_mut(node_id)
2324 && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2325 {
2326 let cold_ref = ColdVersionRef {
2327 epoch,
2328 block_offset: offset,
2329 length,
2330 created_by: hot_ref.created_by,
2331 deleted_epoch: hot_ref.deleted_epoch,
2332 };
2333 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2334 }
2335 }
2336 }
2337
2338 {
2339 let mut versions = self.edge_versions.write();
2340 for (edge_id, hot_ref) in &edge_hot_refs {
2341 if let Some(index) = versions.get_mut(edge_id)
2342 && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2343 {
2344 let cold_ref = ColdVersionRef {
2345 epoch,
2346 block_offset: offset,
2347 length,
2348 created_by: hot_ref.created_by,
2349 deleted_epoch: hot_ref.deleted_epoch,
2350 };
2351 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2352 }
2353 }
2354 }
2355
2356 total_frozen
2357 }
2358
2359 #[cfg(feature = "tiered-storage")]
2361 #[must_use]
2362 pub fn epoch_store(&self) -> &EpochStore {
2363 &self.epoch_store
2364 }
2365
2366 #[must_use]
2368 pub fn label_count(&self) -> usize {
2369 self.id_to_label.read().len()
2370 }
2371
2372 #[must_use]
2376 pub fn property_key_count(&self) -> usize {
2377 let node_keys = self.node_properties.column_count();
2378 let edge_keys = self.edge_properties.column_count();
2379 node_keys + edge_keys
2383 }
2384
2385 #[must_use]
2387 pub fn edge_type_count(&self) -> usize {
2388 self.id_to_edge_type.read().len()
2389 }
2390
2391 pub fn neighbors(
2398 &self,
2399 node: NodeId,
2400 direction: Direction,
2401 ) -> impl Iterator<Item = NodeId> + '_ {
2402 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2403 Direction::Outgoing | Direction::Both => {
2404 Box::new(self.forward_adj.neighbors(node).into_iter())
2405 }
2406 Direction::Incoming => Box::new(std::iter::empty()),
2407 };
2408
2409 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2410 Direction::Incoming | Direction::Both => {
2411 if let Some(ref adj) = self.backward_adj {
2412 Box::new(adj.neighbors(node).into_iter())
2413 } else {
2414 Box::new(std::iter::empty())
2415 }
2416 }
2417 Direction::Outgoing => Box::new(std::iter::empty()),
2418 };
2419
2420 forward.chain(backward)
2421 }
2422
2423 pub fn edges_from(
2427 &self,
2428 node: NodeId,
2429 direction: Direction,
2430 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2431 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2432 Direction::Outgoing | Direction::Both => {
2433 Box::new(self.forward_adj.edges_from(node).into_iter())
2434 }
2435 Direction::Incoming => Box::new(std::iter::empty()),
2436 };
2437
2438 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2439 Direction::Incoming | Direction::Both => {
2440 if let Some(ref adj) = self.backward_adj {
2441 Box::new(adj.edges_from(node).into_iter())
2442 } else {
2443 Box::new(std::iter::empty())
2444 }
2445 }
2446 Direction::Outgoing => Box::new(std::iter::empty()),
2447 };
2448
2449 forward.chain(backward)
2450 }
2451
2452 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2465 if let Some(ref backward) = self.backward_adj {
2466 backward.edges_from(node)
2467 } else {
2468 self.all_edges()
2470 .filter_map(|edge| {
2471 if edge.dst == node {
2472 Some((edge.src, edge.id))
2473 } else {
2474 None
2475 }
2476 })
2477 .collect()
2478 }
2479 }
2480
2481 #[must_use]
2485 pub fn out_degree(&self, node: NodeId) -> usize {
2486 self.forward_adj.out_degree(node)
2487 }
2488
2489 #[must_use]
2494 pub fn in_degree(&self, node: NodeId) -> usize {
2495 if let Some(ref backward) = self.backward_adj {
2496 backward.in_degree(node)
2497 } else {
2498 self.all_edges().filter(|edge| edge.dst == node).count()
2500 }
2501 }
2502
2503 #[must_use]
2505 #[cfg(not(feature = "tiered-storage"))]
2506 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2507 let edges = self.edges.read();
2508 let chain = edges.get(&id)?;
2509 let epoch = self.current_epoch();
2510 let record = chain.visible_at(epoch)?;
2511 let id_to_type = self.id_to_edge_type.read();
2512 id_to_type.get(record.type_id as usize).cloned()
2513 }
2514
2515 #[must_use]
2518 #[cfg(feature = "tiered-storage")]
2519 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2520 let versions = self.edge_versions.read();
2521 let index = versions.get(&id)?;
2522 let epoch = self.current_epoch();
2523 let vref = index.visible_at(epoch)?;
2524 let record = self.read_edge_record(&vref)?;
2525 let id_to_type = self.id_to_edge_type.read();
2526 id_to_type.get(record.type_id as usize).cloned()
2527 }
2528
2529 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2535 let label_to_id = self.label_to_id.read();
2536 if let Some(&label_id) = label_to_id.get(label) {
2537 let index = self.label_index.read();
2538 if let Some(set) = index.get(label_id as usize) {
2539 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2540 ids.sort_unstable();
2541 return ids;
2542 }
2543 }
2544 Vec::new()
2545 }
2546
2547 #[cfg(not(feature = "tiered-storage"))]
2554 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2555 let epoch = self.current_epoch();
2556 let node_ids: Vec<NodeId> = self
2557 .nodes
2558 .read()
2559 .iter()
2560 .filter_map(|(id, chain)| {
2561 chain
2562 .visible_at(epoch)
2563 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2564 })
2565 .collect();
2566
2567 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2568 }
2569
2570 #[cfg(feature = "tiered-storage")]
2573 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2574 let node_ids = self.node_ids();
2575 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2576 }
2577
2578 #[cfg(not(feature = "tiered-storage"))]
2583 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2584 let epoch = self.current_epoch();
2585 let edge_ids: Vec<EdgeId> = self
2586 .edges
2587 .read()
2588 .iter()
2589 .filter_map(|(id, chain)| {
2590 chain
2591 .visible_at(epoch)
2592 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2593 })
2594 .collect();
2595
2596 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2597 }
2598
2599 #[cfg(feature = "tiered-storage")]
2602 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2603 let epoch = self.current_epoch();
2604 let versions = self.edge_versions.read();
2605 let edge_ids: Vec<EdgeId> = versions
2606 .iter()
2607 .filter_map(|(id, index)| {
2608 index.visible_at(epoch).and_then(|vref| {
2609 self.read_edge_record(&vref)
2610 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2611 })
2612 })
2613 .collect();
2614
2615 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2616 }
2617
2618 pub fn all_labels(&self) -> Vec<String> {
2620 self.id_to_label
2621 .read()
2622 .iter()
2623 .map(|s| s.to_string())
2624 .collect()
2625 }
2626
2627 pub fn all_edge_types(&self) -> Vec<String> {
2629 self.id_to_edge_type
2630 .read()
2631 .iter()
2632 .map(|s| s.to_string())
2633 .collect()
2634 }
2635
2636 pub fn all_property_keys(&self) -> Vec<String> {
2638 let mut keys = std::collections::HashSet::new();
2639 for key in self.node_properties.keys() {
2640 keys.insert(key.to_string());
2641 }
2642 for key in self.edge_properties.keys() {
2643 keys.insert(key.to_string());
2644 }
2645 keys.into_iter().collect()
2646 }
2647
2648 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2650 let node_ids = self.nodes_by_label(label);
2651 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2652 }
2653
2654 #[cfg(not(feature = "tiered-storage"))]
2656 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2657 let epoch = self.current_epoch();
2658 let type_to_id = self.edge_type_to_id.read();
2659
2660 if let Some(&type_id) = type_to_id.get(edge_type) {
2661 let edge_ids: Vec<EdgeId> = self
2662 .edges
2663 .read()
2664 .iter()
2665 .filter_map(|(id, chain)| {
2666 chain.visible_at(epoch).and_then(|r| {
2667 if !r.is_deleted() && r.type_id == type_id {
2668 Some(*id)
2669 } else {
2670 None
2671 }
2672 })
2673 })
2674 .collect();
2675
2676 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2678 as Box<dyn Iterator<Item = Edge> + 'a>
2679 } else {
2680 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2682 }
2683 }
2684
2685 #[cfg(feature = "tiered-storage")]
2688 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2689 let epoch = self.current_epoch();
2690 let type_to_id = self.edge_type_to_id.read();
2691
2692 if let Some(&type_id) = type_to_id.get(edge_type) {
2693 let versions = self.edge_versions.read();
2694 let edge_ids: Vec<EdgeId> = versions
2695 .iter()
2696 .filter_map(|(id, index)| {
2697 index.visible_at(epoch).and_then(|vref| {
2698 self.read_edge_record(&vref).and_then(|r| {
2699 if !r.is_deleted() && r.type_id == type_id {
2700 Some(*id)
2701 } else {
2702 None
2703 }
2704 })
2705 })
2706 })
2707 .collect();
2708
2709 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2710 as Box<dyn Iterator<Item = Edge> + 'a>
2711 } else {
2712 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2713 }
2714 }
2715
2716 #[must_use]
2723 pub fn node_property_might_match(
2724 &self,
2725 property: &PropertyKey,
2726 op: CompareOp,
2727 value: &Value,
2728 ) -> bool {
2729 self.node_properties.might_match(property, op, value)
2730 }
2731
2732 #[must_use]
2734 pub fn edge_property_might_match(
2735 &self,
2736 property: &PropertyKey,
2737 op: CompareOp,
2738 value: &Value,
2739 ) -> bool {
2740 self.edge_properties.might_match(property, op, value)
2741 }
2742
2743 #[must_use]
2745 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2746 self.node_properties.zone_map(property)
2747 }
2748
2749 #[must_use]
2751 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2752 self.edge_properties.zone_map(property)
2753 }
2754
2755 pub fn rebuild_zone_maps(&self) {
2757 self.node_properties.rebuild_zone_maps();
2758 self.edge_properties.rebuild_zone_maps();
2759 }
2760
2761 #[must_use]
2765 pub fn statistics(&self) -> Statistics {
2766 self.statistics.read().clone()
2767 }
2768
2769 pub fn ensure_statistics_fresh(&self) {
2774 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2775 self.compute_statistics();
2776 }
2777 }
2778
2779 #[cfg(not(feature = "tiered-storage"))]
2784 pub fn compute_statistics(&self) {
2785 let mut stats = Statistics::new();
2786
2787 stats.total_nodes = self.node_count() as u64;
2789 stats.total_edges = self.edge_count() as u64;
2790
2791 let id_to_label = self.id_to_label.read();
2793 let label_index = self.label_index.read();
2794
2795 for (label_id, label_name) in id_to_label.iter().enumerate() {
2796 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2797
2798 if node_count > 0 {
2799 let avg_out_degree = if stats.total_nodes > 0 {
2801 stats.total_edges as f64 / stats.total_nodes as f64
2802 } else {
2803 0.0
2804 };
2805
2806 let label_stats =
2807 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2808
2809 stats.update_label(label_name.as_ref(), label_stats);
2810 }
2811 }
2812
2813 let id_to_edge_type = self.id_to_edge_type.read();
2815 let edges = self.edges.read();
2816 let epoch = self.current_epoch();
2817
2818 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2819 for chain in edges.values() {
2820 if let Some(record) = chain.visible_at(epoch)
2821 && !record.is_deleted()
2822 {
2823 *edge_type_counts.entry(record.type_id).or_default() += 1;
2824 }
2825 }
2826
2827 for (type_id, count) in edge_type_counts {
2828 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2829 let avg_degree = if stats.total_nodes > 0 {
2830 count as f64 / stats.total_nodes as f64
2831 } else {
2832 0.0
2833 };
2834
2835 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2836 stats.update_edge_type(type_name.as_ref(), edge_stats);
2837 }
2838 }
2839
2840 *self.statistics.write() = stats;
2841 }
2842
2843 #[cfg(feature = "tiered-storage")]
2846 pub fn compute_statistics(&self) {
2847 let mut stats = Statistics::new();
2848
2849 stats.total_nodes = self.node_count() as u64;
2851 stats.total_edges = self.edge_count() as u64;
2852
2853 let id_to_label = self.id_to_label.read();
2855 let label_index = self.label_index.read();
2856
2857 for (label_id, label_name) in id_to_label.iter().enumerate() {
2858 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2859
2860 if node_count > 0 {
2861 let avg_out_degree = if stats.total_nodes > 0 {
2862 stats.total_edges as f64 / stats.total_nodes as f64
2863 } else {
2864 0.0
2865 };
2866
2867 let label_stats =
2868 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2869
2870 stats.update_label(label_name.as_ref(), label_stats);
2871 }
2872 }
2873
2874 let id_to_edge_type = self.id_to_edge_type.read();
2876 let versions = self.edge_versions.read();
2877 let epoch = self.current_epoch();
2878
2879 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2880 for index in versions.values() {
2881 if let Some(vref) = index.visible_at(epoch)
2882 && let Some(record) = self.read_edge_record(&vref)
2883 && !record.is_deleted()
2884 {
2885 *edge_type_counts.entry(record.type_id).or_default() += 1;
2886 }
2887 }
2888
2889 for (type_id, count) in edge_type_counts {
2890 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2891 let avg_degree = if stats.total_nodes > 0 {
2892 count as f64 / stats.total_nodes as f64
2893 } else {
2894 0.0
2895 };
2896
2897 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2898 stats.update_edge_type(type_name.as_ref(), edge_stats);
2899 }
2900 }
2901
2902 *self.statistics.write() = stats;
2903 }
2904
2905 #[must_use]
2907 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2908 self.statistics.read().estimate_label_cardinality(label)
2909 }
2910
2911 #[must_use]
2913 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2914 self.statistics
2915 .read()
2916 .estimate_avg_degree(edge_type, outgoing)
2917 }
2918
2919 fn get_or_create_label_id(&self, label: &str) -> u32 {
2922 {
2923 let label_to_id = self.label_to_id.read();
2924 if let Some(&id) = label_to_id.get(label) {
2925 return id;
2926 }
2927 }
2928
2929 let mut label_to_id = self.label_to_id.write();
2930 let mut id_to_label = self.id_to_label.write();
2931
2932 if let Some(&id) = label_to_id.get(label) {
2934 return id;
2935 }
2936
2937 let id = id_to_label.len() as u32;
2938
2939 let label: ArcStr = label.into();
2940 label_to_id.insert(label.clone(), id);
2941 id_to_label.push(label);
2942
2943 id
2944 }
2945
2946 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2947 {
2948 let type_to_id = self.edge_type_to_id.read();
2949 if let Some(&id) = type_to_id.get(edge_type) {
2950 return id;
2951 }
2952 }
2953
2954 let mut type_to_id = self.edge_type_to_id.write();
2955 let mut id_to_type = self.id_to_edge_type.write();
2956
2957 if let Some(&id) = type_to_id.get(edge_type) {
2959 return id;
2960 }
2961
2962 let id = id_to_type.len() as u32;
2963 let edge_type: ArcStr = edge_type.into();
2964 type_to_id.insert(edge_type.clone(), id);
2965 id_to_type.push(edge_type);
2966
2967 id
2968 }
2969
2970 #[cfg(not(feature = "tiered-storage"))]
2977 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2978 let epoch = self.current_epoch();
2979 let mut record = NodeRecord::new(id, epoch);
2980 record.set_label_count(labels.len() as u16);
2981
2982 let mut node_label_set = FxHashSet::default();
2984 for label in labels {
2985 let label_id = self.get_or_create_label_id(*label);
2986 node_label_set.insert(label_id);
2987
2988 let mut index = self.label_index.write();
2990 while index.len() <= label_id as usize {
2991 index.push(FxHashMap::default());
2992 }
2993 index[label_id as usize].insert(id, ());
2994 }
2995
2996 self.node_labels.write().insert(id, node_label_set);
2998
2999 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3001 self.nodes.write().insert(id, chain);
3002
3003 let id_val = id.as_u64();
3005 let _ = self
3006 .next_node_id
3007 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3008 if id_val >= current {
3009 Some(id_val + 1)
3010 } else {
3011 None
3012 }
3013 });
3014 }
3015
3016 #[cfg(feature = "tiered-storage")]
3019 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3020 let epoch = self.current_epoch();
3021 let mut record = NodeRecord::new(id, epoch);
3022 record.set_label_count(labels.len() as u16);
3023
3024 let mut node_label_set = FxHashSet::default();
3026 for label in labels {
3027 let label_id = self.get_or_create_label_id(*label);
3028 node_label_set.insert(label_id);
3029
3030 let mut index = self.label_index.write();
3032 while index.len() <= label_id as usize {
3033 index.push(FxHashMap::default());
3034 }
3035 index[label_id as usize].insert(id, ());
3036 }
3037
3038 self.node_labels.write().insert(id, node_label_set);
3040
3041 let arena = self.arena_allocator.arena_or_create(epoch);
3043 let (offset, _stored) = arena.alloc_value_with_offset(record);
3044
3045 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3047 let mut versions = self.node_versions.write();
3048 versions.insert(id, VersionIndex::with_initial(hot_ref));
3049
3050 let id_val = id.as_u64();
3052 let _ = self
3053 .next_node_id
3054 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3055 if id_val >= current {
3056 Some(id_val + 1)
3057 } else {
3058 None
3059 }
3060 });
3061 }
3062
3063 #[cfg(not(feature = "tiered-storage"))]
3067 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3068 let epoch = self.current_epoch();
3069 let type_id = self.get_or_create_edge_type_id(edge_type);
3070
3071 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3072 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3073 self.edges.write().insert(id, chain);
3074
3075 self.forward_adj.add_edge(src, dst, id);
3077 if let Some(ref backward) = self.backward_adj {
3078 backward.add_edge(dst, src, id);
3079 }
3080
3081 let id_val = id.as_u64();
3083 let _ = self
3084 .next_edge_id
3085 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3086 if id_val >= current {
3087 Some(id_val + 1)
3088 } else {
3089 None
3090 }
3091 });
3092 }
3093
3094 #[cfg(feature = "tiered-storage")]
3097 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3098 let epoch = self.current_epoch();
3099 let type_id = self.get_or_create_edge_type_id(edge_type);
3100
3101 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3102
3103 let arena = self.arena_allocator.arena_or_create(epoch);
3105 let (offset, _stored) = arena.alloc_value_with_offset(record);
3106
3107 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3109 let mut versions = self.edge_versions.write();
3110 versions.insert(id, VersionIndex::with_initial(hot_ref));
3111
3112 self.forward_adj.add_edge(src, dst, id);
3114 if let Some(ref backward) = self.backward_adj {
3115 backward.add_edge(dst, src, id);
3116 }
3117
3118 let id_val = id.as_u64();
3120 let _ = self
3121 .next_edge_id
3122 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3123 if id_val >= current {
3124 Some(id_val + 1)
3125 } else {
3126 None
3127 }
3128 });
3129 }
3130
3131 pub fn set_epoch(&self, epoch: EpochId) {
3133 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
3134 }
3135}
3136
3137impl Default for LpgStore {
3138 fn default() -> Self {
3139 Self::new()
3140 }
3141}
3142
3143#[cfg(test)]
3144mod tests {
3145 use super::*;
3146
3147 #[test]
3148 fn test_create_node() {
3149 let store = LpgStore::new();
3150
3151 let id = store.create_node(&["Person"]);
3152 assert!(id.is_valid());
3153
3154 let node = store.get_node(id).unwrap();
3155 assert!(node.has_label("Person"));
3156 assert!(!node.has_label("Animal"));
3157 }
3158
3159 #[test]
3160 fn test_create_node_with_props() {
3161 let store = LpgStore::new();
3162
3163 let id = store.create_node_with_props(
3164 &["Person"],
3165 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3166 );
3167
3168 let node = store.get_node(id).unwrap();
3169 assert_eq!(
3170 node.get_property("name").and_then(|v| v.as_str()),
3171 Some("Alice")
3172 );
3173 assert_eq!(
3174 node.get_property("age").and_then(|v| v.as_int64()),
3175 Some(30)
3176 );
3177 }
3178
3179 #[test]
3180 fn test_delete_node() {
3181 let store = LpgStore::new();
3182
3183 let id = store.create_node(&["Person"]);
3184 assert_eq!(store.node_count(), 1);
3185
3186 assert!(store.delete_node(id));
3187 assert_eq!(store.node_count(), 0);
3188 assert!(store.get_node(id).is_none());
3189
3190 assert!(!store.delete_node(id));
3192 }
3193
3194 #[test]
3195 fn test_create_edge() {
3196 let store = LpgStore::new();
3197
3198 let alice = store.create_node(&["Person"]);
3199 let bob = store.create_node(&["Person"]);
3200
3201 let edge_id = store.create_edge(alice, bob, "KNOWS");
3202 assert!(edge_id.is_valid());
3203
3204 let edge = store.get_edge(edge_id).unwrap();
3205 assert_eq!(edge.src, alice);
3206 assert_eq!(edge.dst, bob);
3207 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3208 }
3209
3210 #[test]
3211 fn test_neighbors() {
3212 let store = LpgStore::new();
3213
3214 let a = store.create_node(&["Person"]);
3215 let b = store.create_node(&["Person"]);
3216 let c = store.create_node(&["Person"]);
3217
3218 store.create_edge(a, b, "KNOWS");
3219 store.create_edge(a, c, "KNOWS");
3220
3221 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3222 assert_eq!(outgoing.len(), 2);
3223 assert!(outgoing.contains(&b));
3224 assert!(outgoing.contains(&c));
3225
3226 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3227 assert_eq!(incoming.len(), 1);
3228 assert!(incoming.contains(&a));
3229 }
3230
3231 #[test]
3232 fn test_nodes_by_label() {
3233 let store = LpgStore::new();
3234
3235 let p1 = store.create_node(&["Person"]);
3236 let p2 = store.create_node(&["Person"]);
3237 let _a = store.create_node(&["Animal"]);
3238
3239 let persons = store.nodes_by_label("Person");
3240 assert_eq!(persons.len(), 2);
3241 assert!(persons.contains(&p1));
3242 assert!(persons.contains(&p2));
3243
3244 let animals = store.nodes_by_label("Animal");
3245 assert_eq!(animals.len(), 1);
3246 }
3247
3248 #[test]
3249 fn test_delete_edge() {
3250 let store = LpgStore::new();
3251
3252 let a = store.create_node(&["Person"]);
3253 let b = store.create_node(&["Person"]);
3254 let edge_id = store.create_edge(a, b, "KNOWS");
3255
3256 assert_eq!(store.edge_count(), 1);
3257
3258 assert!(store.delete_edge(edge_id));
3259 assert_eq!(store.edge_count(), 0);
3260 assert!(store.get_edge(edge_id).is_none());
3261 }
3262
3263 #[test]
3266 fn test_lpg_store_config() {
3267 let config = LpgStoreConfig {
3269 backward_edges: false,
3270 initial_node_capacity: 100,
3271 initial_edge_capacity: 200,
3272 };
3273 let store = LpgStore::with_config(config);
3274
3275 let a = store.create_node(&["Person"]);
3277 let b = store.create_node(&["Person"]);
3278 store.create_edge(a, b, "KNOWS");
3279
3280 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3282 assert_eq!(outgoing.len(), 1);
3283
3284 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3286 assert_eq!(incoming.len(), 0);
3287 }
3288
3289 #[test]
3290 fn test_epoch_management() {
3291 let store = LpgStore::new();
3292
3293 let epoch0 = store.current_epoch();
3294 assert_eq!(epoch0.as_u64(), 0);
3295
3296 let epoch1 = store.new_epoch();
3297 assert_eq!(epoch1.as_u64(), 1);
3298
3299 let current = store.current_epoch();
3300 assert_eq!(current.as_u64(), 1);
3301 }
3302
3303 #[test]
3304 fn test_node_properties() {
3305 let store = LpgStore::new();
3306 let id = store.create_node(&["Person"]);
3307
3308 store.set_node_property(id, "name", Value::from("Alice"));
3310 let name = store.get_node_property(id, &"name".into());
3311 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3312
3313 store.set_node_property(id, "name", Value::from("Bob"));
3315 let name = store.get_node_property(id, &"name".into());
3316 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3317
3318 let old = store.remove_node_property(id, "name");
3320 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3321
3322 let name = store.get_node_property(id, &"name".into());
3324 assert!(name.is_none());
3325
3326 let none = store.remove_node_property(id, "nonexistent");
3328 assert!(none.is_none());
3329 }
3330
3331 #[test]
3332 fn test_edge_properties() {
3333 let store = LpgStore::new();
3334 let a = store.create_node(&["Person"]);
3335 let b = store.create_node(&["Person"]);
3336 let edge_id = store.create_edge(a, b, "KNOWS");
3337
3338 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3340 let since = store.get_edge_property(edge_id, &"since".into());
3341 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3342
3343 let old = store.remove_edge_property(edge_id, "since");
3345 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3346
3347 let since = store.get_edge_property(edge_id, &"since".into());
3348 assert!(since.is_none());
3349 }
3350
3351 #[test]
3352 fn test_add_remove_label() {
3353 let store = LpgStore::new();
3354 let id = store.create_node(&["Person"]);
3355
3356 assert!(store.add_label(id, "Employee"));
3358
3359 let node = store.get_node(id).unwrap();
3360 assert!(node.has_label("Person"));
3361 assert!(node.has_label("Employee"));
3362
3363 assert!(!store.add_label(id, "Employee"));
3365
3366 assert!(store.remove_label(id, "Employee"));
3368
3369 let node = store.get_node(id).unwrap();
3370 assert!(node.has_label("Person"));
3371 assert!(!node.has_label("Employee"));
3372
3373 assert!(!store.remove_label(id, "Employee"));
3375 assert!(!store.remove_label(id, "NonExistent"));
3376 }
3377
3378 #[test]
3379 fn test_add_label_to_nonexistent_node() {
3380 let store = LpgStore::new();
3381 let fake_id = NodeId::new(999);
3382 assert!(!store.add_label(fake_id, "Label"));
3383 }
3384
3385 #[test]
3386 fn test_remove_label_from_nonexistent_node() {
3387 let store = LpgStore::new();
3388 let fake_id = NodeId::new(999);
3389 assert!(!store.remove_label(fake_id, "Label"));
3390 }
3391
3392 #[test]
3393 fn test_node_ids() {
3394 let store = LpgStore::new();
3395
3396 let n1 = store.create_node(&["Person"]);
3397 let n2 = store.create_node(&["Person"]);
3398 let n3 = store.create_node(&["Person"]);
3399
3400 let ids = store.node_ids();
3401 assert_eq!(ids.len(), 3);
3402 assert!(ids.contains(&n1));
3403 assert!(ids.contains(&n2));
3404 assert!(ids.contains(&n3));
3405
3406 store.delete_node(n2);
3408 let ids = store.node_ids();
3409 assert_eq!(ids.len(), 2);
3410 assert!(!ids.contains(&n2));
3411 }
3412
3413 #[test]
3414 fn test_delete_node_nonexistent() {
3415 let store = LpgStore::new();
3416 let fake_id = NodeId::new(999);
3417 assert!(!store.delete_node(fake_id));
3418 }
3419
3420 #[test]
3421 fn test_delete_edge_nonexistent() {
3422 let store = LpgStore::new();
3423 let fake_id = EdgeId::new(999);
3424 assert!(!store.delete_edge(fake_id));
3425 }
3426
3427 #[test]
3428 fn test_delete_edge_double() {
3429 let store = LpgStore::new();
3430 let a = store.create_node(&["Person"]);
3431 let b = store.create_node(&["Person"]);
3432 let edge_id = store.create_edge(a, b, "KNOWS");
3433
3434 assert!(store.delete_edge(edge_id));
3435 assert!(!store.delete_edge(edge_id)); }
3437
3438 #[test]
3439 fn test_create_edge_with_props() {
3440 let store = LpgStore::new();
3441 let a = store.create_node(&["Person"]);
3442 let b = store.create_node(&["Person"]);
3443
3444 let edge_id = store.create_edge_with_props(
3445 a,
3446 b,
3447 "KNOWS",
3448 [
3449 ("since", Value::from(2020i64)),
3450 ("weight", Value::from(1.0)),
3451 ],
3452 );
3453
3454 let edge = store.get_edge(edge_id).unwrap();
3455 assert_eq!(
3456 edge.get_property("since").and_then(|v| v.as_int64()),
3457 Some(2020)
3458 );
3459 assert_eq!(
3460 edge.get_property("weight").and_then(|v| v.as_float64()),
3461 Some(1.0)
3462 );
3463 }
3464
3465 #[test]
3466 fn test_delete_node_edges() {
3467 let store = LpgStore::new();
3468
3469 let a = store.create_node(&["Person"]);
3470 let b = store.create_node(&["Person"]);
3471 let c = store.create_node(&["Person"]);
3472
3473 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3477
3478 store.delete_node_edges(a);
3480
3481 assert_eq!(store.edge_count(), 0);
3482 }
3483
3484 #[test]
3485 fn test_neighbors_both_directions() {
3486 let store = LpgStore::new();
3487
3488 let a = store.create_node(&["Person"]);
3489 let b = store.create_node(&["Person"]);
3490 let c = store.create_node(&["Person"]);
3491
3492 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3497 assert_eq!(neighbors.len(), 2);
3498 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3501
3502 #[test]
3503 fn test_edges_from() {
3504 let store = LpgStore::new();
3505
3506 let a = store.create_node(&["Person"]);
3507 let b = store.create_node(&["Person"]);
3508 let c = store.create_node(&["Person"]);
3509
3510 let e1 = store.create_edge(a, b, "KNOWS");
3511 let e2 = store.create_edge(a, c, "KNOWS");
3512
3513 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3514 assert_eq!(edges.len(), 2);
3515 assert!(edges.iter().any(|(_, e)| *e == e1));
3516 assert!(edges.iter().any(|(_, e)| *e == e2));
3517
3518 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3520 assert_eq!(incoming.len(), 1);
3521 assert_eq!(incoming[0].1, e1);
3522 }
3523
3524 #[test]
3525 fn test_edges_to() {
3526 let store = LpgStore::new();
3527
3528 let a = store.create_node(&["Person"]);
3529 let b = store.create_node(&["Person"]);
3530 let c = store.create_node(&["Person"]);
3531
3532 let e1 = store.create_edge(a, b, "KNOWS");
3533 let e2 = store.create_edge(c, b, "KNOWS");
3534
3535 let to_b = store.edges_to(b);
3537 assert_eq!(to_b.len(), 2);
3538 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3539 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3540 }
3541
3542 #[test]
3543 fn test_out_degree_in_degree() {
3544 let store = LpgStore::new();
3545
3546 let a = store.create_node(&["Person"]);
3547 let b = store.create_node(&["Person"]);
3548 let c = store.create_node(&["Person"]);
3549
3550 store.create_edge(a, b, "KNOWS");
3551 store.create_edge(a, c, "KNOWS");
3552 store.create_edge(c, b, "KNOWS");
3553
3554 assert_eq!(store.out_degree(a), 2);
3555 assert_eq!(store.out_degree(b), 0);
3556 assert_eq!(store.out_degree(c), 1);
3557
3558 assert_eq!(store.in_degree(a), 0);
3559 assert_eq!(store.in_degree(b), 2);
3560 assert_eq!(store.in_degree(c), 1);
3561 }
3562
3563 #[test]
3564 fn test_edge_type() {
3565 let store = LpgStore::new();
3566
3567 let a = store.create_node(&["Person"]);
3568 let b = store.create_node(&["Person"]);
3569 let edge_id = store.create_edge(a, b, "KNOWS");
3570
3571 let edge_type = store.edge_type(edge_id);
3572 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3573
3574 let fake_id = EdgeId::new(999);
3576 assert!(store.edge_type(fake_id).is_none());
3577 }
3578
3579 #[test]
3580 fn test_count_methods() {
3581 let store = LpgStore::new();
3582
3583 assert_eq!(store.label_count(), 0);
3584 assert_eq!(store.edge_type_count(), 0);
3585 assert_eq!(store.property_key_count(), 0);
3586
3587 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3588 let b = store.create_node(&["Company"]);
3589 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3590
3591 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3595
3596 #[test]
3597 fn test_all_nodes_and_edges() {
3598 let store = LpgStore::new();
3599
3600 let a = store.create_node(&["Person"]);
3601 let b = store.create_node(&["Person"]);
3602 store.create_edge(a, b, "KNOWS");
3603
3604 let nodes: Vec<_> = store.all_nodes().collect();
3605 assert_eq!(nodes.len(), 2);
3606
3607 let edges: Vec<_> = store.all_edges().collect();
3608 assert_eq!(edges.len(), 1);
3609 }
3610
3611 #[test]
3612 fn test_all_labels_and_edge_types() {
3613 let store = LpgStore::new();
3614
3615 store.create_node(&["Person"]);
3616 store.create_node(&["Company"]);
3617 let a = store.create_node(&["Animal"]);
3618 let b = store.create_node(&["Animal"]);
3619 store.create_edge(a, b, "EATS");
3620
3621 let labels = store.all_labels();
3622 assert_eq!(labels.len(), 3);
3623 assert!(labels.contains(&"Person".to_string()));
3624 assert!(labels.contains(&"Company".to_string()));
3625 assert!(labels.contains(&"Animal".to_string()));
3626
3627 let edge_types = store.all_edge_types();
3628 assert_eq!(edge_types.len(), 1);
3629 assert!(edge_types.contains(&"EATS".to_string()));
3630 }
3631
3632 #[test]
3633 fn test_all_property_keys() {
3634 let store = LpgStore::new();
3635
3636 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3637 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3638 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3639
3640 let keys = store.all_property_keys();
3641 assert!(keys.contains(&"name".to_string()));
3642 assert!(keys.contains(&"age".to_string()));
3643 assert!(keys.contains(&"since".to_string()));
3644 }
3645
3646 #[test]
3647 fn test_nodes_with_label() {
3648 let store = LpgStore::new();
3649
3650 store.create_node(&["Person"]);
3651 store.create_node(&["Person"]);
3652 store.create_node(&["Company"]);
3653
3654 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3655 assert_eq!(persons.len(), 2);
3656
3657 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3658 assert_eq!(companies.len(), 1);
3659
3660 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3661 assert_eq!(none.len(), 0);
3662 }
3663
3664 #[test]
3665 fn test_edges_with_type() {
3666 let store = LpgStore::new();
3667
3668 let a = store.create_node(&["Person"]);
3669 let b = store.create_node(&["Person"]);
3670 let c = store.create_node(&["Company"]);
3671
3672 store.create_edge(a, b, "KNOWS");
3673 store.create_edge(a, c, "WORKS_AT");
3674
3675 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3676 assert_eq!(knows.len(), 1);
3677
3678 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3679 assert_eq!(works_at.len(), 1);
3680
3681 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3682 assert_eq!(none.len(), 0);
3683 }
3684
3685 #[test]
3686 fn test_nodes_by_label_nonexistent() {
3687 let store = LpgStore::new();
3688 store.create_node(&["Person"]);
3689
3690 let empty = store.nodes_by_label("NonExistent");
3691 assert!(empty.is_empty());
3692 }
3693
3694 #[test]
3695 fn test_statistics() {
3696 let store = LpgStore::new();
3697
3698 let a = store.create_node(&["Person"]);
3699 let b = store.create_node(&["Person"]);
3700 let c = store.create_node(&["Company"]);
3701
3702 store.create_edge(a, b, "KNOWS");
3703 store.create_edge(a, c, "WORKS_AT");
3704
3705 store.compute_statistics();
3706 let stats = store.statistics();
3707
3708 assert_eq!(stats.total_nodes, 3);
3709 assert_eq!(stats.total_edges, 2);
3710
3711 let person_card = store.estimate_label_cardinality("Person");
3713 assert!(person_card > 0.0);
3714
3715 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3716 assert!(avg_degree >= 0.0);
3717 }
3718
3719 #[test]
3720 fn test_zone_maps() {
3721 let store = LpgStore::new();
3722
3723 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3724 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3725
3726 let might_match =
3728 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3729 assert!(might_match);
3731
3732 let zone = store.node_property_zone_map(&"age".into());
3733 assert!(zone.is_some());
3734
3735 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3737 assert!(no_zone.is_none());
3738
3739 let a = store.create_node(&["A"]);
3741 let b = store.create_node(&["B"]);
3742 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3743
3744 let edge_zone = store.edge_property_zone_map(&"weight".into());
3745 assert!(edge_zone.is_some());
3746 }
3747
3748 #[test]
3749 fn test_rebuild_zone_maps() {
3750 let store = LpgStore::new();
3751 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3752
3753 store.rebuild_zone_maps();
3755 }
3756
3757 #[test]
3758 fn test_create_node_with_id() {
3759 let store = LpgStore::new();
3760
3761 let specific_id = NodeId::new(100);
3762 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3763
3764 let node = store.get_node(specific_id).unwrap();
3765 assert!(node.has_label("Person"));
3766 assert!(node.has_label("Employee"));
3767
3768 let next = store.create_node(&["Other"]);
3770 assert!(next.as_u64() > 100);
3771 }
3772
3773 #[test]
3774 fn test_create_edge_with_id() {
3775 let store = LpgStore::new();
3776
3777 let a = store.create_node(&["A"]);
3778 let b = store.create_node(&["B"]);
3779
3780 let specific_id = EdgeId::new(500);
3781 store.create_edge_with_id(specific_id, a, b, "REL");
3782
3783 let edge = store.get_edge(specific_id).unwrap();
3784 assert_eq!(edge.src, a);
3785 assert_eq!(edge.dst, b);
3786 assert_eq!(edge.edge_type.as_str(), "REL");
3787
3788 let next = store.create_edge(a, b, "OTHER");
3790 assert!(next.as_u64() > 500);
3791 }
3792
3793 #[test]
3794 fn test_set_epoch() {
3795 let store = LpgStore::new();
3796
3797 assert_eq!(store.current_epoch().as_u64(), 0);
3798
3799 store.set_epoch(EpochId::new(42));
3800 assert_eq!(store.current_epoch().as_u64(), 42);
3801 }
3802
3803 #[test]
3804 fn test_get_node_nonexistent() {
3805 let store = LpgStore::new();
3806 let fake_id = NodeId::new(999);
3807 assert!(store.get_node(fake_id).is_none());
3808 }
3809
3810 #[test]
3811 fn test_get_edge_nonexistent() {
3812 let store = LpgStore::new();
3813 let fake_id = EdgeId::new(999);
3814 assert!(store.get_edge(fake_id).is_none());
3815 }
3816
3817 #[test]
3818 fn test_multiple_labels() {
3819 let store = LpgStore::new();
3820
3821 let id = store.create_node(&["Person", "Employee", "Manager"]);
3822 let node = store.get_node(id).unwrap();
3823
3824 assert!(node.has_label("Person"));
3825 assert!(node.has_label("Employee"));
3826 assert!(node.has_label("Manager"));
3827 assert!(!node.has_label("Other"));
3828 }
3829
3830 #[test]
3831 fn test_default_impl() {
3832 let store: LpgStore = Default::default();
3833 assert_eq!(store.node_count(), 0);
3834 assert_eq!(store.edge_count(), 0);
3835 }
3836
3837 #[test]
3838 fn test_edges_from_both_directions() {
3839 let store = LpgStore::new();
3840
3841 let a = store.create_node(&["A"]);
3842 let b = store.create_node(&["B"]);
3843 let c = store.create_node(&["C"]);
3844
3845 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3850 assert_eq!(edges.len(), 2);
3851 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3854
3855 #[test]
3856 fn test_no_backward_adj_in_degree() {
3857 let config = LpgStoreConfig {
3858 backward_edges: false,
3859 initial_node_capacity: 10,
3860 initial_edge_capacity: 10,
3861 };
3862 let store = LpgStore::with_config(config);
3863
3864 let a = store.create_node(&["A"]);
3865 let b = store.create_node(&["B"]);
3866 store.create_edge(a, b, "R");
3867
3868 let degree = store.in_degree(b);
3870 assert_eq!(degree, 1);
3871 }
3872
3873 #[test]
3874 fn test_no_backward_adj_edges_to() {
3875 let config = LpgStoreConfig {
3876 backward_edges: false,
3877 initial_node_capacity: 10,
3878 initial_edge_capacity: 10,
3879 };
3880 let store = LpgStore::with_config(config);
3881
3882 let a = store.create_node(&["A"]);
3883 let b = store.create_node(&["B"]);
3884 let e = store.create_edge(a, b, "R");
3885
3886 let edges = store.edges_to(b);
3888 assert_eq!(edges.len(), 1);
3889 assert_eq!(edges[0].1, e);
3890 }
3891
3892 #[test]
3893 fn test_node_versioned_creation() {
3894 let store = LpgStore::new();
3895
3896 let epoch = store.new_epoch();
3897 let tx_id = TxId::new(1);
3898
3899 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3900 assert!(store.get_node(id).is_some());
3901 }
3902
3903 #[test]
3904 fn test_edge_versioned_creation() {
3905 let store = LpgStore::new();
3906
3907 let a = store.create_node(&["A"]);
3908 let b = store.create_node(&["B"]);
3909
3910 let epoch = store.new_epoch();
3911 let tx_id = TxId::new(1);
3912
3913 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3914 assert!(store.get_edge(edge_id).is_some());
3915 }
3916
3917 #[test]
3918 fn test_node_with_props_versioned() {
3919 let store = LpgStore::new();
3920
3921 let epoch = store.new_epoch();
3922 let tx_id = TxId::new(1);
3923
3924 let id = store.create_node_with_props_versioned(
3925 &["Person"],
3926 [("name", Value::from("Alice"))],
3927 epoch,
3928 tx_id,
3929 );
3930
3931 let node = store.get_node(id).unwrap();
3932 assert_eq!(
3933 node.get_property("name").and_then(|v| v.as_str()),
3934 Some("Alice")
3935 );
3936 }
3937
3938 #[test]
3939 fn test_discard_uncommitted_versions() {
3940 let store = LpgStore::new();
3941
3942 let epoch = store.new_epoch();
3943 let tx_id = TxId::new(42);
3944
3945 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3947 assert!(store.get_node(node_id).is_some());
3948
3949 store.discard_uncommitted_versions(tx_id);
3951
3952 assert!(store.get_node(node_id).is_none());
3954 }
3955
3956 #[test]
3959 fn test_property_index_create_and_lookup() {
3960 let store = LpgStore::new();
3961
3962 let alice = store.create_node(&["Person"]);
3964 let bob = store.create_node(&["Person"]);
3965 let charlie = store.create_node(&["Person"]);
3966
3967 store.set_node_property(alice, "city", Value::from("NYC"));
3968 store.set_node_property(bob, "city", Value::from("NYC"));
3969 store.set_node_property(charlie, "city", Value::from("LA"));
3970
3971 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3973 assert_eq!(nyc_people.len(), 2);
3974
3975 store.create_property_index("city");
3977 assert!(store.has_property_index("city"));
3978
3979 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3981 assert_eq!(nyc_people.len(), 2);
3982 assert!(nyc_people.contains(&alice));
3983 assert!(nyc_people.contains(&bob));
3984
3985 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3986 assert_eq!(la_people.len(), 1);
3987 assert!(la_people.contains(&charlie));
3988 }
3989
3990 #[test]
3991 fn test_property_index_maintained_on_update() {
3992 let store = LpgStore::new();
3993
3994 store.create_property_index("status");
3996
3997 let node = store.create_node(&["Task"]);
3998 store.set_node_property(node, "status", Value::from("pending"));
3999
4000 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4002 assert_eq!(pending.len(), 1);
4003 assert!(pending.contains(&node));
4004
4005 store.set_node_property(node, "status", Value::from("done"));
4007
4008 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4010 assert!(pending.is_empty());
4011
4012 let done = store.find_nodes_by_property("status", &Value::from("done"));
4014 assert_eq!(done.len(), 1);
4015 assert!(done.contains(&node));
4016 }
4017
4018 #[test]
4019 fn test_property_index_maintained_on_remove() {
4020 let store = LpgStore::new();
4021
4022 store.create_property_index("tag");
4023
4024 let node = store.create_node(&["Item"]);
4025 store.set_node_property(node, "tag", Value::from("important"));
4026
4027 let found = store.find_nodes_by_property("tag", &Value::from("important"));
4029 assert_eq!(found.len(), 1);
4030
4031 store.remove_node_property(node, "tag");
4033
4034 let found = store.find_nodes_by_property("tag", &Value::from("important"));
4036 assert!(found.is_empty());
4037 }
4038
4039 #[test]
4040 fn test_property_index_drop() {
4041 let store = LpgStore::new();
4042
4043 store.create_property_index("key");
4044 assert!(store.has_property_index("key"));
4045
4046 assert!(store.drop_property_index("key"));
4047 assert!(!store.has_property_index("key"));
4048
4049 assert!(!store.drop_property_index("key"));
4051 }
4052
4053 #[test]
4054 fn test_property_index_multiple_values() {
4055 let store = LpgStore::new();
4056
4057 store.create_property_index("age");
4058
4059 let n1 = store.create_node(&["Person"]);
4061 let n2 = store.create_node(&["Person"]);
4062 let n3 = store.create_node(&["Person"]);
4063 let n4 = store.create_node(&["Person"]);
4064
4065 store.set_node_property(n1, "age", Value::from(25i64));
4066 store.set_node_property(n2, "age", Value::from(25i64));
4067 store.set_node_property(n3, "age", Value::from(30i64));
4068 store.set_node_property(n4, "age", Value::from(25i64));
4069
4070 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
4071 assert_eq!(age_25.len(), 3);
4072
4073 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
4074 assert_eq!(age_30.len(), 1);
4075
4076 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
4077 assert!(age_40.is_empty());
4078 }
4079
4080 #[test]
4081 fn test_property_index_builds_from_existing_data() {
4082 let store = LpgStore::new();
4083
4084 let n1 = store.create_node(&["Person"]);
4086 let n2 = store.create_node(&["Person"]);
4087 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4088 store.set_node_property(n2, "email", Value::from("bob@example.com"));
4089
4090 store.create_property_index("email");
4092
4093 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
4095 assert_eq!(alice.len(), 1);
4096 assert!(alice.contains(&n1));
4097
4098 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
4099 assert_eq!(bob.len(), 1);
4100 assert!(bob.contains(&n2));
4101 }
4102
4103 #[test]
4104 fn test_get_node_property_batch() {
4105 let store = LpgStore::new();
4106
4107 let n1 = store.create_node(&["Person"]);
4108 let n2 = store.create_node(&["Person"]);
4109 let n3 = store.create_node(&["Person"]);
4110
4111 store.set_node_property(n1, "age", Value::from(25i64));
4112 store.set_node_property(n2, "age", Value::from(30i64));
4113 let age_key = PropertyKey::new("age");
4116 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
4117
4118 assert_eq!(values.len(), 3);
4119 assert_eq!(values[0], Some(Value::from(25i64)));
4120 assert_eq!(values[1], Some(Value::from(30i64)));
4121 assert_eq!(values[2], None);
4122 }
4123
4124 #[test]
4125 fn test_get_node_property_batch_empty() {
4126 let store = LpgStore::new();
4127 let key = PropertyKey::new("any");
4128
4129 let values = store.get_node_property_batch(&[], &key);
4130 assert!(values.is_empty());
4131 }
4132
4133 #[test]
4134 fn test_get_nodes_properties_batch() {
4135 let store = LpgStore::new();
4136
4137 let n1 = store.create_node(&["Person"]);
4138 let n2 = store.create_node(&["Person"]);
4139 let n3 = store.create_node(&["Person"]);
4140
4141 store.set_node_property(n1, "name", Value::from("Alice"));
4142 store.set_node_property(n1, "age", Value::from(25i64));
4143 store.set_node_property(n2, "name", Value::from("Bob"));
4144 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4147
4148 assert_eq!(all_props.len(), 3);
4149 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
4154 all_props[0].get(&PropertyKey::new("name")),
4155 Some(&Value::from("Alice"))
4156 );
4157 assert_eq!(
4158 all_props[1].get(&PropertyKey::new("name")),
4159 Some(&Value::from("Bob"))
4160 );
4161 }
4162
4163 #[test]
4164 fn test_get_nodes_properties_batch_empty() {
4165 let store = LpgStore::new();
4166
4167 let all_props = store.get_nodes_properties_batch(&[]);
4168 assert!(all_props.is_empty());
4169 }
4170
4171 #[test]
4172 fn test_get_nodes_properties_selective_batch() {
4173 let store = LpgStore::new();
4174
4175 let n1 = store.create_node(&["Person"]);
4176 let n2 = store.create_node(&["Person"]);
4177
4178 store.set_node_property(n1, "name", Value::from("Alice"));
4180 store.set_node_property(n1, "age", Value::from(25i64));
4181 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4182 store.set_node_property(n2, "name", Value::from("Bob"));
4183 store.set_node_property(n2, "age", Value::from(30i64));
4184 store.set_node_property(n2, "city", Value::from("NYC"));
4185
4186 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4188 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4189
4190 assert_eq!(props.len(), 2);
4191
4192 assert_eq!(props[0].len(), 2);
4194 assert_eq!(
4195 props[0].get(&PropertyKey::new("name")),
4196 Some(&Value::from("Alice"))
4197 );
4198 assert_eq!(
4199 props[0].get(&PropertyKey::new("age")),
4200 Some(&Value::from(25i64))
4201 );
4202 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4203
4204 assert_eq!(props[1].len(), 2);
4206 assert_eq!(
4207 props[1].get(&PropertyKey::new("name")),
4208 Some(&Value::from("Bob"))
4209 );
4210 assert_eq!(
4211 props[1].get(&PropertyKey::new("age")),
4212 Some(&Value::from(30i64))
4213 );
4214 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4215 }
4216
4217 #[test]
4218 fn test_get_nodes_properties_selective_batch_empty_keys() {
4219 let store = LpgStore::new();
4220
4221 let n1 = store.create_node(&["Person"]);
4222 store.set_node_property(n1, "name", Value::from("Alice"));
4223
4224 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4226
4227 assert_eq!(props.len(), 1);
4228 assert!(props[0].is_empty()); }
4230
4231 #[test]
4232 fn test_get_nodes_properties_selective_batch_missing_keys() {
4233 let store = LpgStore::new();
4234
4235 let n1 = store.create_node(&["Person"]);
4236 store.set_node_property(n1, "name", Value::from("Alice"));
4237
4238 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4240 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4241
4242 assert_eq!(props.len(), 1);
4243 assert_eq!(props[0].len(), 1); assert_eq!(
4245 props[0].get(&PropertyKey::new("name")),
4246 Some(&Value::from("Alice"))
4247 );
4248 }
4249
4250 #[test]
4253 fn test_find_nodes_in_range_inclusive() {
4254 let store = LpgStore::new();
4255
4256 let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4257 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4258 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4259 let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4260
4261 let result = store.find_nodes_in_range(
4263 "age",
4264 Some(&Value::from(20i64)),
4265 Some(&Value::from(40i64)),
4266 true,
4267 true,
4268 );
4269 assert_eq!(result.len(), 3);
4270 assert!(result.contains(&n1));
4271 assert!(result.contains(&n2));
4272 assert!(result.contains(&n3));
4273 }
4274
4275 #[test]
4276 fn test_find_nodes_in_range_exclusive() {
4277 let store = LpgStore::new();
4278
4279 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4280 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4281 store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4282
4283 let result = store.find_nodes_in_range(
4285 "age",
4286 Some(&Value::from(20i64)),
4287 Some(&Value::from(40i64)),
4288 false,
4289 false,
4290 );
4291 assert_eq!(result.len(), 1);
4292 assert!(result.contains(&n2));
4293 }
4294
4295 #[test]
4296 fn test_find_nodes_in_range_open_ended() {
4297 let store = LpgStore::new();
4298
4299 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4300 store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4301 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4302 let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4303
4304 let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4306 assert_eq!(result.len(), 2);
4307 assert!(result.contains(&n3));
4308 assert!(result.contains(&n4));
4309
4310 let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4312 assert_eq!(result.len(), 1);
4313 }
4314
4315 #[test]
4316 fn test_find_nodes_in_range_empty_result() {
4317 let store = LpgStore::new();
4318
4319 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4320
4321 let result = store.find_nodes_in_range(
4323 "age",
4324 Some(&Value::from(100i64)),
4325 Some(&Value::from(200i64)),
4326 true,
4327 true,
4328 );
4329 assert!(result.is_empty());
4330 }
4331
4332 #[test]
4333 fn test_find_nodes_in_range_nonexistent_property() {
4334 let store = LpgStore::new();
4335
4336 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4337
4338 let result = store.find_nodes_in_range(
4339 "weight",
4340 Some(&Value::from(50i64)),
4341 Some(&Value::from(100i64)),
4342 true,
4343 true,
4344 );
4345 assert!(result.is_empty());
4346 }
4347
4348 #[test]
4351 fn test_find_nodes_by_properties_multiple_conditions() {
4352 let store = LpgStore::new();
4353
4354 let alice = store.create_node_with_props(
4355 &["Person"],
4356 [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4357 );
4358 store.create_node_with_props(
4359 &["Person"],
4360 [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4361 );
4362 store.create_node_with_props(
4363 &["Person"],
4364 [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4365 );
4366
4367 let result = store.find_nodes_by_properties(&[
4369 ("name", Value::from("Alice")),
4370 ("city", Value::from("NYC")),
4371 ]);
4372 assert_eq!(result.len(), 1);
4373 assert!(result.contains(&alice));
4374 }
4375
4376 #[test]
4377 fn test_find_nodes_by_properties_empty_conditions() {
4378 let store = LpgStore::new();
4379
4380 store.create_node(&["Person"]);
4381 store.create_node(&["Person"]);
4382
4383 let result = store.find_nodes_by_properties(&[]);
4385 assert_eq!(result.len(), 2);
4386 }
4387
4388 #[test]
4389 fn test_find_nodes_by_properties_no_match() {
4390 let store = LpgStore::new();
4391
4392 store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4393
4394 let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4395 assert!(result.is_empty());
4396 }
4397
4398 #[test]
4399 fn test_find_nodes_by_properties_with_index() {
4400 let store = LpgStore::new();
4401
4402 store.create_property_index("name");
4404
4405 let alice = store.create_node_with_props(
4406 &["Person"],
4407 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4408 );
4409 store.create_node_with_props(
4410 &["Person"],
4411 [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4412 );
4413
4414 let result = store.find_nodes_by_properties(&[
4416 ("name", Value::from("Alice")),
4417 ("age", Value::from(30i64)),
4418 ]);
4419 assert_eq!(result.len(), 1);
4420 assert!(result.contains(&alice));
4421 }
4422
4423 #[test]
4426 fn test_estimate_label_cardinality() {
4427 let store = LpgStore::new();
4428
4429 store.create_node(&["Person"]);
4430 store.create_node(&["Person"]);
4431 store.create_node(&["Animal"]);
4432
4433 store.ensure_statistics_fresh();
4434
4435 let person_est = store.estimate_label_cardinality("Person");
4436 let animal_est = store.estimate_label_cardinality("Animal");
4437 let unknown_est = store.estimate_label_cardinality("Unknown");
4438
4439 assert!(
4440 person_est >= 1.0,
4441 "Person should have cardinality >= 1, got {person_est}"
4442 );
4443 assert!(
4444 animal_est >= 1.0,
4445 "Animal should have cardinality >= 1, got {animal_est}"
4446 );
4447 assert!(unknown_est >= 0.0);
4449 }
4450
4451 #[test]
4452 fn test_estimate_avg_degree() {
4453 let store = LpgStore::new();
4454
4455 let a = store.create_node(&["Person"]);
4456 let b = store.create_node(&["Person"]);
4457 let c = store.create_node(&["Person"]);
4458
4459 store.create_edge(a, b, "KNOWS");
4460 store.create_edge(a, c, "KNOWS");
4461 store.create_edge(b, c, "KNOWS");
4462
4463 store.ensure_statistics_fresh();
4464
4465 let outgoing = store.estimate_avg_degree("KNOWS", true);
4466 let incoming = store.estimate_avg_degree("KNOWS", false);
4467
4468 assert!(
4469 outgoing > 0.0,
4470 "Outgoing degree should be > 0, got {outgoing}"
4471 );
4472 assert!(
4473 incoming > 0.0,
4474 "Incoming degree should be > 0, got {incoming}"
4475 );
4476 }
4477
4478 #[test]
4481 fn test_delete_node_does_not_cascade() {
4482 let store = LpgStore::new();
4483
4484 let a = store.create_node(&["A"]);
4485 let b = store.create_node(&["B"]);
4486 let e = store.create_edge(a, b, "KNOWS");
4487
4488 assert!(store.delete_node(a));
4489 assert!(store.get_node(a).is_none());
4490
4491 assert!(
4493 store.get_edge(e).is_some(),
4494 "Edge should survive non-detach node delete"
4495 );
4496 }
4497
4498 #[test]
4499 fn test_delete_already_deleted_node() {
4500 let store = LpgStore::new();
4501 let a = store.create_node(&["A"]);
4502
4503 assert!(store.delete_node(a));
4504 assert!(!store.delete_node(a));
4506 }
4507
4508 #[test]
4509 fn test_delete_nonexistent_node() {
4510 let store = LpgStore::new();
4511 assert!(!store.delete_node(NodeId::new(999)));
4512 }
4513}