1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(
27 feature = "tiered-storage",
28 feature = "vector-index",
29 feature = "text-index"
30))]
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
34#[cfg(feature = "vector-index")]
35use crate::index::vector::HnswIndex;
36
37fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
39 match (a, b) {
40 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
41 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
42 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
43 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
44 _ => None,
45 }
46}
47
48fn value_in_range(
50 value: &Value,
51 min: Option<&Value>,
52 max: Option<&Value>,
53 min_inclusive: bool,
54 max_inclusive: bool,
55) -> bool {
56 if let Some(min_val) = min {
58 match compare_values_for_range(value, min_val) {
59 Some(CmpOrdering::Less) => return false,
60 Some(CmpOrdering::Equal) if !min_inclusive => return false,
61 None => return false, _ => {}
63 }
64 }
65
66 if let Some(max_val) = max {
68 match compare_values_for_range(value, max_val) {
69 Some(CmpOrdering::Greater) => return false,
70 Some(CmpOrdering::Equal) if !max_inclusive => return false,
71 None => return false,
72 _ => {}
73 }
74 }
75
76 true
77}
78
79#[cfg(feature = "tiered-storage")]
81use crate::storage::EpochStore;
82#[cfg(feature = "tiered-storage")]
83use grafeo_common::memory::arena::ArenaAllocator;
84#[cfg(feature = "tiered-storage")]
85use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
86
/// Construction-time options for an `LpgStore`.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// Whether the store also keeps a backward adjacency structure.
    pub backward_edges: bool,
    /// Initial capacity requested for node storage.
    pub initial_node_capacity: usize,
    /// Initial capacity requested for edge storage.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, 1024 nodes and 4096 edges of initial capacity.
    fn default() -> Self {
        let backward_edges = true;
        let initial_node_capacity = 1024;
        let initial_edge_capacity = 4096;

        Self {
            backward_edges,
            initial_node_capacity,
            initial_edge_capacity,
        }
    }
}
112
/// In-memory labeled property graph store.
///
/// Node and edge records are versioned; the storage backing them depends on
/// the `tiered-storage` feature. Labels, properties, adjacency and secondary
/// indexes live in separate structures, each behind its own lock.
pub struct LpgStore {
    /// Configuration the store was built with.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Version chain of node records per node id (non-tiered build).
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Version chain of edge records per edge id (non-tiered build).
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Arena allocator holding hot record versions, addressed by epoch.
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Version index per node id (tiered build).
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Version index per edge id (tiered build).
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Store consulted for cold version references.
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property values of nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property values of edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name -> numeric label id.
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label names, indexed by numeric label id.
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name -> numeric id (presumably mirrors `label_to_id`; confirm).
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type names by numeric id (presumably mirrors `id_to_label`; confirm).
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Adjacency queried for a node's outgoing edges.
    forward_adj: ChunkedAdjacency,

    /// Adjacency queried for a node's incoming edges; `None` when
    /// `config.backward_edges` is false.
    backward_adj: Option<ChunkedAdjacency>,

    /// Per-label node membership: slot `label_id` maps member node ids to `()`.
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Label ids carried by each node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Per-property-key index from property value to the nodes holding it.
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes, keyed by the string `"{label}:{property}"`.
    #[cfg(feature = "vector-index")]
    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Text indexes, keyed by the string `"{label}:{property}"`.
    #[cfg(feature = "text-index")]
    text_indexes: RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,

    /// Counter from which new node ids are drawn.
    next_node_id: AtomicU64,

    /// Counter for edge ids (presumably drawn like `next_node_id`; confirm).
    next_edge_id: AtomicU64,

    /// Current epoch of the store.
    current_epoch: AtomicU64,

    /// Statistics about the store's contents.
    statistics: RwLock<Statistics>,

    /// Set when a change may have made `statistics` stale.
    needs_stats_recompute: AtomicBool,
}
296
297impl LpgStore {
298 #[must_use]
300 pub fn new() -> Self {
301 Self::with_config(LpgStoreConfig::default())
302 }
303
304 #[must_use]
306 pub fn with_config(config: LpgStoreConfig) -> Self {
307 let backward_adj = if config.backward_edges {
308 Some(ChunkedAdjacency::new())
309 } else {
310 None
311 };
312
313 Self {
314 #[cfg(not(feature = "tiered-storage"))]
315 nodes: RwLock::new(FxHashMap::default()),
316 #[cfg(not(feature = "tiered-storage"))]
317 edges: RwLock::new(FxHashMap::default()),
318 #[cfg(feature = "tiered-storage")]
319 arena_allocator: Arc::new(ArenaAllocator::new()),
320 #[cfg(feature = "tiered-storage")]
321 node_versions: RwLock::new(FxHashMap::default()),
322 #[cfg(feature = "tiered-storage")]
323 edge_versions: RwLock::new(FxHashMap::default()),
324 #[cfg(feature = "tiered-storage")]
325 epoch_store: Arc::new(EpochStore::new()),
326 node_properties: PropertyStorage::new(),
327 edge_properties: PropertyStorage::new(),
328 label_to_id: RwLock::new(FxHashMap::default()),
329 id_to_label: RwLock::new(Vec::new()),
330 edge_type_to_id: RwLock::new(FxHashMap::default()),
331 id_to_edge_type: RwLock::new(Vec::new()),
332 forward_adj: ChunkedAdjacency::new(),
333 backward_adj,
334 label_index: RwLock::new(Vec::new()),
335 node_labels: RwLock::new(FxHashMap::default()),
336 property_indexes: RwLock::new(FxHashMap::default()),
337 #[cfg(feature = "vector-index")]
338 vector_indexes: RwLock::new(FxHashMap::default()),
339 #[cfg(feature = "text-index")]
340 text_indexes: RwLock::new(FxHashMap::default()),
341 next_node_id: AtomicU64::new(0),
342 next_edge_id: AtomicU64::new(0),
343 current_epoch: AtomicU64::new(0),
344 statistics: RwLock::new(Statistics::new()),
345 needs_stats_recompute: AtomicBool::new(true),
346 config,
347 }
348 }
349
350 #[must_use]
352 pub fn current_epoch(&self) -> EpochId {
353 EpochId::new(self.current_epoch.load(Ordering::Acquire))
354 }
355
356 pub fn new_epoch(&self) -> EpochId {
358 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
359 EpochId::new(id)
360 }
361
362 pub fn sync_epoch(&self, epoch: EpochId) {
367 self.current_epoch
368 .fetch_max(epoch.as_u64(), Ordering::AcqRel);
369 }
370
371 pub fn create_node(&self, labels: &[&str]) -> NodeId {
377 self.needs_stats_recompute.store(true, Ordering::Relaxed);
378 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
379 }
380
381 #[cfg(not(feature = "tiered-storage"))]
383 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
384 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
385
386 let mut record = NodeRecord::new(id, epoch);
387 record.set_label_count(labels.len() as u16);
388
389 let mut node_label_set = FxHashSet::default();
391 for label in labels {
392 let label_id = self.get_or_create_label_id(*label);
393 node_label_set.insert(label_id);
394
395 let mut index = self.label_index.write();
397 while index.len() <= label_id as usize {
398 index.push(FxHashMap::default());
399 }
400 index[label_id as usize].insert(id, ());
401 }
402
403 self.node_labels.write().insert(id, node_label_set);
405
406 let chain = VersionChain::with_initial(record, epoch, tx_id);
408 self.nodes.write().insert(id, chain);
409 id
410 }
411
/// Creates a node with `labels`, versioned at `epoch` for transaction `tx_id`
/// (tiered-storage build).
///
/// Allocates the next node id, registers the node in the label index and
/// the node-to-labels map, places the record in the arena for `epoch`, and
/// records a hot version reference to it.
///
/// The label count written to the record is the number of *distinct*
/// labels, matching the set kept in `node_labels`, so repeated labels in
/// `labels` are not counted twice.
#[cfg(feature = "tiered-storage")]
pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
    let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));

    // Resolve every label id up front, before the label index is locked;
    // collecting into a set removes duplicates.
    let node_label_set: FxHashSet<u32> = labels
        .iter()
        .map(|label| self.get_or_create_label_id(*label))
        .collect();

    let mut record = NodeRecord::new(id, epoch);
    record.set_label_count(node_label_set.len() as u16);

    if !node_label_set.is_empty() {
        // One write-lock acquisition covers all labels of this node.
        let mut index = self.label_index.write();
        for &label_id in &node_label_set {
            let slot = label_id as usize;
            if index.len() <= slot {
                index.resize(slot + 1, FxHashMap::default());
            }
            index[slot].insert(id, ());
        }
    }

    self.node_labels.write().insert(id, node_label_set);

    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, tx_id);

    let mut versions = self.node_versions.write();
    if let Some(index) = versions.get_mut(&id) {
        index.add_hot(hot_ref);
    } else {
        versions.insert(id, VersionIndex::with_initial(hot_ref));
    }

    id
}
455
456 pub fn create_node_with_props(
458 &self,
459 labels: &[&str],
460 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
461 ) -> NodeId {
462 self.create_node_with_props_versioned(
463 labels,
464 properties,
465 self.current_epoch(),
466 TxId::SYSTEM,
467 )
468 }
469
470 #[cfg(not(feature = "tiered-storage"))]
472 pub fn create_node_with_props_versioned(
473 &self,
474 labels: &[&str],
475 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
476 epoch: EpochId,
477 tx_id: TxId,
478 ) -> NodeId {
479 let id = self.create_node_versioned(labels, epoch, tx_id);
480
481 for (key, value) in properties {
482 let prop_key: PropertyKey = key.into();
483 let prop_value: Value = value.into();
484 self.update_property_index_on_set(id, &prop_key, &prop_value);
486 self.node_properties.set(id, prop_key, prop_value);
487 }
488
489 let count = self.node_properties.get_all(id).len() as u16;
491 if let Some(chain) = self.nodes.write().get_mut(&id)
492 && let Some(record) = chain.latest_mut()
493 {
494 record.props_count = count;
495 }
496
497 id
498 }
499
/// Creates a node with `labels` and `properties`, versioned at `epoch` for
/// transaction `tx_id` (tiered-storage build).
///
/// Each property updates any matching property index before being stored.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_props_versioned(
    &self,
    labels: &[&str],
    properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
    epoch: EpochId,
    tx_id: TxId,
) -> NodeId {
    let id = self.create_node_versioned(labels, epoch, tx_id);

    properties.into_iter().for_each(|(raw_key, raw_value)| {
        let prop_key: PropertyKey = raw_key.into();
        let prop_value: Value = raw_value.into();
        // The index update reads the previous value, so it runs before the store.
        self.update_property_index_on_set(id, &prop_key, &prop_value);
        self.node_properties.set(id, prop_key, prop_value);
    });

    id
}
525
526 #[must_use]
528 pub fn get_node(&self, id: NodeId) -> Option<Node> {
529 self.get_node_at_epoch(id, self.current_epoch())
530 }
531
532 #[must_use]
534 #[cfg(not(feature = "tiered-storage"))]
535 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
536 let nodes = self.nodes.read();
537 let chain = nodes.get(&id)?;
538 let record = chain.visible_at(epoch)?;
539
540 if record.is_deleted() {
541 return None;
542 }
543
544 let mut node = Node::new(id);
545
546 let id_to_label = self.id_to_label.read();
548 let node_labels = self.node_labels.read();
549 if let Some(label_ids) = node_labels.get(&id) {
550 for &label_id in label_ids {
551 if let Some(label) = id_to_label.get(label_id as usize) {
552 node.labels.push(label.clone());
553 }
554 }
555 }
556
557 node.properties = self.node_properties.get_all(id).into_iter().collect();
559
560 Some(node)
561 }
562
/// Materializes the node `id` as visible at `epoch` (tiered-storage build).
///
/// Returns `None` when the node is unknown, has no version visible at
/// `epoch`, its record cannot be read, or the record is deleted.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
    let versions = self.node_versions.read();
    let version_ref = versions
        .get(&id)
        .and_then(|index| index.visible_at(epoch))?;

    let record = self.read_node_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    let mut node = Node::new(id);

    let id_to_label = self.id_to_label.read();
    let node_labels = self.node_labels.read();
    if let Some(label_ids) = node_labels.get(&id) {
        // Label ids without a registered name are skipped.
        let names = label_ids
            .iter()
            .filter_map(|&label_id| id_to_label.get(label_id as usize));
        for name in names {
            node.labels.push(name.clone());
        }
    }

    let all_props = self.node_properties.get_all(id);
    node.properties = all_props.into_iter().collect();

    Some(node)
}
597
598 #[must_use]
600 #[cfg(not(feature = "tiered-storage"))]
601 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
602 let nodes = self.nodes.read();
603 let chain = nodes.get(&id)?;
604 let record = chain.visible_to(epoch, tx_id)?;
605
606 if record.is_deleted() {
607 return None;
608 }
609
610 let mut node = Node::new(id);
611
612 let id_to_label = self.id_to_label.read();
614 let node_labels = self.node_labels.read();
615 if let Some(label_ids) = node_labels.get(&id) {
616 for &label_id in label_ids {
617 if let Some(label) = id_to_label.get(label_id as usize) {
618 node.labels.push(label.clone());
619 }
620 }
621 }
622
623 node.properties = self.node_properties.get_all(id).into_iter().collect();
625
626 Some(node)
627 }
628
/// Materializes the node `id` as visible to transaction `tx_id` at `epoch`
/// (tiered-storage build).
///
/// Returns `None` when the node is unknown, has no version visible to the
/// transaction, its record cannot be read, or the record is deleted.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
    let versions = self.node_versions.read();
    let version_ref = versions
        .get(&id)
        .and_then(|index| index.visible_to(epoch, tx_id))?;

    let record = self.read_node_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    let mut node = Node::new(id);

    let id_to_label = self.id_to_label.read();
    let node_labels = self.node_labels.read();
    if let Some(label_ids) = node_labels.get(&id) {
        // Label ids without a registered name are skipped.
        let names = label_ids
            .iter()
            .filter_map(|&label_id| id_to_label.get(label_id as usize));
        for name in names {
            node.labels.push(name.clone());
        }
    }

    let all_props = self.node_properties.get_all(id);
    node.properties = all_props.into_iter().collect();

    Some(node)
}
663
/// Loads the node record that `version_ref` points at.
///
/// Hot references are read from the arena for the reference's epoch and
/// copied out; cold references are fetched from the epoch store, which may
/// return `None`.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
    match version_ref {
        VersionRef::Hot(hot_ref) => {
            let arena = self.arena_allocator.arena(hot_ref.epoch);
            // SAFETY: NOTE(review) — presumably `arena_offset` was produced by
            // `alloc_value_with_offset` for a `NodeRecord` in this epoch's arena
            // and that arena is still alive; confirm against the arena contract.
            let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
            // Return a copy so the result does not borrow from the arena.
            Some(*record)
        }
        VersionRef::Cold(cold_ref) => {
            self.epoch_store
                .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
        }
    }
}
682
683 pub fn delete_node(&self, id: NodeId) -> bool {
685 self.needs_stats_recompute.store(true, Ordering::Relaxed);
686 self.delete_node_at_epoch(id, self.current_epoch())
687 }
688
689 #[cfg(not(feature = "tiered-storage"))]
691 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
692 let mut nodes = self.nodes.write();
693 if let Some(chain) = nodes.get_mut(&id) {
694 if let Some(record) = chain.visible_at(epoch) {
696 if record.is_deleted() {
697 return false;
698 }
699 } else {
700 return false;
702 }
703
704 chain.mark_deleted(epoch);
706
707 let mut index = self.label_index.write();
709 let mut node_labels = self.node_labels.write();
710 if let Some(label_ids) = node_labels.remove(&id) {
711 for label_id in label_ids {
712 if let Some(set) = index.get_mut(label_id as usize) {
713 set.remove(&id);
714 }
715 }
716 }
717
718 drop(nodes); drop(index);
721 drop(node_labels);
722 self.node_properties.remove_all(id);
723
724 true
727 } else {
728 false
729 }
730 }
731
/// Marks node `id` as deleted at `epoch` (tiered-storage build).
///
/// Returns `false` when the node is unknown, has no version visible at
/// `epoch`, its record cannot be read, or it is already deleted. On
/// success the node is also removed from the label index and the
/// node-to-labels map, and its properties are dropped.
#[cfg(feature = "tiered-storage")]
pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
    let mut versions = self.node_versions.write();
    let Some(index) = versions.get_mut(&id) else {
        return false;
    };

    // A node is deletable only if a readable, non-deleted version is visible.
    let live = index
        .visible_at(epoch)
        .and_then(|version_ref| self.read_node_record(&version_ref))
        .is_some_and(|record| !record.is_deleted());
    if !live {
        return false;
    }

    index.mark_deleted(epoch);

    let mut label_index = self.label_index.write();
    let mut node_labels = self.node_labels.write();
    if let Some(label_ids) = node_labels.remove(&id) {
        for label_id in label_ids {
            if let Some(members) = label_index.get_mut(label_id as usize) {
                members.remove(&id);
            }
        }
    }

    // Release every lock before touching property storage.
    drop(versions);
    drop(label_index);
    drop(node_labels);
    self.node_properties.remove_all(id);

    true
}
776
777 #[cfg(not(feature = "tiered-storage"))]
782 pub fn delete_node_edges(&self, node_id: NodeId) {
783 let outgoing: Vec<EdgeId> = self
785 .forward_adj
786 .edges_from(node_id)
787 .into_iter()
788 .map(|(_, edge_id)| edge_id)
789 .collect();
790
791 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
793 backward
794 .edges_from(node_id)
795 .into_iter()
796 .map(|(_, edge_id)| edge_id)
797 .collect()
798 } else {
799 let epoch = self.current_epoch();
801 self.edges
802 .read()
803 .iter()
804 .filter_map(|(id, chain)| {
805 chain.visible_at(epoch).and_then(|r| {
806 if !r.is_deleted() && r.dst == node_id {
807 Some(*id)
808 } else {
809 None
810 }
811 })
812 })
813 .collect()
814 };
815
816 for edge_id in outgoing.into_iter().chain(incoming) {
818 self.delete_edge(edge_id);
819 }
820 }
821
/// Deletes every edge that starts or ends at `node_id` (tiered-storage build).
///
/// Outgoing edges come from the forward adjacency. Incoming edges come
/// from the backward adjacency when it exists; otherwise all edge version
/// indexes are scanned for live records, visible at the current epoch,
/// whose destination is `node_id`.
#[cfg(feature = "tiered-storage")]
pub fn delete_node_edges(&self, node_id: NodeId) {
    // Collect ids first so no adjacency or version-map access is pending while deleting.
    let mut doomed: Vec<EdgeId> = self
        .forward_adj
        .edges_from(node_id)
        .into_iter()
        .map(|(_, edge_id)| edge_id)
        .collect();

    let incoming: Vec<EdgeId> = match self.backward_adj.as_ref() {
        Some(backward) => backward
            .edges_from(node_id)
            .into_iter()
            .map(|(_, edge_id)| edge_id)
            .collect(),
        None => {
            let epoch = self.current_epoch();
            let versions = self.edge_versions.read();
            versions
                .iter()
                .filter(|(_, index)| {
                    index
                        .visible_at(epoch)
                        .and_then(|vref| self.read_edge_record(&vref))
                        .is_some_and(|r| !r.is_deleted() && r.dst == node_id)
                })
                .map(|(id, _)| *id)
                .collect()
        }
    };
    doomed.extend(incoming);

    for edge_id in doomed {
        self.delete_edge(edge_id);
    }
}
866
867 #[cfg(not(feature = "tiered-storage"))]
869 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
870 let prop_key: PropertyKey = key.into();
871
872 self.update_property_index_on_set(id, &prop_key, &value);
874
875 self.node_properties.set(id, prop_key, value);
876
877 let count = self.node_properties.get_all(id).len() as u16;
879 if let Some(chain) = self.nodes.write().get_mut(&id)
880 && let Some(record) = chain.latest_mut()
881 {
882 record.props_count = count;
883 }
884 }
885
/// Sets property `key` of node `id` to `value` (tiered-storage build).
///
/// Any property index on `key` is updated before the value is stored.
#[cfg(feature = "tiered-storage")]
pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
    let prop_key = PropertyKey::from(key);

    // The index update reads the previous value, so it runs before the store.
    self.update_property_index_on_set(id, &prop_key, &value);
    self.node_properties.set(id, prop_key, value);
}
900
901 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
903 self.edge_properties.set(id, key.into(), value);
904 }
905
906 #[cfg(not(feature = "tiered-storage"))]
910 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911 let prop_key: PropertyKey = key.into();
912
913 self.update_property_index_on_remove(id, &prop_key);
915
916 let result = self.node_properties.remove(id, &prop_key);
917
918 let count = self.node_properties.get_all(id).len() as u16;
920 if let Some(chain) = self.nodes.write().get_mut(&id)
921 && let Some(record) = chain.latest_mut()
922 {
923 record.props_count = count;
924 }
925
926 result
927 }
928
/// Removes property `key` from node `id`, returning the removed value if any
/// (tiered-storage build).
#[cfg(feature = "tiered-storage")]
pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
    let prop_key = PropertyKey::from(key);

    // The index update reads the current value, so it runs before the removal.
    self.update_property_index_on_remove(id, &prop_key);

    let removed = self.node_properties.remove(id, &prop_key);
    removed
}
941
942 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
946 self.edge_properties.remove(id, &key.into())
947 }
948
949 #[must_use]
964 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
965 self.node_properties.get(id, key)
966 }
967
968 #[must_use]
972 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
973 self.edge_properties.get(id, key)
974 }
975
976 #[must_use]
999 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
1000 self.node_properties.get_batch(ids, key)
1001 }
1002
1003 #[must_use]
1008 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
1009 self.node_properties.get_all_batch(ids)
1010 }
1011
1012 #[must_use]
1040 pub fn get_nodes_properties_selective_batch(
1041 &self,
1042 ids: &[NodeId],
1043 keys: &[PropertyKey],
1044 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1045 self.node_properties.get_selective_batch(ids, keys)
1046 }
1047
1048 #[must_use]
1052 pub fn get_edges_properties_selective_batch(
1053 &self,
1054 ids: &[EdgeId],
1055 keys: &[PropertyKey],
1056 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1057 self.edge_properties.get_selective_batch(ids, keys)
1058 }
1059
1060 #[must_use]
1096 pub fn find_nodes_in_range(
1097 &self,
1098 property: &str,
1099 min: Option<&Value>,
1100 max: Option<&Value>,
1101 min_inclusive: bool,
1102 max_inclusive: bool,
1103 ) -> Vec<NodeId> {
1104 let key = PropertyKey::new(property);
1105
1106 if !self
1108 .node_properties
1109 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1110 {
1111 return Vec::new();
1112 }
1113
1114 self.node_ids()
1116 .into_iter()
1117 .filter(|&node_id| {
1118 self.node_properties
1119 .get(node_id, &key)
1120 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1121 })
1122 .collect()
1123 }
1124
1125 #[must_use]
1150 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1151 if conditions.is_empty() {
1152 return self.node_ids();
1153 }
1154
1155 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1158 let indexes = self.property_indexes.read();
1159
1160 for (i, (prop, value)) in conditions.iter().enumerate() {
1161 let key = PropertyKey::new(*prop);
1162 let hv = HashableValue::new(value.clone());
1163
1164 if let Some(index) = indexes.get(&key) {
1165 let matches: Vec<NodeId> = index
1166 .get(&hv)
1167 .map(|nodes| nodes.iter().copied().collect())
1168 .unwrap_or_default();
1169
1170 if matches.is_empty() {
1172 return Vec::new();
1173 }
1174
1175 if best_start
1177 .as_ref()
1178 .is_none_or(|(_, best)| matches.len() < best.len())
1179 {
1180 best_start = Some((i, matches));
1181 }
1182 }
1183 }
1184 drop(indexes);
1185
1186 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1188 let (prop, value) = &conditions[0];
1190 (0, self.find_nodes_by_property(prop, value))
1191 });
1192
1193 for (i, (prop, value)) in conditions.iter().enumerate() {
1195 if i == start_idx {
1196 continue;
1197 }
1198
1199 let key = PropertyKey::new(*prop);
1200 candidates.retain(|&node_id| {
1201 self.node_properties
1202 .get(node_id, &key)
1203 .is_some_and(|v| v == *value)
1204 });
1205
1206 if candidates.is_empty() {
1208 return Vec::new();
1209 }
1210 }
1211
1212 candidates
1213 }
1214
1215 pub fn create_property_index(&self, property: &str) {
1243 let key = PropertyKey::new(property);
1244
1245 let mut indexes = self.property_indexes.write();
1246 if indexes.contains_key(&key) {
1247 return; }
1249
1250 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1252
1253 for node_id in self.node_ids() {
1255 if let Some(value) = self.node_properties.get(node_id, &key) {
1256 let hv = HashableValue::new(value);
1257 index.entry(hv).or_default().insert(node_id);
1258 }
1259 }
1260
1261 indexes.insert(key, index);
1262 }
1263
1264 pub fn drop_property_index(&self, property: &str) -> bool {
1268 let key = PropertyKey::new(property);
1269 self.property_indexes.write().remove(&key).is_some()
1270 }
1271
1272 #[must_use]
1274 pub fn has_property_index(&self, property: &str) -> bool {
1275 let key = PropertyKey::new(property);
1276 self.property_indexes.read().contains_key(&key)
1277 }
1278
/// Registers `index` as the vector index for `label` / `property`,
/// replacing any index already stored under the same pair.
#[cfg(feature = "vector-index")]
pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
    let key = format!("{label}:{property}");
    let mut registry = self.vector_indexes.write();
    registry.insert(key, index);
}
1285
/// Returns a shared handle to the vector index for `label` / `property`,
/// if one is registered.
#[cfg(feature = "vector-index")]
#[must_use]
pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
    let key = format!("{label}:{property}");
    let registry = self.vector_indexes.read();
    registry.get(&key).map(Arc::clone)
}
1293
/// Unregisters the vector index for `label` / `property`; returns `true`
/// if one was registered.
#[cfg(feature = "vector-index")]
pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
    let key = format!("{label}:{property}");
    let removed = self.vector_indexes.write().remove(&key);
    removed.is_some()
}
1302
/// Snapshots all registered vector indexes as `(key, handle)` pairs, where
/// the key is the `"{label}:{property}"` string used at registration.
#[cfg(feature = "vector-index")]
#[must_use]
pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
    let registry = self.vector_indexes.read();
    let mut entries = Vec::with_capacity(registry.len());
    for (name, handle) in registry.iter() {
        entries.push((name.clone(), Arc::clone(handle)));
    }
    entries
}
1315
/// Registers `index` as the text index for `label` / `property`,
/// replacing any index already stored under the same pair.
#[cfg(feature = "text-index")]
pub fn add_text_index(
    &self,
    label: &str,
    property: &str,
    index: Arc<RwLock<crate::index::text::InvertedIndex>>,
) {
    let key = format!("{label}:{property}");
    let mut registry = self.text_indexes.write();
    registry.insert(key, index);
}
1327
/// Returns a shared handle to the text index for `label` / `property`,
/// if one is registered.
#[cfg(feature = "text-index")]
#[must_use]
pub fn get_text_index(
    &self,
    label: &str,
    property: &str,
) -> Option<Arc<RwLock<crate::index::text::InvertedIndex>>> {
    let key = format!("{label}:{property}");
    let registry = self.text_indexes.read();
    registry.get(&key).map(Arc::clone)
}
1339
/// Unregisters the text index for `label` / `property`; returns `true`
/// if one was registered.
#[cfg(feature = "text-index")]
pub fn remove_text_index(&self, label: &str, property: &str) -> bool {
    let key = format!("{label}:{property}");
    let removed = self.text_indexes.write().remove(&key);
    removed.is_some()
}
1348
/// Snapshots all registered text indexes as `(key, handle)` pairs, where
/// the key is the `"{label}:{property}"` string used at registration.
#[cfg(feature = "text-index")]
pub fn text_index_entries(
    &self,
) -> Vec<(String, Arc<RwLock<crate::index::text::InvertedIndex>>)> {
    let registry = self.text_indexes.read();
    let mut entries = Vec::with_capacity(registry.len());
    for (name, handle) in registry.iter() {
        entries.push((name.clone(), Arc::clone(handle)));
    }
    entries
}
1362
1363 #[must_use]
1386 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1387 let key = PropertyKey::new(property);
1388 let hv = HashableValue::new(value.clone());
1389
1390 let indexes = self.property_indexes.read();
1392 if let Some(index) = indexes.get(&key) {
1393 if let Some(nodes) = index.get(&hv) {
1394 return nodes.iter().copied().collect();
1395 }
1396 return Vec::new();
1397 }
1398 drop(indexes);
1399
1400 self.node_ids()
1402 .into_iter()
1403 .filter(|&node_id| {
1404 self.node_properties
1405 .get(node_id, &key)
1406 .is_some_and(|v| v == *value)
1407 })
1408 .collect()
1409 }
1410
1411 pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
1417 let key = PropertyKey::new(property);
1418 self.node_ids()
1419 .into_iter()
1420 .filter(|&node_id| {
1421 self.node_properties
1422 .get(node_id, &key)
1423 .is_some_and(|v| Self::matches_filter(&v, filter_value))
1424 })
1425 .collect()
1426 }
1427
1428 fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
1433 match filter_value {
1434 Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
1435 ops.iter().all(|(op_key, op_val)| {
1436 match op_key.as_str() {
1437 "$gt" => {
1438 Self::compare_values(node_value, op_val)
1439 == Some(std::cmp::Ordering::Greater)
1440 }
1441 "$gte" => matches!(
1442 Self::compare_values(node_value, op_val),
1443 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
1444 ),
1445 "$lt" => {
1446 Self::compare_values(node_value, op_val)
1447 == Some(std::cmp::Ordering::Less)
1448 }
1449 "$lte" => matches!(
1450 Self::compare_values(node_value, op_val),
1451 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
1452 ),
1453 "$ne" => node_value != op_val,
1454 "$in" => match op_val {
1455 Value::List(items) => items.iter().any(|v| v == node_value),
1456 _ => false,
1457 },
1458 "$nin" => match op_val {
1459 Value::List(items) => !items.iter().any(|v| v == node_value),
1460 _ => true,
1461 },
1462 "$contains" => match (node_value, op_val) {
1463 (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
1464 _ => false,
1465 },
1466 _ => false, }
1468 })
1469 }
1470 _ => node_value == filter_value, }
1472 }
1473
1474 fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1476 match (a, b) {
1477 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1478 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1479 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1480 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1481 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1482 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1483 _ => None,
1484 }
1485 }
1486
1487 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1489 let indexes = self.property_indexes.read();
1490 if let Some(index) = indexes.get(key) {
1491 if let Some(old_value) = self.node_properties.get(node_id, key) {
1493 let old_hv = HashableValue::new(old_value);
1494 if let Some(mut nodes) = index.get_mut(&old_hv) {
1495 nodes.remove(&node_id);
1496 if nodes.is_empty() {
1497 drop(nodes);
1498 index.remove(&old_hv);
1499 }
1500 }
1501 }
1502
1503 let new_hv = HashableValue::new(new_value.clone());
1505 index
1506 .entry(new_hv)
1507 .or_insert_with(FxHashSet::default)
1508 .insert(node_id);
1509 }
1510 }
1511
1512 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1514 let indexes = self.property_indexes.read();
1515 if let Some(index) = indexes.get(key) {
1516 if let Some(old_value) = self.node_properties.get(node_id, key) {
1518 let old_hv = HashableValue::new(old_value);
1519 if let Some(mut nodes) = index.get_mut(&old_hv) {
1520 nodes.remove(&node_id);
1521 if nodes.is_empty() {
1522 drop(nodes);
1523 index.remove(&old_hv);
1524 }
1525 }
1526 }
1527 }
1528 }
1529
1530 #[cfg(not(feature = "tiered-storage"))]
1535 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1536 let epoch = self.current_epoch();
1537
1538 let nodes = self.nodes.read();
1540 if let Some(chain) = nodes.get(&node_id) {
1541 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1542 return false;
1543 }
1544 } else {
1545 return false;
1546 }
1547 drop(nodes);
1548
1549 let label_id = self.get_or_create_label_id(label);
1551
1552 let mut node_labels = self.node_labels.write();
1554 let label_set = node_labels.entry(node_id).or_default();
1555
1556 if label_set.contains(&label_id) {
1557 return false; }
1559
1560 label_set.insert(label_id);
1561 drop(node_labels);
1562
1563 let mut index = self.label_index.write();
1565 if (label_id as usize) >= index.len() {
1566 index.resize(label_id as usize + 1, FxHashMap::default());
1567 }
1568 index[label_id as usize].insert(node_id, ());
1569
1570 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1572 && let Some(record) = chain.latest_mut()
1573 {
1574 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1575 record.set_label_count(count as u16);
1576 }
1577
1578 true
1579 }
1580
/// Adds `label` to node `node_id` (tiered-storage build).
///
/// Returns `false` when the node is unknown, has no readable version
/// visible at the current epoch, is deleted, or already carries the label.
#[cfg(feature = "tiered-storage")]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // The node must have a readable, non-deleted version at this epoch.
    let alive = {
        let versions = self.node_versions.read();
        versions
            .get(&node_id)
            .and_then(|index| index.visible_at(epoch))
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted())
    };
    if !alive {
        return false;
    }

    let label_id = self.get_or_create_label_id(label);

    {
        let mut node_labels = self.node_labels.write();
        let label_set = node_labels.entry(node_id).or_default();
        // `insert` reports `false` when the label was already present.
        if !label_set.insert(label_id) {
            return false;
        }
    }

    let slot = label_id as usize;
    let mut index = self.label_index.write();
    if slot >= index.len() {
        index.resize(slot + 1, FxHashMap::default());
    }
    index[slot].insert(node_id, ());

    true
}
1632
1633 #[cfg(not(feature = "tiered-storage"))]
1638 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1639 let epoch = self.current_epoch();
1640
1641 let nodes = self.nodes.read();
1643 if let Some(chain) = nodes.get(&node_id) {
1644 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1645 return false;
1646 }
1647 } else {
1648 return false;
1649 }
1650 drop(nodes);
1651
1652 let label_id = {
1654 let label_ids = self.label_to_id.read();
1655 match label_ids.get(label) {
1656 Some(&id) => id,
1657 None => return false, }
1659 };
1660
1661 let mut node_labels = self.node_labels.write();
1663 if let Some(label_set) = node_labels.get_mut(&node_id) {
1664 if !label_set.remove(&label_id) {
1665 return false; }
1667 } else {
1668 return false;
1669 }
1670 drop(node_labels);
1671
1672 let mut index = self.label_index.write();
1674 if (label_id as usize) < index.len() {
1675 index[label_id as usize].remove(&node_id);
1676 }
1677
1678 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1680 && let Some(record) = chain.latest_mut()
1681 {
1682 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1683 record.set_label_count(count as u16);
1684 }
1685
1686 true
1687 }
1688
/// Detaches `label` from the node `node_id` (tiered-storage build).
///
/// Returns `true` when the label was removed. Returns `false` when the node
/// has no version index, no version visible at the current epoch, its record
/// cannot be read, it is marked deleted, the label name is not registered, or
/// the node does not carry the label. Only `node_labels` and `label_index`
/// are modified.
#[cfg(feature = "tiered-storage")]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // Liveness check; the read guard ends with this scope.
    {
        let versions = self.node_versions.read();
        let Some(version_index) = versions.get(&node_id) else {
            return false;
        };
        let Some(vref) = version_index.visible_at(epoch) else {
            return false;
        };
        let Some(record) = self.read_node_record(&vref) else {
            return false;
        };
        if record.is_deleted() {
            return false;
        }
    }

    // Resolve the label id without registering a new label.
    let label_id = {
        let known_labels = self.label_to_id.read();
        let Some(&found) = known_labels.get(label) else {
            return false;
        };
        found
    };

    // Drop the label from the node's set; absent set or absent label both fail.
    {
        let mut node_labels = self.node_labels.write();
        let Some(label_set) = node_labels.get_mut(&node_id) else {
            return false;
        };
        if !label_set.remove(&label_id) {
            return false;
        }
    }

    // Unregister the node from the label index when the slot exists.
    let mut label_index = self.label_index.write();
    let slot = label_id as usize;
    if slot < label_index.len() {
        label_index[slot].remove(&node_id);
    }

    true
}
1744
1745 #[must_use]
1747 #[cfg(not(feature = "tiered-storage"))]
1748 pub fn node_count(&self) -> usize {
1749 let epoch = self.current_epoch();
1750 self.nodes
1751 .read()
1752 .values()
1753 .filter_map(|chain| chain.visible_at(epoch))
1754 .filter(|r| !r.is_deleted())
1755 .count()
1756 }
1757
/// Counts the nodes whose version visible at the current epoch can be read
/// and is not marked deleted (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn node_count(&self) -> usize {
    let epoch = self.current_epoch();
    let versions = self.node_versions.read();
    versions
        .values()
        .filter_map(|index| index.visible_at(epoch))
        .filter_map(|vref| self.read_node_record(&vref))
        .filter(|record| !record.is_deleted())
        .count()
}
1775
1776 #[must_use]
1782 #[cfg(not(feature = "tiered-storage"))]
1783 pub fn node_ids(&self) -> Vec<NodeId> {
1784 let epoch = self.current_epoch();
1785 let mut ids: Vec<NodeId> = self
1786 .nodes
1787 .read()
1788 .iter()
1789 .filter_map(|(id, chain)| {
1790 chain
1791 .visible_at(epoch)
1792 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1793 })
1794 .collect();
1795 ids.sort_unstable();
1796 ids
1797 }
1798
/// Returns the ids of all nodes whose version visible at the current epoch
/// can be read and is not marked deleted, sorted in ascending order
/// (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn node_ids(&self) -> Vec<NodeId> {
    let epoch = self.current_epoch();
    let versions = self.node_versions.read();

    let mut ids: Vec<NodeId> = Vec::new();
    for (id, index) in versions.iter() {
        let is_live = index
            .visible_at(epoch)
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if is_live {
            ids.push(*id);
        }
    }

    // Map iteration order is unspecified; sort for a stable result.
    ids.sort_unstable();
    ids
}
1818
1819 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1823 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1824 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1825 }
1826
1827 #[cfg(not(feature = "tiered-storage"))]
1829 pub fn create_edge_versioned(
1830 &self,
1831 src: NodeId,
1832 dst: NodeId,
1833 edge_type: &str,
1834 epoch: EpochId,
1835 tx_id: TxId,
1836 ) -> EdgeId {
1837 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1838 let type_id = self.get_or_create_edge_type_id(edge_type);
1839
1840 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1841 let chain = VersionChain::with_initial(record, epoch, tx_id);
1842 self.edges.write().insert(id, chain);
1843
1844 self.forward_adj.add_edge(src, dst, id);
1846 if let Some(ref backward) = self.backward_adj {
1847 backward.add_edge(dst, src, id);
1848 }
1849
1850 id
1851 }
1852
/// Creates an edge from `src` to `dst` whose initial version is created at
/// `epoch` by transaction `tx_id`, and returns the freshly allocated id
/// (tiered-storage build).
///
/// The record is allocated in the arena for `epoch` and referenced from the
/// edge version index through a hot version reference. The edge is then
/// registered in the forward adjacency and, when configured, the backward one.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_versioned(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
    epoch: EpochId,
    tx_id: TxId,
) -> EdgeId {
    let raw_id = self.next_edge_id.fetch_add(1, Ordering::Relaxed);
    let id = EdgeId::new(raw_id);
    let type_id = self.get_or_create_edge_type_id(edge_type);

    // Place the record in this epoch's arena and remember where it landed.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) =
        arena.alloc_value_with_offset(EdgeRecord::new(id, src, dst, type_id, epoch));
    let hot_ref = HotVersionRef::new(epoch, offset, tx_id);

    // Append to an existing version index, or start a new one.
    let mut versions = self.edge_versions.write();
    match versions.get_mut(&id) {
        Some(index) => {
            index.add_hot(hot_ref);
        }
        None => {
            versions.insert(id, VersionIndex::with_initial(hot_ref));
        }
    }

    self.forward_adj.add_edge(src, dst, id);
    if let Some(backward) = &self.backward_adj {
        backward.add_edge(dst, src, id);
    }

    id
}
1892
1893 pub fn create_edge_with_props(
1895 &self,
1896 src: NodeId,
1897 dst: NodeId,
1898 edge_type: &str,
1899 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1900 ) -> EdgeId {
1901 let id = self.create_edge(src, dst, edge_type);
1902
1903 for (key, value) in properties {
1904 self.edge_properties.set(id, key.into(), value.into());
1905 }
1906
1907 id
1908 }
1909
1910 #[must_use]
1912 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1913 self.get_edge_at_epoch(id, self.current_epoch())
1914 }
1915
1916 #[must_use]
1918 #[cfg(not(feature = "tiered-storage"))]
1919 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1920 let edges = self.edges.read();
1921 let chain = edges.get(&id)?;
1922 let record = chain.visible_at(epoch)?;
1923
1924 if record.is_deleted() {
1925 return None;
1926 }
1927
1928 let edge_type = {
1929 let id_to_type = self.id_to_edge_type.read();
1930 id_to_type.get(record.type_id as usize)?.clone()
1931 };
1932
1933 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1934
1935 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1937
1938 Some(edge)
1939 }
1940
/// Materialises the edge `id` as visible at `epoch` (tiered-storage build).
///
/// Returns `None` when the edge has no version index, no version visible at
/// `epoch`, its record cannot be read, it is marked deleted, or its type id
/// has no registered name. Properties are read from the edge property store.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let record = versions
        .get(&id)
        .and_then(|index| index.visible_at(epoch))
        .and_then(|version_ref| self.read_edge_record(&version_ref))
        .filter(|r| !r.is_deleted())?;

    // Resolve the numeric type id back to its name.
    let type_name = self
        .id_to_edge_type
        .read()
        .get(record.type_id as usize)
        .cloned()?;

    let mut edge = Edge::new(id, record.src, record.dst, type_name);
    edge.properties = self.edge_properties.get_all(id).into_iter().collect();
    Some(edge)
}
1968
1969 #[must_use]
1971 #[cfg(not(feature = "tiered-storage"))]
1972 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1973 let edges = self.edges.read();
1974 let chain = edges.get(&id)?;
1975 let record = chain.visible_to(epoch, tx_id)?;
1976
1977 if record.is_deleted() {
1978 return None;
1979 }
1980
1981 let edge_type = {
1982 let id_to_type = self.id_to_edge_type.read();
1983 id_to_type.get(record.type_id as usize)?.clone()
1984 };
1985
1986 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1987
1988 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1990
1991 Some(edge)
1992 }
1993
/// Materialises the edge `id` as visible to transaction `tx_id` at `epoch`
/// (tiered-storage build).
///
/// Visibility is decided by the version index's `visible_to`. Returns `None`
/// when the edge has no version index, no version is visible, the record
/// cannot be read, it is marked deleted, or its type id has no registered name.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let Some(version_ref) = versions.get(&id)?.visible_to(epoch, tx_id) else {
        return None;
    };
    let Some(record) = self.read_edge_record(&version_ref) else {
        return None;
    };
    if record.is_deleted() {
        return None;
    }

    // Resolve the numeric type id back to its name.
    let type_name = {
        let names = self.id_to_edge_type.read();
        names.get(record.type_id as usize).cloned()?
    };

    let mut edge = Edge::new(id, record.src, record.dst, type_name);
    edge.properties = self.edge_properties.get_all(id).into_iter().collect();
    Some(edge)
}
2021
/// Loads the edge record a version reference points at.
///
/// Cold references are fetched from the epoch store (which may yield `None`);
/// hot references are copied out of the arena belonging to their epoch and
/// always yield `Some`.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
    match version_ref {
        VersionRef::Cold(cold_ref) => {
            self.epoch_store
                .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
        }
        VersionRef::Hot(hot_ref) => {
            let arena = self.arena_allocator.arena(hot_ref.epoch);
            // SAFETY (NOTE(review): confirm): relies on `arena_offset` designating an
            // `EdgeRecord` that was written into this epoch's arena, as
            // `create_edge_versioned` does via `alloc_value_with_offset`, and on the
            // arena still holding that value while the hot reference exists.
            let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
            Some(*record)
        }
    }
}
2040
2041 pub fn delete_edge(&self, id: EdgeId) -> bool {
2043 self.needs_stats_recompute.store(true, Ordering::Relaxed);
2044 self.delete_edge_at_epoch(id, self.current_epoch())
2045 }
2046
2047 #[cfg(not(feature = "tiered-storage"))]
2049 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2050 let mut edges = self.edges.write();
2051 if let Some(chain) = edges.get_mut(&id) {
2052 let (src, dst) = {
2054 match chain.visible_at(epoch) {
2055 Some(record) => {
2056 if record.is_deleted() {
2057 return false;
2058 }
2059 (record.src, record.dst)
2060 }
2061 None => return false, }
2063 };
2064
2065 chain.mark_deleted(epoch);
2067
2068 drop(edges); self.forward_adj.mark_deleted(src, id);
2072 if let Some(ref backward) = self.backward_adj {
2073 backward.mark_deleted(dst, id);
2074 }
2075
2076 self.edge_properties.remove_all(id);
2078
2079 true
2080 } else {
2081 false
2082 }
2083 }
2084
/// Marks the edge `id` as deleted at `epoch` (tiered-storage build).
///
/// Returns `false` when the edge has no version index, no version visible at
/// `epoch`, its record cannot be read, or it is already marked deleted. On
/// success the edge is also marked deleted in the adjacency structures and
/// all of its properties are removed.
#[cfg(feature = "tiered-storage")]
pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
    let mut versions = self.edge_versions.write();
    let Some(index) = versions.get_mut(&id) else {
        return false;
    };

    // Read the currently visible record to learn the endpoints.
    let live_record = index
        .visible_at(epoch)
        .and_then(|version_ref| self.read_edge_record(&version_ref))
        .filter(|record| !record.is_deleted());
    let Some(record) = live_record else {
        return false;
    };
    let (src, dst) = (record.src, record.dst);

    index.mark_deleted(epoch);

    // Release the version table before touching adjacency and properties.
    drop(versions);

    self.forward_adj.mark_deleted(src, id);
    if let Some(backward) = &self.backward_adj {
        backward.mark_deleted(dst, id);
    }

    self.edge_properties.remove_all(id);

    true
}
2127
2128 #[must_use]
2130 #[cfg(not(feature = "tiered-storage"))]
2131 pub fn edge_count(&self) -> usize {
2132 let epoch = self.current_epoch();
2133 self.edges
2134 .read()
2135 .values()
2136 .filter_map(|chain| chain.visible_at(epoch))
2137 .filter(|r| !r.is_deleted())
2138 .count()
2139 }
2140
/// Counts the edges whose version visible at the current epoch can be read
/// and is not marked deleted (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_count(&self) -> usize {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();
    versions
        .values()
        .filter_map(|index| index.visible_at(epoch))
        .filter_map(|vref| self.read_edge_record(&vref))
        .filter(|record| !record.is_deleted())
        .count()
}
2158
2159 #[cfg(not(feature = "tiered-storage"))]
2164 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2165 {
2167 let mut nodes = self.nodes.write();
2168 for chain in nodes.values_mut() {
2169 chain.remove_versions_by(tx_id);
2170 }
2171 nodes.retain(|_, chain| !chain.is_empty());
2173 }
2174
2175 {
2177 let mut edges = self.edges.write();
2178 for chain in edges.values_mut() {
2179 chain.remove_versions_by(tx_id);
2180 }
2181 edges.retain(|_, chain| !chain.is_empty());
2183 }
2184 }
2185
/// Removes every node and edge version created by transaction `tx_id`, then
/// drops any version index that is left empty (tiered-storage build).
///
/// Nodes are processed first, edges second; each table is locked for writing
/// only while it is being processed.
#[cfg(feature = "tiered-storage")]
pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
    self.node_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });

    self.edge_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });
}
2210
2211 #[cfg(not(feature = "tiered-storage"))]
2216 pub fn gc_versions(&self, min_epoch: EpochId) {
2217 {
2218 let mut nodes = self.nodes.write();
2219 for chain in nodes.values_mut() {
2220 chain.gc(min_epoch);
2221 }
2222 nodes.retain(|_, chain| !chain.is_empty());
2223 }
2224 {
2225 let mut edges = self.edges.write();
2226 for chain in edges.values_mut() {
2227 chain.gc(min_epoch);
2228 }
2229 edges.retain(|_, chain| !chain.is_empty());
2230 }
2231 }
2232
/// Runs version garbage collection with `min_epoch` on every node and edge
/// version index, then drops indexes that end up empty (tiered-storage build).
///
/// Nodes are processed first, edges second; each table is locked for writing
/// only while it is being processed.
#[cfg(feature = "tiered-storage")]
pub fn gc_versions(&self, min_epoch: EpochId) {
    self.node_versions.write().retain(|_, index| {
        index.gc(min_epoch);
        !index.is_empty()
    });

    self.edge_versions.write().retain(|_, index| {
        index.gc(min_epoch);
        !index.is_empty()
    });
}
2251
/// Moves the hot versions belonging to `epoch` into the epoch store.
///
/// Works in three phases:
/// 1. Under read guards, copy every node and edge record referenced by a hot
///    version for `epoch` out of the arena.
/// 2. Hand the copies to `epoch_store.freeze_epoch`, which reports where each
///    entity was written (`entity_id`, `offset`, `length`).
/// 3. Under write guards, pass a matching cold reference to each affected
///    version index via its `freeze_epoch`.
///
/// Returns the number of records collected in phase 1 (nodes plus edges);
/// `0` means nothing was found and the epoch store was not called.
///
/// NOTE(review): the guards are released between phases 1 and 3, and the
/// placement maps are keyed by entity id only, so several hot versions of one
/// entity in the same epoch resolve to a single placement — confirm this is
/// the intended contract.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
    // Phase 1a: snapshot node records for this epoch.
    let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
    let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
    {
        let node_versions = self.node_versions.read();
        for (node_id, index) in node_versions.iter() {
            for hot_ref in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY (NOTE(review): confirm): relies on `arena_offset` designating a
                // `NodeRecord` previously stored in this epoch's arena.
                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                node_records.push((node_id.as_u64(), *record));
                node_hot_refs.push((*node_id, *hot_ref));
            }
        }
    }

    // Phase 1b: snapshot edge records for this epoch.
    let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
    let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
    {
        let edge_versions = self.edge_versions.read();
        for (edge_id, index) in edge_versions.iter() {
            for hot_ref in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY (NOTE(review): confirm): relies on `arena_offset` designating an
                // `EdgeRecord` previously stored in this epoch's arena.
                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                edge_records.push((edge_id.as_u64(), *record));
                edge_hot_refs.push((*edge_id, *hot_ref));
            }
        }
    }

    let total_frozen = node_records.len() + edge_records.len();
    if total_frozen == 0 {
        return 0;
    }

    // Phase 2: persist the snapshot and index the reported placements by entity id.
    let (node_entries, edge_entries) =
        self.epoch_store
            .freeze_epoch(epoch, node_records, edge_records);

    let node_entry_map: FxHashMap<u64, _> = node_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();
    let edge_entry_map: FxHashMap<u64, _> = edge_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();

    // Phase 3a: hand cold references to the node version indexes.
    {
        let mut node_versions = self.node_versions.write();
        for (node_id, hot_ref) in &node_hot_refs {
            let Some(index) = node_versions.get_mut(node_id) else {
                continue;
            };
            let Some(&(block_offset, length)) = node_entry_map.get(&node_id.as_u64()) else {
                continue;
            };
            let cold_ref = ColdVersionRef {
                epoch,
                block_offset,
                length,
                created_by: hot_ref.created_by,
                deleted_epoch: hot_ref.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold_ref));
        }
    }

    // Phase 3b: same for the edge version indexes.
    {
        let mut edge_versions = self.edge_versions.write();
        for (edge_id, hot_ref) in &edge_hot_refs {
            let Some(index) = edge_versions.get_mut(edge_id) else {
                continue;
            };
            let Some(&(block_offset, length)) = edge_entry_map.get(&edge_id.as_u64()) else {
                continue;
            };
            let cold_ref = ColdVersionRef {
                epoch,
                block_offset,
                length,
                created_by: hot_ref.created_by,
                deleted_epoch: hot_ref.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold_ref));
        }
    }

    total_frozen
}
2367
/// Borrows the epoch store owned by this graph store.
#[cfg(feature = "tiered-storage")]
#[must_use]
pub fn epoch_store(&self) -> &EpochStore {
    let store: &EpochStore = &self.epoch_store;
    store
}
2374
2375 #[must_use]
2377 pub fn label_count(&self) -> usize {
2378 self.id_to_label.read().len()
2379 }
2380
2381 #[must_use]
2385 pub fn property_key_count(&self) -> usize {
2386 let node_keys = self.node_properties.column_count();
2387 let edge_keys = self.edge_properties.column_count();
2388 node_keys + edge_keys
2392 }
2393
2394 #[must_use]
2396 pub fn edge_type_count(&self) -> usize {
2397 self.id_to_edge_type.read().len()
2398 }
2399
2400 pub fn neighbors(
2407 &self,
2408 node: NodeId,
2409 direction: Direction,
2410 ) -> impl Iterator<Item = NodeId> + '_ {
2411 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2412 Direction::Outgoing | Direction::Both => {
2413 Box::new(self.forward_adj.neighbors(node).into_iter())
2414 }
2415 Direction::Incoming => Box::new(std::iter::empty()),
2416 };
2417
2418 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2419 Direction::Incoming | Direction::Both => {
2420 if let Some(ref adj) = self.backward_adj {
2421 Box::new(adj.neighbors(node).into_iter())
2422 } else {
2423 Box::new(std::iter::empty())
2424 }
2425 }
2426 Direction::Outgoing => Box::new(std::iter::empty()),
2427 };
2428
2429 forward.chain(backward)
2430 }
2431
2432 pub fn edges_from(
2436 &self,
2437 node: NodeId,
2438 direction: Direction,
2439 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2440 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2441 Direction::Outgoing | Direction::Both => {
2442 Box::new(self.forward_adj.edges_from(node).into_iter())
2443 }
2444 Direction::Incoming => Box::new(std::iter::empty()),
2445 };
2446
2447 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2448 Direction::Incoming | Direction::Both => {
2449 if let Some(ref adj) = self.backward_adj {
2450 Box::new(adj.edges_from(node).into_iter())
2451 } else {
2452 Box::new(std::iter::empty())
2453 }
2454 }
2455 Direction::Outgoing => Box::new(std::iter::empty()),
2456 };
2457
2458 forward.chain(backward)
2459 }
2460
2461 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2474 if let Some(ref backward) = self.backward_adj {
2475 backward.edges_from(node)
2476 } else {
2477 self.all_edges()
2479 .filter_map(|edge| {
2480 if edge.dst == node {
2481 Some((edge.src, edge.id))
2482 } else {
2483 None
2484 }
2485 })
2486 .collect()
2487 }
2488 }
2489
2490 #[must_use]
2494 pub fn out_degree(&self, node: NodeId) -> usize {
2495 self.forward_adj.out_degree(node)
2496 }
2497
2498 #[must_use]
2503 pub fn in_degree(&self, node: NodeId) -> usize {
2504 if let Some(ref backward) = self.backward_adj {
2505 backward.in_degree(node)
2506 } else {
2507 self.all_edges().filter(|edge| edge.dst == node).count()
2509 }
2510 }
2511
2512 #[must_use]
2514 #[cfg(not(feature = "tiered-storage"))]
2515 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2516 let edges = self.edges.read();
2517 let chain = edges.get(&id)?;
2518 let epoch = self.current_epoch();
2519 let record = chain.visible_at(epoch)?;
2520 let id_to_type = self.id_to_edge_type.read();
2521 id_to_type.get(record.type_id as usize).cloned()
2522 }
2523
/// Returns the type name of the edge `id` as visible at the current epoch
/// (tiered-storage build).
///
/// `None` when the edge has no version index, no visible version, its record
/// cannot be read, or its type id has no registered name. This function does
/// not check the record's deleted flag.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
    let versions = self.edge_versions.read();
    let index = versions.get(&id)?;
    let vref = index.visible_at(self.current_epoch())?;
    let type_slot = self.read_edge_record(&vref)?.type_id as usize;

    let type_names = self.id_to_edge_type.read();
    type_names.get(type_slot).cloned()
}
2537
2538 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2544 let label_to_id = self.label_to_id.read();
2545 if let Some(&label_id) = label_to_id.get(label) {
2546 let index = self.label_index.read();
2547 if let Some(set) = index.get(label_id as usize) {
2548 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2549 ids.sort_unstable();
2550 return ids;
2551 }
2552 }
2553 Vec::new()
2554 }
2555
2556 #[cfg(not(feature = "tiered-storage"))]
2563 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2564 let epoch = self.current_epoch();
2565 let node_ids: Vec<NodeId> = self
2566 .nodes
2567 .read()
2568 .iter()
2569 .filter_map(|(id, chain)| {
2570 chain
2571 .visible_at(epoch)
2572 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2573 })
2574 .collect();
2575
2576 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2577 }
2578
/// Iterates over every live node (tiered-storage build).
///
/// The ids come from [`Self::node_ids`]; each node is then loaded lazily
/// through `get_node`, skipping ids for which it returns `None`.
#[cfg(feature = "tiered-storage")]
pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
    self.node_ids()
        .into_iter()
        .filter_map(move |id| self.get_node(id))
}
2586
2587 #[cfg(not(feature = "tiered-storage"))]
2592 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2593 let epoch = self.current_epoch();
2594 let edge_ids: Vec<EdgeId> = self
2595 .edges
2596 .read()
2597 .iter()
2598 .filter_map(|(id, chain)| {
2599 chain
2600 .visible_at(epoch)
2601 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2602 })
2603 .collect();
2604
2605 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2606 }
2607
/// Iterates over every edge whose version visible at the current epoch can be
/// read and is not marked deleted (tiered-storage build).
///
/// The ids are snapshotted up front (in map iteration order) and each edge
/// is then loaded lazily through [`Self::get_edge`]; ids for which it
/// returns `None` are skipped.
#[cfg(feature = "tiered-storage")]
pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();

    let mut live_ids: Vec<EdgeId> = Vec::new();
    for (id, index) in versions.iter() {
        let is_live = index
            .visible_at(epoch)
            .and_then(|vref| self.read_edge_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if is_live {
            live_ids.push(*id);
        }
    }

    live_ids.into_iter().filter_map(move |id| self.get_edge(id))
}
2626
2627 pub fn all_labels(&self) -> Vec<String> {
2629 self.id_to_label
2630 .read()
2631 .iter()
2632 .map(|s| s.to_string())
2633 .collect()
2634 }
2635
2636 pub fn all_edge_types(&self) -> Vec<String> {
2638 self.id_to_edge_type
2639 .read()
2640 .iter()
2641 .map(|s| s.to_string())
2642 .collect()
2643 }
2644
2645 pub fn all_property_keys(&self) -> Vec<String> {
2647 let mut keys = std::collections::HashSet::new();
2648 for key in self.node_properties.keys() {
2649 keys.insert(key.to_string());
2650 }
2651 for key in self.edge_properties.keys() {
2652 keys.insert(key.to_string());
2653 }
2654 keys.into_iter().collect()
2655 }
2656
2657 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2659 let node_ids = self.nodes_by_label(label);
2660 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2661 }
2662
2663 #[cfg(not(feature = "tiered-storage"))]
2665 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2666 let epoch = self.current_epoch();
2667 let type_to_id = self.edge_type_to_id.read();
2668
2669 if let Some(&type_id) = type_to_id.get(edge_type) {
2670 let edge_ids: Vec<EdgeId> = self
2671 .edges
2672 .read()
2673 .iter()
2674 .filter_map(|(id, chain)| {
2675 chain.visible_at(epoch).and_then(|r| {
2676 if !r.is_deleted() && r.type_id == type_id {
2677 Some(*id)
2678 } else {
2679 None
2680 }
2681 })
2682 })
2683 .collect();
2684
2685 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2687 as Box<dyn Iterator<Item = Edge> + 'a>
2688 } else {
2689 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2691 }
2692 }
2693
/// Iterates over the live edges whose type is `edge_type` (tiered-storage
/// build).
///
/// An unregistered type name yields an empty iterator. Otherwise the ids of
/// edges whose visible record can be read, is not marked deleted and has the
/// matching type are snapshotted, and each edge is loaded lazily through
/// [`Self::get_edge`].
#[cfg(feature = "tiered-storage")]
pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
    let epoch = self.current_epoch();
    let type_to_id = self.edge_type_to_id.read();

    let Some(&type_id) = type_to_id.get(edge_type) else {
        return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>;
    };

    let versions = self.edge_versions.read();
    let matching_ids: Vec<EdgeId> = versions
        .iter()
        .filter(|(_, index)| {
            index
                .visible_at(epoch)
                .and_then(|vref| self.read_edge_record(&vref))
                .is_some_and(|r| !r.is_deleted() && r.type_id == type_id)
        })
        .map(|(id, _)| *id)
        .collect();

    Box::new(
        matching_ids
            .into_iter()
            .filter_map(move |id| self.get_edge(id)),
    ) as Box<dyn Iterator<Item = Edge> + 'a>
}
2724
2725 #[must_use]
2732 pub fn node_property_might_match(
2733 &self,
2734 property: &PropertyKey,
2735 op: CompareOp,
2736 value: &Value,
2737 ) -> bool {
2738 self.node_properties.might_match(property, op, value)
2739 }
2740
2741 #[must_use]
2743 pub fn edge_property_might_match(
2744 &self,
2745 property: &PropertyKey,
2746 op: CompareOp,
2747 value: &Value,
2748 ) -> bool {
2749 self.edge_properties.might_match(property, op, value)
2750 }
2751
2752 #[must_use]
2754 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2755 self.node_properties.zone_map(property)
2756 }
2757
2758 #[must_use]
2760 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2761 self.edge_properties.zone_map(property)
2762 }
2763
2764 pub fn rebuild_zone_maps(&self) {
2766 self.node_properties.rebuild_zone_maps();
2767 self.edge_properties.rebuild_zone_maps();
2768 }
2769
2770 #[must_use]
2774 pub fn statistics(&self) -> Statistics {
2775 self.statistics.read().clone()
2776 }
2777
2778 pub fn ensure_statistics_fresh(&self) {
2783 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2784 self.compute_statistics();
2785 }
2786 }
2787
2788 #[cfg(not(feature = "tiered-storage"))]
2793 pub fn compute_statistics(&self) {
2794 let mut stats = Statistics::new();
2795
2796 stats.total_nodes = self.node_count() as u64;
2798 stats.total_edges = self.edge_count() as u64;
2799
2800 let id_to_label = self.id_to_label.read();
2802 let label_index = self.label_index.read();
2803
2804 for (label_id, label_name) in id_to_label.iter().enumerate() {
2805 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2806
2807 if node_count > 0 {
2808 let avg_out_degree = if stats.total_nodes > 0 {
2810 stats.total_edges as f64 / stats.total_nodes as f64
2811 } else {
2812 0.0
2813 };
2814
2815 let label_stats =
2816 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2817
2818 stats.update_label(label_name.as_ref(), label_stats);
2819 }
2820 }
2821
2822 let id_to_edge_type = self.id_to_edge_type.read();
2824 let edges = self.edges.read();
2825 let epoch = self.current_epoch();
2826
2827 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2828 for chain in edges.values() {
2829 if let Some(record) = chain.visible_at(epoch)
2830 && !record.is_deleted()
2831 {
2832 *edge_type_counts.entry(record.type_id).or_default() += 1;
2833 }
2834 }
2835
2836 for (type_id, count) in edge_type_counts {
2837 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2838 let avg_degree = if stats.total_nodes > 0 {
2839 count as f64 / stats.total_nodes as f64
2840 } else {
2841 0.0
2842 };
2843
2844 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2845 stats.update_edge_type(type_name.as_ref(), edge_stats);
2846 }
2847 }
2848
2849 *self.statistics.write() = stats;
2850 }
2851
/// Recomputes the statistics from the current graph contents and stores them
/// (tiered-storage build).
///
/// * Totals come from [`Self::node_count`] and [`Self::edge_count`].
/// * Each label with a non-empty index slot gets a `LabelStatistics` whose
///   two degree values are both the graph-wide edges-per-node ratio.
/// * Each edge type gets an `EdgeTypeStatistics` with the number of live,
///   readable edges of that type and that count divided by the total node
///   count (used for both degree values).
///
/// All ratios are `0.0` when there are no nodes.
#[cfg(feature = "tiered-storage")]
pub fn compute_statistics(&self) {
    // `numerator / denominator`, or 0.0 when the denominator is zero.
    let ratio = |numerator: u64, denominator: u64| -> f64 {
        if denominator > 0 {
            numerator as f64 / denominator as f64
        } else {
            0.0
        }
    };

    let mut stats = Statistics::new();
    stats.total_nodes = self.node_count() as u64;
    stats.total_edges = self.edge_count() as u64;

    // Per-label statistics.
    let id_to_label = self.id_to_label.read();
    let label_index = self.label_index.read();

    for (label_id, label_name) in id_to_label.iter().enumerate() {
        let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
        if node_count == 0 {
            continue;
        }

        let avg_out_degree = ratio(stats.total_edges, stats.total_nodes);
        let label_stats =
            LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
        stats.update_label(label_name.as_ref(), label_stats);
    }

    // Per-edge-type statistics: tally live edges by type id first.
    let id_to_edge_type = self.id_to_edge_type.read();
    let versions = self.edge_versions.read();
    let epoch = self.current_epoch();

    let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
    let live_records = versions
        .values()
        .filter_map(|index| index.visible_at(epoch))
        .filter_map(|vref| self.read_edge_record(&vref))
        .filter(|record| !record.is_deleted());
    for record in live_records {
        *edge_type_counts.entry(record.type_id).or_default() += 1;
    }

    for (type_id, count) in edge_type_counts {
        // Type ids without a registered name are skipped.
        let Some(type_name) = id_to_edge_type.get(type_id as usize) else {
            continue;
        };
        let avg_degree = ratio(count, stats.total_nodes);
        let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
        stats.update_edge_type(type_name.as_ref(), edge_stats);
    }

    *self.statistics.write() = stats;
}
2913
2914 #[must_use]
2916 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2917 self.statistics.read().estimate_label_cardinality(label)
2918 }
2919
2920 #[must_use]
2922 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2923 self.statistics
2924 .read()
2925 .estimate_avg_degree(edge_type, outgoing)
2926 }
2927
2928 fn get_or_create_label_id(&self, label: &str) -> u32 {
2931 {
2932 let label_to_id = self.label_to_id.read();
2933 if let Some(&id) = label_to_id.get(label) {
2934 return id;
2935 }
2936 }
2937
2938 let mut label_to_id = self.label_to_id.write();
2939 let mut id_to_label = self.id_to_label.write();
2940
2941 if let Some(&id) = label_to_id.get(label) {
2943 return id;
2944 }
2945
2946 let id = id_to_label.len() as u32;
2947
2948 let label: ArcStr = label.into();
2949 label_to_id.insert(label.clone(), id);
2950 id_to_label.push(label);
2951
2952 id
2953 }
2954
2955 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2956 {
2957 let type_to_id = self.edge_type_to_id.read();
2958 if let Some(&id) = type_to_id.get(edge_type) {
2959 return id;
2960 }
2961 }
2962
2963 let mut type_to_id = self.edge_type_to_id.write();
2964 let mut id_to_type = self.id_to_edge_type.write();
2965
2966 if let Some(&id) = type_to_id.get(edge_type) {
2968 return id;
2969 }
2970
2971 let id = id_to_type.len() as u32;
2972 let edge_type: ArcStr = edge_type.into();
2973 type_to_id.insert(edge_type.clone(), id);
2974 id_to_type.push(edge_type);
2975
2976 id
2977 }
2978
2979 #[cfg(not(feature = "tiered-storage"))]
2986 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2987 let epoch = self.current_epoch();
2988 let mut record = NodeRecord::new(id, epoch);
2989 record.set_label_count(labels.len() as u16);
2990
2991 let mut node_label_set = FxHashSet::default();
2993 for label in labels {
2994 let label_id = self.get_or_create_label_id(*label);
2995 node_label_set.insert(label_id);
2996
2997 let mut index = self.label_index.write();
2999 while index.len() <= label_id as usize {
3000 index.push(FxHashMap::default());
3001 }
3002 index[label_id as usize].insert(id, ());
3003 }
3004
3005 self.node_labels.write().insert(id, node_label_set);
3007
3008 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3010 self.nodes.write().insert(id, chain);
3011
3012 let id_val = id.as_u64();
3014 let _ = self
3015 .next_node_id
3016 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3017 if id_val >= current {
3018 Some(id_val + 1)
3019 } else {
3020 None
3021 }
3022 });
3023 }
3024
3025 #[cfg(feature = "tiered-storage")]
3028 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3029 let epoch = self.current_epoch();
3030 let mut record = NodeRecord::new(id, epoch);
3031 record.set_label_count(labels.len() as u16);
3032
3033 let mut node_label_set = FxHashSet::default();
3035 for label in labels {
3036 let label_id = self.get_or_create_label_id(*label);
3037 node_label_set.insert(label_id);
3038
3039 let mut index = self.label_index.write();
3041 while index.len() <= label_id as usize {
3042 index.push(FxHashMap::default());
3043 }
3044 index[label_id as usize].insert(id, ());
3045 }
3046
3047 self.node_labels.write().insert(id, node_label_set);
3049
3050 let arena = self.arena_allocator.arena_or_create(epoch);
3052 let (offset, _stored) = arena.alloc_value_with_offset(record);
3053
3054 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3056 let mut versions = self.node_versions.write();
3057 versions.insert(id, VersionIndex::with_initial(hot_ref));
3058
3059 let id_val = id.as_u64();
3061 let _ = self
3062 .next_node_id
3063 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3064 if id_val >= current {
3065 Some(id_val + 1)
3066 } else {
3067 None
3068 }
3069 });
3070 }
3071
3072 #[cfg(not(feature = "tiered-storage"))]
3076 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3077 let epoch = self.current_epoch();
3078 let type_id = self.get_or_create_edge_type_id(edge_type);
3079
3080 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3081 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3082 self.edges.write().insert(id, chain);
3083
3084 self.forward_adj.add_edge(src, dst, id);
3086 if let Some(ref backward) = self.backward_adj {
3087 backward.add_edge(dst, src, id);
3088 }
3089
3090 let id_val = id.as_u64();
3092 let _ = self
3093 .next_edge_id
3094 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3095 if id_val >= current {
3096 Some(id_val + 1)
3097 } else {
3098 None
3099 }
3100 });
3101 }
3102
3103 #[cfg(feature = "tiered-storage")]
3106 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3107 let epoch = self.current_epoch();
3108 let type_id = self.get_or_create_edge_type_id(edge_type);
3109
3110 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3111
3112 let arena = self.arena_allocator.arena_or_create(epoch);
3114 let (offset, _stored) = arena.alloc_value_with_offset(record);
3115
3116 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3118 let mut versions = self.edge_versions.write();
3119 versions.insert(id, VersionIndex::with_initial(hot_ref));
3120
3121 self.forward_adj.add_edge(src, dst, id);
3123 if let Some(ref backward) = self.backward_adj {
3124 backward.add_edge(dst, src, id);
3125 }
3126
3127 let id_val = id.as_u64();
3129 let _ = self
3130 .next_edge_id
3131 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3132 if id_val >= current {
3133 Some(id_val + 1)
3134 } else {
3135 None
3136 }
3137 });
3138 }
3139
3140 pub fn set_epoch(&self, epoch: EpochId) {
3142 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
3143 }
3144}
3145
3146impl Default for LpgStore {
3147 fn default() -> Self {
3148 Self::new()
3149 }
3150}
3151
3152#[cfg(test)]
3153mod tests {
3154 use super::*;
3155
3156 #[test]
3157 fn test_create_node() {
3158 let store = LpgStore::new();
3159
3160 let id = store.create_node(&["Person"]);
3161 assert!(id.is_valid());
3162
3163 let node = store.get_node(id).unwrap();
3164 assert!(node.has_label("Person"));
3165 assert!(!node.has_label("Animal"));
3166 }
3167
3168 #[test]
3169 fn test_create_node_with_props() {
3170 let store = LpgStore::new();
3171
3172 let id = store.create_node_with_props(
3173 &["Person"],
3174 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3175 );
3176
3177 let node = store.get_node(id).unwrap();
3178 assert_eq!(
3179 node.get_property("name").and_then(|v| v.as_str()),
3180 Some("Alice")
3181 );
3182 assert_eq!(
3183 node.get_property("age").and_then(|v| v.as_int64()),
3184 Some(30)
3185 );
3186 }
3187
3188 #[test]
3189 fn test_delete_node() {
3190 let store = LpgStore::new();
3191
3192 let id = store.create_node(&["Person"]);
3193 assert_eq!(store.node_count(), 1);
3194
3195 assert!(store.delete_node(id));
3196 assert_eq!(store.node_count(), 0);
3197 assert!(store.get_node(id).is_none());
3198
3199 assert!(!store.delete_node(id));
3201 }
3202
3203 #[test]
3204 fn test_create_edge() {
3205 let store = LpgStore::new();
3206
3207 let alice = store.create_node(&["Person"]);
3208 let bob = store.create_node(&["Person"]);
3209
3210 let edge_id = store.create_edge(alice, bob, "KNOWS");
3211 assert!(edge_id.is_valid());
3212
3213 let edge = store.get_edge(edge_id).unwrap();
3214 assert_eq!(edge.src, alice);
3215 assert_eq!(edge.dst, bob);
3216 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3217 }
3218
3219 #[test]
3220 fn test_neighbors() {
3221 let store = LpgStore::new();
3222
3223 let a = store.create_node(&["Person"]);
3224 let b = store.create_node(&["Person"]);
3225 let c = store.create_node(&["Person"]);
3226
3227 store.create_edge(a, b, "KNOWS");
3228 store.create_edge(a, c, "KNOWS");
3229
3230 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3231 assert_eq!(outgoing.len(), 2);
3232 assert!(outgoing.contains(&b));
3233 assert!(outgoing.contains(&c));
3234
3235 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3236 assert_eq!(incoming.len(), 1);
3237 assert!(incoming.contains(&a));
3238 }
3239
3240 #[test]
3241 fn test_nodes_by_label() {
3242 let store = LpgStore::new();
3243
3244 let p1 = store.create_node(&["Person"]);
3245 let p2 = store.create_node(&["Person"]);
3246 let _a = store.create_node(&["Animal"]);
3247
3248 let persons = store.nodes_by_label("Person");
3249 assert_eq!(persons.len(), 2);
3250 assert!(persons.contains(&p1));
3251 assert!(persons.contains(&p2));
3252
3253 let animals = store.nodes_by_label("Animal");
3254 assert_eq!(animals.len(), 1);
3255 }
3256
3257 #[test]
3258 fn test_delete_edge() {
3259 let store = LpgStore::new();
3260
3261 let a = store.create_node(&["Person"]);
3262 let b = store.create_node(&["Person"]);
3263 let edge_id = store.create_edge(a, b, "KNOWS");
3264
3265 assert_eq!(store.edge_count(), 1);
3266
3267 assert!(store.delete_edge(edge_id));
3268 assert_eq!(store.edge_count(), 0);
3269 assert!(store.get_edge(edge_id).is_none());
3270 }
3271
3272 #[test]
3275 fn test_lpg_store_config() {
3276 let config = LpgStoreConfig {
3278 backward_edges: false,
3279 initial_node_capacity: 100,
3280 initial_edge_capacity: 200,
3281 };
3282 let store = LpgStore::with_config(config);
3283
3284 let a = store.create_node(&["Person"]);
3286 let b = store.create_node(&["Person"]);
3287 store.create_edge(a, b, "KNOWS");
3288
3289 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3291 assert_eq!(outgoing.len(), 1);
3292
3293 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3295 assert_eq!(incoming.len(), 0);
3296 }
3297
3298 #[test]
3299 fn test_epoch_management() {
3300 let store = LpgStore::new();
3301
3302 let epoch0 = store.current_epoch();
3303 assert_eq!(epoch0.as_u64(), 0);
3304
3305 let epoch1 = store.new_epoch();
3306 assert_eq!(epoch1.as_u64(), 1);
3307
3308 let current = store.current_epoch();
3309 assert_eq!(current.as_u64(), 1);
3310 }
3311
3312 #[test]
3313 fn test_node_properties() {
3314 let store = LpgStore::new();
3315 let id = store.create_node(&["Person"]);
3316
3317 store.set_node_property(id, "name", Value::from("Alice"));
3319 let name = store.get_node_property(id, &"name".into());
3320 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3321
3322 store.set_node_property(id, "name", Value::from("Bob"));
3324 let name = store.get_node_property(id, &"name".into());
3325 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3326
3327 let old = store.remove_node_property(id, "name");
3329 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3330
3331 let name = store.get_node_property(id, &"name".into());
3333 assert!(name.is_none());
3334
3335 let none = store.remove_node_property(id, "nonexistent");
3337 assert!(none.is_none());
3338 }
3339
3340 #[test]
3341 fn test_edge_properties() {
3342 let store = LpgStore::new();
3343 let a = store.create_node(&["Person"]);
3344 let b = store.create_node(&["Person"]);
3345 let edge_id = store.create_edge(a, b, "KNOWS");
3346
3347 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3349 let since = store.get_edge_property(edge_id, &"since".into());
3350 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3351
3352 let old = store.remove_edge_property(edge_id, "since");
3354 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3355
3356 let since = store.get_edge_property(edge_id, &"since".into());
3357 assert!(since.is_none());
3358 }
3359
3360 #[test]
3361 fn test_add_remove_label() {
3362 let store = LpgStore::new();
3363 let id = store.create_node(&["Person"]);
3364
3365 assert!(store.add_label(id, "Employee"));
3367
3368 let node = store.get_node(id).unwrap();
3369 assert!(node.has_label("Person"));
3370 assert!(node.has_label("Employee"));
3371
3372 assert!(!store.add_label(id, "Employee"));
3374
3375 assert!(store.remove_label(id, "Employee"));
3377
3378 let node = store.get_node(id).unwrap();
3379 assert!(node.has_label("Person"));
3380 assert!(!node.has_label("Employee"));
3381
3382 assert!(!store.remove_label(id, "Employee"));
3384 assert!(!store.remove_label(id, "NonExistent"));
3385 }
3386
3387 #[test]
3388 fn test_add_label_to_nonexistent_node() {
3389 let store = LpgStore::new();
3390 let fake_id = NodeId::new(999);
3391 assert!(!store.add_label(fake_id, "Label"));
3392 }
3393
3394 #[test]
3395 fn test_remove_label_from_nonexistent_node() {
3396 let store = LpgStore::new();
3397 let fake_id = NodeId::new(999);
3398 assert!(!store.remove_label(fake_id, "Label"));
3399 }
3400
3401 #[test]
3402 fn test_node_ids() {
3403 let store = LpgStore::new();
3404
3405 let n1 = store.create_node(&["Person"]);
3406 let n2 = store.create_node(&["Person"]);
3407 let n3 = store.create_node(&["Person"]);
3408
3409 let ids = store.node_ids();
3410 assert_eq!(ids.len(), 3);
3411 assert!(ids.contains(&n1));
3412 assert!(ids.contains(&n2));
3413 assert!(ids.contains(&n3));
3414
3415 store.delete_node(n2);
3417 let ids = store.node_ids();
3418 assert_eq!(ids.len(), 2);
3419 assert!(!ids.contains(&n2));
3420 }
3421
3422 #[test]
3423 fn test_delete_node_nonexistent() {
3424 let store = LpgStore::new();
3425 let fake_id = NodeId::new(999);
3426 assert!(!store.delete_node(fake_id));
3427 }
3428
3429 #[test]
3430 fn test_delete_edge_nonexistent() {
3431 let store = LpgStore::new();
3432 let fake_id = EdgeId::new(999);
3433 assert!(!store.delete_edge(fake_id));
3434 }
3435
3436 #[test]
3437 fn test_delete_edge_double() {
3438 let store = LpgStore::new();
3439 let a = store.create_node(&["Person"]);
3440 let b = store.create_node(&["Person"]);
3441 let edge_id = store.create_edge(a, b, "KNOWS");
3442
3443 assert!(store.delete_edge(edge_id));
3444 assert!(!store.delete_edge(edge_id)); }
3446
3447 #[test]
3448 fn test_create_edge_with_props() {
3449 let store = LpgStore::new();
3450 let a = store.create_node(&["Person"]);
3451 let b = store.create_node(&["Person"]);
3452
3453 let edge_id = store.create_edge_with_props(
3454 a,
3455 b,
3456 "KNOWS",
3457 [
3458 ("since", Value::from(2020i64)),
3459 ("weight", Value::from(1.0)),
3460 ],
3461 );
3462
3463 let edge = store.get_edge(edge_id).unwrap();
3464 assert_eq!(
3465 edge.get_property("since").and_then(|v| v.as_int64()),
3466 Some(2020)
3467 );
3468 assert_eq!(
3469 edge.get_property("weight").and_then(|v| v.as_float64()),
3470 Some(1.0)
3471 );
3472 }
3473
3474 #[test]
3475 fn test_delete_node_edges() {
3476 let store = LpgStore::new();
3477
3478 let a = store.create_node(&["Person"]);
3479 let b = store.create_node(&["Person"]);
3480 let c = store.create_node(&["Person"]);
3481
3482 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3486
3487 store.delete_node_edges(a);
3489
3490 assert_eq!(store.edge_count(), 0);
3491 }
3492
3493 #[test]
3494 fn test_neighbors_both_directions() {
3495 let store = LpgStore::new();
3496
3497 let a = store.create_node(&["Person"]);
3498 let b = store.create_node(&["Person"]);
3499 let c = store.create_node(&["Person"]);
3500
3501 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3506 assert_eq!(neighbors.len(), 2);
3507 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3510
3511 #[test]
3512 fn test_edges_from() {
3513 let store = LpgStore::new();
3514
3515 let a = store.create_node(&["Person"]);
3516 let b = store.create_node(&["Person"]);
3517 let c = store.create_node(&["Person"]);
3518
3519 let e1 = store.create_edge(a, b, "KNOWS");
3520 let e2 = store.create_edge(a, c, "KNOWS");
3521
3522 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3523 assert_eq!(edges.len(), 2);
3524 assert!(edges.iter().any(|(_, e)| *e == e1));
3525 assert!(edges.iter().any(|(_, e)| *e == e2));
3526
3527 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3529 assert_eq!(incoming.len(), 1);
3530 assert_eq!(incoming[0].1, e1);
3531 }
3532
3533 #[test]
3534 fn test_edges_to() {
3535 let store = LpgStore::new();
3536
3537 let a = store.create_node(&["Person"]);
3538 let b = store.create_node(&["Person"]);
3539 let c = store.create_node(&["Person"]);
3540
3541 let e1 = store.create_edge(a, b, "KNOWS");
3542 let e2 = store.create_edge(c, b, "KNOWS");
3543
3544 let to_b = store.edges_to(b);
3546 assert_eq!(to_b.len(), 2);
3547 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3548 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3549 }
3550
3551 #[test]
3552 fn test_out_degree_in_degree() {
3553 let store = LpgStore::new();
3554
3555 let a = store.create_node(&["Person"]);
3556 let b = store.create_node(&["Person"]);
3557 let c = store.create_node(&["Person"]);
3558
3559 store.create_edge(a, b, "KNOWS");
3560 store.create_edge(a, c, "KNOWS");
3561 store.create_edge(c, b, "KNOWS");
3562
3563 assert_eq!(store.out_degree(a), 2);
3564 assert_eq!(store.out_degree(b), 0);
3565 assert_eq!(store.out_degree(c), 1);
3566
3567 assert_eq!(store.in_degree(a), 0);
3568 assert_eq!(store.in_degree(b), 2);
3569 assert_eq!(store.in_degree(c), 1);
3570 }
3571
3572 #[test]
3573 fn test_edge_type() {
3574 let store = LpgStore::new();
3575
3576 let a = store.create_node(&["Person"]);
3577 let b = store.create_node(&["Person"]);
3578 let edge_id = store.create_edge(a, b, "KNOWS");
3579
3580 let edge_type = store.edge_type(edge_id);
3581 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3582
3583 let fake_id = EdgeId::new(999);
3585 assert!(store.edge_type(fake_id).is_none());
3586 }
3587
3588 #[test]
3589 fn test_count_methods() {
3590 let store = LpgStore::new();
3591
3592 assert_eq!(store.label_count(), 0);
3593 assert_eq!(store.edge_type_count(), 0);
3594 assert_eq!(store.property_key_count(), 0);
3595
3596 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3597 let b = store.create_node(&["Company"]);
3598 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3599
3600 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3604
3605 #[test]
3606 fn test_all_nodes_and_edges() {
3607 let store = LpgStore::new();
3608
3609 let a = store.create_node(&["Person"]);
3610 let b = store.create_node(&["Person"]);
3611 store.create_edge(a, b, "KNOWS");
3612
3613 let nodes: Vec<_> = store.all_nodes().collect();
3614 assert_eq!(nodes.len(), 2);
3615
3616 let edges: Vec<_> = store.all_edges().collect();
3617 assert_eq!(edges.len(), 1);
3618 }
3619
3620 #[test]
3621 fn test_all_labels_and_edge_types() {
3622 let store = LpgStore::new();
3623
3624 store.create_node(&["Person"]);
3625 store.create_node(&["Company"]);
3626 let a = store.create_node(&["Animal"]);
3627 let b = store.create_node(&["Animal"]);
3628 store.create_edge(a, b, "EATS");
3629
3630 let labels = store.all_labels();
3631 assert_eq!(labels.len(), 3);
3632 assert!(labels.contains(&"Person".to_string()));
3633 assert!(labels.contains(&"Company".to_string()));
3634 assert!(labels.contains(&"Animal".to_string()));
3635
3636 let edge_types = store.all_edge_types();
3637 assert_eq!(edge_types.len(), 1);
3638 assert!(edge_types.contains(&"EATS".to_string()));
3639 }
3640
3641 #[test]
3642 fn test_all_property_keys() {
3643 let store = LpgStore::new();
3644
3645 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3646 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3647 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3648
3649 let keys = store.all_property_keys();
3650 assert!(keys.contains(&"name".to_string()));
3651 assert!(keys.contains(&"age".to_string()));
3652 assert!(keys.contains(&"since".to_string()));
3653 }
3654
3655 #[test]
3656 fn test_nodes_with_label() {
3657 let store = LpgStore::new();
3658
3659 store.create_node(&["Person"]);
3660 store.create_node(&["Person"]);
3661 store.create_node(&["Company"]);
3662
3663 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3664 assert_eq!(persons.len(), 2);
3665
3666 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3667 assert_eq!(companies.len(), 1);
3668
3669 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3670 assert_eq!(none.len(), 0);
3671 }
3672
3673 #[test]
3674 fn test_edges_with_type() {
3675 let store = LpgStore::new();
3676
3677 let a = store.create_node(&["Person"]);
3678 let b = store.create_node(&["Person"]);
3679 let c = store.create_node(&["Company"]);
3680
3681 store.create_edge(a, b, "KNOWS");
3682 store.create_edge(a, c, "WORKS_AT");
3683
3684 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3685 assert_eq!(knows.len(), 1);
3686
3687 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3688 assert_eq!(works_at.len(), 1);
3689
3690 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3691 assert_eq!(none.len(), 0);
3692 }
3693
3694 #[test]
3695 fn test_nodes_by_label_nonexistent() {
3696 let store = LpgStore::new();
3697 store.create_node(&["Person"]);
3698
3699 let empty = store.nodes_by_label("NonExistent");
3700 assert!(empty.is_empty());
3701 }
3702
3703 #[test]
3704 fn test_statistics() {
3705 let store = LpgStore::new();
3706
3707 let a = store.create_node(&["Person"]);
3708 let b = store.create_node(&["Person"]);
3709 let c = store.create_node(&["Company"]);
3710
3711 store.create_edge(a, b, "KNOWS");
3712 store.create_edge(a, c, "WORKS_AT");
3713
3714 store.compute_statistics();
3715 let stats = store.statistics();
3716
3717 assert_eq!(stats.total_nodes, 3);
3718 assert_eq!(stats.total_edges, 2);
3719
3720 let person_card = store.estimate_label_cardinality("Person");
3722 assert!(person_card > 0.0);
3723
3724 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3725 assert!(avg_degree >= 0.0);
3726 }
3727
3728 #[test]
3729 fn test_zone_maps() {
3730 let store = LpgStore::new();
3731
3732 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3733 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3734
3735 let might_match =
3737 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3738 assert!(might_match);
3740
3741 let zone = store.node_property_zone_map(&"age".into());
3742 assert!(zone.is_some());
3743
3744 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3746 assert!(no_zone.is_none());
3747
3748 let a = store.create_node(&["A"]);
3750 let b = store.create_node(&["B"]);
3751 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3752
3753 let edge_zone = store.edge_property_zone_map(&"weight".into());
3754 assert!(edge_zone.is_some());
3755 }
3756
3757 #[test]
3758 fn test_rebuild_zone_maps() {
3759 let store = LpgStore::new();
3760 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3761
3762 store.rebuild_zone_maps();
3764 }
3765
3766 #[test]
3767 fn test_create_node_with_id() {
3768 let store = LpgStore::new();
3769
3770 let specific_id = NodeId::new(100);
3771 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3772
3773 let node = store.get_node(specific_id).unwrap();
3774 assert!(node.has_label("Person"));
3775 assert!(node.has_label("Employee"));
3776
3777 let next = store.create_node(&["Other"]);
3779 assert!(next.as_u64() > 100);
3780 }
3781
3782 #[test]
3783 fn test_create_edge_with_id() {
3784 let store = LpgStore::new();
3785
3786 let a = store.create_node(&["A"]);
3787 let b = store.create_node(&["B"]);
3788
3789 let specific_id = EdgeId::new(500);
3790 store.create_edge_with_id(specific_id, a, b, "REL");
3791
3792 let edge = store.get_edge(specific_id).unwrap();
3793 assert_eq!(edge.src, a);
3794 assert_eq!(edge.dst, b);
3795 assert_eq!(edge.edge_type.as_str(), "REL");
3796
3797 let next = store.create_edge(a, b, "OTHER");
3799 assert!(next.as_u64() > 500);
3800 }
3801
3802 #[test]
3803 fn test_set_epoch() {
3804 let store = LpgStore::new();
3805
3806 assert_eq!(store.current_epoch().as_u64(), 0);
3807
3808 store.set_epoch(EpochId::new(42));
3809 assert_eq!(store.current_epoch().as_u64(), 42);
3810 }
3811
3812 #[test]
3813 fn test_get_node_nonexistent() {
3814 let store = LpgStore::new();
3815 let fake_id = NodeId::new(999);
3816 assert!(store.get_node(fake_id).is_none());
3817 }
3818
3819 #[test]
3820 fn test_get_edge_nonexistent() {
3821 let store = LpgStore::new();
3822 let fake_id = EdgeId::new(999);
3823 assert!(store.get_edge(fake_id).is_none());
3824 }
3825
3826 #[test]
3827 fn test_multiple_labels() {
3828 let store = LpgStore::new();
3829
3830 let id = store.create_node(&["Person", "Employee", "Manager"]);
3831 let node = store.get_node(id).unwrap();
3832
3833 assert!(node.has_label("Person"));
3834 assert!(node.has_label("Employee"));
3835 assert!(node.has_label("Manager"));
3836 assert!(!node.has_label("Other"));
3837 }
3838
3839 #[test]
3840 fn test_default_impl() {
3841 let store: LpgStore = Default::default();
3842 assert_eq!(store.node_count(), 0);
3843 assert_eq!(store.edge_count(), 0);
3844 }
3845
3846 #[test]
3847 fn test_edges_from_both_directions() {
3848 let store = LpgStore::new();
3849
3850 let a = store.create_node(&["A"]);
3851 let b = store.create_node(&["B"]);
3852 let c = store.create_node(&["C"]);
3853
3854 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3859 assert_eq!(edges.len(), 2);
3860 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3863
3864 #[test]
3865 fn test_no_backward_adj_in_degree() {
3866 let config = LpgStoreConfig {
3867 backward_edges: false,
3868 initial_node_capacity: 10,
3869 initial_edge_capacity: 10,
3870 };
3871 let store = LpgStore::with_config(config);
3872
3873 let a = store.create_node(&["A"]);
3874 let b = store.create_node(&["B"]);
3875 store.create_edge(a, b, "R");
3876
3877 let degree = store.in_degree(b);
3879 assert_eq!(degree, 1);
3880 }
3881
3882 #[test]
3883 fn test_no_backward_adj_edges_to() {
3884 let config = LpgStoreConfig {
3885 backward_edges: false,
3886 initial_node_capacity: 10,
3887 initial_edge_capacity: 10,
3888 };
3889 let store = LpgStore::with_config(config);
3890
3891 let a = store.create_node(&["A"]);
3892 let b = store.create_node(&["B"]);
3893 let e = store.create_edge(a, b, "R");
3894
3895 let edges = store.edges_to(b);
3897 assert_eq!(edges.len(), 1);
3898 assert_eq!(edges[0].1, e);
3899 }
3900
3901 #[test]
3902 fn test_node_versioned_creation() {
3903 let store = LpgStore::new();
3904
3905 let epoch = store.new_epoch();
3906 let tx_id = TxId::new(1);
3907
3908 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3909 assert!(store.get_node(id).is_some());
3910 }
3911
3912 #[test]
3913 fn test_edge_versioned_creation() {
3914 let store = LpgStore::new();
3915
3916 let a = store.create_node(&["A"]);
3917 let b = store.create_node(&["B"]);
3918
3919 let epoch = store.new_epoch();
3920 let tx_id = TxId::new(1);
3921
3922 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3923 assert!(store.get_edge(edge_id).is_some());
3924 }
3925
3926 #[test]
3927 fn test_node_with_props_versioned() {
3928 let store = LpgStore::new();
3929
3930 let epoch = store.new_epoch();
3931 let tx_id = TxId::new(1);
3932
3933 let id = store.create_node_with_props_versioned(
3934 &["Person"],
3935 [("name", Value::from("Alice"))],
3936 epoch,
3937 tx_id,
3938 );
3939
3940 let node = store.get_node(id).unwrap();
3941 assert_eq!(
3942 node.get_property("name").and_then(|v| v.as_str()),
3943 Some("Alice")
3944 );
3945 }
3946
3947 #[test]
3948 fn test_discard_uncommitted_versions() {
3949 let store = LpgStore::new();
3950
3951 let epoch = store.new_epoch();
3952 let tx_id = TxId::new(42);
3953
3954 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3956 assert!(store.get_node(node_id).is_some());
3957
3958 store.discard_uncommitted_versions(tx_id);
3960
3961 assert!(store.get_node(node_id).is_none());
3963 }
3964
3965 #[test]
3968 fn test_property_index_create_and_lookup() {
3969 let store = LpgStore::new();
3970
3971 let alice = store.create_node(&["Person"]);
3973 let bob = store.create_node(&["Person"]);
3974 let charlie = store.create_node(&["Person"]);
3975
3976 store.set_node_property(alice, "city", Value::from("NYC"));
3977 store.set_node_property(bob, "city", Value::from("NYC"));
3978 store.set_node_property(charlie, "city", Value::from("LA"));
3979
3980 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3982 assert_eq!(nyc_people.len(), 2);
3983
3984 store.create_property_index("city");
3986 assert!(store.has_property_index("city"));
3987
3988 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3990 assert_eq!(nyc_people.len(), 2);
3991 assert!(nyc_people.contains(&alice));
3992 assert!(nyc_people.contains(&bob));
3993
3994 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3995 assert_eq!(la_people.len(), 1);
3996 assert!(la_people.contains(&charlie));
3997 }
3998
3999 #[test]
4000 fn test_property_index_maintained_on_update() {
4001 let store = LpgStore::new();
4002
4003 store.create_property_index("status");
4005
4006 let node = store.create_node(&["Task"]);
4007 store.set_node_property(node, "status", Value::from("pending"));
4008
4009 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4011 assert_eq!(pending.len(), 1);
4012 assert!(pending.contains(&node));
4013
4014 store.set_node_property(node, "status", Value::from("done"));
4016
4017 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4019 assert!(pending.is_empty());
4020
4021 let done = store.find_nodes_by_property("status", &Value::from("done"));
4023 assert_eq!(done.len(), 1);
4024 assert!(done.contains(&node));
4025 }
4026
4027 #[test]
4028 fn test_property_index_maintained_on_remove() {
4029 let store = LpgStore::new();
4030
4031 store.create_property_index("tag");
4032
4033 let node = store.create_node(&["Item"]);
4034 store.set_node_property(node, "tag", Value::from("important"));
4035
4036 let found = store.find_nodes_by_property("tag", &Value::from("important"));
4038 assert_eq!(found.len(), 1);
4039
4040 store.remove_node_property(node, "tag");
4042
4043 let found = store.find_nodes_by_property("tag", &Value::from("important"));
4045 assert!(found.is_empty());
4046 }
4047
4048 #[test]
4049 fn test_property_index_drop() {
4050 let store = LpgStore::new();
4051
4052 store.create_property_index("key");
4053 assert!(store.has_property_index("key"));
4054
4055 assert!(store.drop_property_index("key"));
4056 assert!(!store.has_property_index("key"));
4057
4058 assert!(!store.drop_property_index("key"));
4060 }
4061
4062 #[test]
4063 fn test_property_index_multiple_values() {
4064 let store = LpgStore::new();
4065
4066 store.create_property_index("age");
4067
4068 let n1 = store.create_node(&["Person"]);
4070 let n2 = store.create_node(&["Person"]);
4071 let n3 = store.create_node(&["Person"]);
4072 let n4 = store.create_node(&["Person"]);
4073
4074 store.set_node_property(n1, "age", Value::from(25i64));
4075 store.set_node_property(n2, "age", Value::from(25i64));
4076 store.set_node_property(n3, "age", Value::from(30i64));
4077 store.set_node_property(n4, "age", Value::from(25i64));
4078
4079 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
4080 assert_eq!(age_25.len(), 3);
4081
4082 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
4083 assert_eq!(age_30.len(), 1);
4084
4085 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
4086 assert!(age_40.is_empty());
4087 }
4088
4089 #[test]
4090 fn test_property_index_builds_from_existing_data() {
4091 let store = LpgStore::new();
4092
4093 let n1 = store.create_node(&["Person"]);
4095 let n2 = store.create_node(&["Person"]);
4096 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4097 store.set_node_property(n2, "email", Value::from("bob@example.com"));
4098
4099 store.create_property_index("email");
4101
4102 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
4104 assert_eq!(alice.len(), 1);
4105 assert!(alice.contains(&n1));
4106
4107 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
4108 assert_eq!(bob.len(), 1);
4109 assert!(bob.contains(&n2));
4110 }
4111
4112 #[test]
4113 fn test_get_node_property_batch() {
4114 let store = LpgStore::new();
4115
4116 let n1 = store.create_node(&["Person"]);
4117 let n2 = store.create_node(&["Person"]);
4118 let n3 = store.create_node(&["Person"]);
4119
4120 store.set_node_property(n1, "age", Value::from(25i64));
4121 store.set_node_property(n2, "age", Value::from(30i64));
4122 let age_key = PropertyKey::new("age");
4125 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
4126
4127 assert_eq!(values.len(), 3);
4128 assert_eq!(values[0], Some(Value::from(25i64)));
4129 assert_eq!(values[1], Some(Value::from(30i64)));
4130 assert_eq!(values[2], None);
4131 }
4132
4133 #[test]
4134 fn test_get_node_property_batch_empty() {
4135 let store = LpgStore::new();
4136 let key = PropertyKey::new("any");
4137
4138 let values = store.get_node_property_batch(&[], &key);
4139 assert!(values.is_empty());
4140 }
4141
4142 #[test]
4143 fn test_get_nodes_properties_batch() {
4144 let store = LpgStore::new();
4145
4146 let n1 = store.create_node(&["Person"]);
4147 let n2 = store.create_node(&["Person"]);
4148 let n3 = store.create_node(&["Person"]);
4149
4150 store.set_node_property(n1, "name", Value::from("Alice"));
4151 store.set_node_property(n1, "age", Value::from(25i64));
4152 store.set_node_property(n2, "name", Value::from("Bob"));
4153 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4156
4157 assert_eq!(all_props.len(), 3);
4158 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
4163 all_props[0].get(&PropertyKey::new("name")),
4164 Some(&Value::from("Alice"))
4165 );
4166 assert_eq!(
4167 all_props[1].get(&PropertyKey::new("name")),
4168 Some(&Value::from("Bob"))
4169 );
4170 }
4171
4172 #[test]
4173 fn test_get_nodes_properties_batch_empty() {
4174 let store = LpgStore::new();
4175
4176 let all_props = store.get_nodes_properties_batch(&[]);
4177 assert!(all_props.is_empty());
4178 }
4179
4180 #[test]
4181 fn test_get_nodes_properties_selective_batch() {
4182 let store = LpgStore::new();
4183
4184 let n1 = store.create_node(&["Person"]);
4185 let n2 = store.create_node(&["Person"]);
4186
4187 store.set_node_property(n1, "name", Value::from("Alice"));
4189 store.set_node_property(n1, "age", Value::from(25i64));
4190 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4191 store.set_node_property(n2, "name", Value::from("Bob"));
4192 store.set_node_property(n2, "age", Value::from(30i64));
4193 store.set_node_property(n2, "city", Value::from("NYC"));
4194
4195 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4197 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4198
4199 assert_eq!(props.len(), 2);
4200
4201 assert_eq!(props[0].len(), 2);
4203 assert_eq!(
4204 props[0].get(&PropertyKey::new("name")),
4205 Some(&Value::from("Alice"))
4206 );
4207 assert_eq!(
4208 props[0].get(&PropertyKey::new("age")),
4209 Some(&Value::from(25i64))
4210 );
4211 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4212
4213 assert_eq!(props[1].len(), 2);
4215 assert_eq!(
4216 props[1].get(&PropertyKey::new("name")),
4217 Some(&Value::from("Bob"))
4218 );
4219 assert_eq!(
4220 props[1].get(&PropertyKey::new("age")),
4221 Some(&Value::from(30i64))
4222 );
4223 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4224 }
4225
4226 #[test]
4227 fn test_get_nodes_properties_selective_batch_empty_keys() {
4228 let store = LpgStore::new();
4229
4230 let n1 = store.create_node(&["Person"]);
4231 store.set_node_property(n1, "name", Value::from("Alice"));
4232
4233 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4235
4236 assert_eq!(props.len(), 1);
4237 assert!(props[0].is_empty()); }
4239
4240 #[test]
4241 fn test_get_nodes_properties_selective_batch_missing_keys() {
4242 let store = LpgStore::new();
4243
4244 let n1 = store.create_node(&["Person"]);
4245 store.set_node_property(n1, "name", Value::from("Alice"));
4246
4247 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4249 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4250
4251 assert_eq!(props.len(), 1);
4252 assert_eq!(props[0].len(), 1); assert_eq!(
4254 props[0].get(&PropertyKey::new("name")),
4255 Some(&Value::from("Alice"))
4256 );
4257 }
4258
4259 #[test]
4262 fn test_find_nodes_in_range_inclusive() {
4263 let store = LpgStore::new();
4264
4265 let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4266 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4267 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4268 let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4269
4270 let result = store.find_nodes_in_range(
4272 "age",
4273 Some(&Value::from(20i64)),
4274 Some(&Value::from(40i64)),
4275 true,
4276 true,
4277 );
4278 assert_eq!(result.len(), 3);
4279 assert!(result.contains(&n1));
4280 assert!(result.contains(&n2));
4281 assert!(result.contains(&n3));
4282 }
4283
4284 #[test]
4285 fn test_find_nodes_in_range_exclusive() {
4286 let store = LpgStore::new();
4287
4288 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4289 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4290 store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4291
4292 let result = store.find_nodes_in_range(
4294 "age",
4295 Some(&Value::from(20i64)),
4296 Some(&Value::from(40i64)),
4297 false,
4298 false,
4299 );
4300 assert_eq!(result.len(), 1);
4301 assert!(result.contains(&n2));
4302 }
4303
4304 #[test]
4305 fn test_find_nodes_in_range_open_ended() {
4306 let store = LpgStore::new();
4307
4308 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4309 store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4310 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4311 let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4312
4313 let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4315 assert_eq!(result.len(), 2);
4316 assert!(result.contains(&n3));
4317 assert!(result.contains(&n4));
4318
4319 let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4321 assert_eq!(result.len(), 1);
4322 }
4323
4324 #[test]
4325 fn test_find_nodes_in_range_empty_result() {
4326 let store = LpgStore::new();
4327
4328 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4329
4330 let result = store.find_nodes_in_range(
4332 "age",
4333 Some(&Value::from(100i64)),
4334 Some(&Value::from(200i64)),
4335 true,
4336 true,
4337 );
4338 assert!(result.is_empty());
4339 }
4340
4341 #[test]
4342 fn test_find_nodes_in_range_nonexistent_property() {
4343 let store = LpgStore::new();
4344
4345 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4346
4347 let result = store.find_nodes_in_range(
4348 "weight",
4349 Some(&Value::from(50i64)),
4350 Some(&Value::from(100i64)),
4351 true,
4352 true,
4353 );
4354 assert!(result.is_empty());
4355 }
4356
4357 #[test]
4360 fn test_find_nodes_by_properties_multiple_conditions() {
4361 let store = LpgStore::new();
4362
4363 let alice = store.create_node_with_props(
4364 &["Person"],
4365 [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4366 );
4367 store.create_node_with_props(
4368 &["Person"],
4369 [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4370 );
4371 store.create_node_with_props(
4372 &["Person"],
4373 [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4374 );
4375
4376 let result = store.find_nodes_by_properties(&[
4378 ("name", Value::from("Alice")),
4379 ("city", Value::from("NYC")),
4380 ]);
4381 assert_eq!(result.len(), 1);
4382 assert!(result.contains(&alice));
4383 }
4384
4385 #[test]
4386 fn test_find_nodes_by_properties_empty_conditions() {
4387 let store = LpgStore::new();
4388
4389 store.create_node(&["Person"]);
4390 store.create_node(&["Person"]);
4391
4392 let result = store.find_nodes_by_properties(&[]);
4394 assert_eq!(result.len(), 2);
4395 }
4396
4397 #[test]
4398 fn test_find_nodes_by_properties_no_match() {
4399 let store = LpgStore::new();
4400
4401 store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4402
4403 let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4404 assert!(result.is_empty());
4405 }
4406
4407 #[test]
4408 fn test_find_nodes_by_properties_with_index() {
4409 let store = LpgStore::new();
4410
4411 store.create_property_index("name");
4413
4414 let alice = store.create_node_with_props(
4415 &["Person"],
4416 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4417 );
4418 store.create_node_with_props(
4419 &["Person"],
4420 [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4421 );
4422
4423 let result = store.find_nodes_by_properties(&[
4425 ("name", Value::from("Alice")),
4426 ("age", Value::from(30i64)),
4427 ]);
4428 assert_eq!(result.len(), 1);
4429 assert!(result.contains(&alice));
4430 }
4431
4432 #[test]
4435 fn test_estimate_label_cardinality() {
4436 let store = LpgStore::new();
4437
4438 store.create_node(&["Person"]);
4439 store.create_node(&["Person"]);
4440 store.create_node(&["Animal"]);
4441
4442 store.ensure_statistics_fresh();
4443
4444 let person_est = store.estimate_label_cardinality("Person");
4445 let animal_est = store.estimate_label_cardinality("Animal");
4446 let unknown_est = store.estimate_label_cardinality("Unknown");
4447
4448 assert!(
4449 person_est >= 1.0,
4450 "Person should have cardinality >= 1, got {person_est}"
4451 );
4452 assert!(
4453 animal_est >= 1.0,
4454 "Animal should have cardinality >= 1, got {animal_est}"
4455 );
4456 assert!(unknown_est >= 0.0);
4458 }
4459
4460 #[test]
4461 fn test_estimate_avg_degree() {
4462 let store = LpgStore::new();
4463
4464 let a = store.create_node(&["Person"]);
4465 let b = store.create_node(&["Person"]);
4466 let c = store.create_node(&["Person"]);
4467
4468 store.create_edge(a, b, "KNOWS");
4469 store.create_edge(a, c, "KNOWS");
4470 store.create_edge(b, c, "KNOWS");
4471
4472 store.ensure_statistics_fresh();
4473
4474 let outgoing = store.estimate_avg_degree("KNOWS", true);
4475 let incoming = store.estimate_avg_degree("KNOWS", false);
4476
4477 assert!(
4478 outgoing > 0.0,
4479 "Outgoing degree should be > 0, got {outgoing}"
4480 );
4481 assert!(
4482 incoming > 0.0,
4483 "Incoming degree should be > 0, got {incoming}"
4484 );
4485 }
4486
4487 #[test]
4490 fn test_delete_node_does_not_cascade() {
4491 let store = LpgStore::new();
4492
4493 let a = store.create_node(&["A"]);
4494 let b = store.create_node(&["B"]);
4495 let e = store.create_edge(a, b, "KNOWS");
4496
4497 assert!(store.delete_node(a));
4498 assert!(store.get_node(a).is_none());
4499
4500 assert!(
4502 store.get_edge(e).is_some(),
4503 "Edge should survive non-detach node delete"
4504 );
4505 }
4506
4507 #[test]
4508 fn test_delete_already_deleted_node() {
4509 let store = LpgStore::new();
4510 let a = store.create_node(&["A"]);
4511
4512 assert!(store.delete_node(a));
4513 assert!(!store.delete_node(a));
4515 }
4516
4517 #[test]
4518 fn test_delete_nonexistent_node() {
4519 let store = LpgStore::new();
4520 assert!(!store.delete_node(NodeId::new(999)));
4521 }
4522}