1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35 match (a, b) {
36 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40 _ => None,
41 }
42}
43
44fn value_in_range(
46 value: &Value,
47 min: Option<&Value>,
48 max: Option<&Value>,
49 min_inclusive: bool,
50 max_inclusive: bool,
51) -> bool {
52 if let Some(min_val) = min {
54 match compare_values_for_range(value, min_val) {
55 Some(CmpOrdering::Less) => return false,
56 Some(CmpOrdering::Equal) if !min_inclusive => return false,
57 None => return false, _ => {}
59 }
60 }
61
62 if let Some(max_val) = max {
64 match compare_values_for_range(value, max_val) {
65 Some(CmpOrdering::Greater) => return false,
66 Some(CmpOrdering::Equal) if !max_inclusive => return false,
67 None => return false,
68 _ => {}
69 }
70 }
71
72 true
73}
74
75#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
83#[derive(Debug, Clone)]
89pub struct LpgStoreConfig {
90 pub backward_edges: bool,
93 pub initial_node_capacity: usize,
95 pub initial_edge_capacity: usize,
97}
98
99impl Default for LpgStoreConfig {
100 fn default() -> Self {
101 Self {
102 backward_edges: true,
103 initial_node_capacity: 1024,
104 initial_edge_capacity: 4096,
105 }
106 }
107}
108
109pub struct LpgStore {
168 #[allow(dead_code)]
170 config: LpgStoreConfig,
171
172 #[cfg(not(feature = "tiered-storage"))]
176 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178 #[cfg(not(feature = "tiered-storage"))]
182 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184 #[cfg(feature = "tiered-storage")]
198 arena_allocator: Arc<ArenaAllocator>,
199
200 #[cfg(feature = "tiered-storage")]
204 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206 #[cfg(feature = "tiered-storage")]
210 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212 #[cfg(feature = "tiered-storage")]
215 epoch_store: Arc<EpochStore>,
216
217 node_properties: PropertyStorage<NodeId>,
219
220 edge_properties: PropertyStorage<EdgeId>,
222
223 label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227 id_to_label: RwLock<Vec<ArcStr>>,
230
231 edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235 id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239 forward_adj: ChunkedAdjacency,
241
242 backward_adj: Option<ChunkedAdjacency>,
245
246 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255 property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262 #[cfg(feature = "vector-index")]
267 vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269 next_node_id: AtomicU64,
271
272 next_edge_id: AtomicU64,
274
275 current_epoch: AtomicU64,
277
278 statistics: RwLock<Statistics>,
281
282 needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287 #[must_use]
289 pub fn new() -> Self {
290 Self::with_config(LpgStoreConfig::default())
291 }
292
293 #[must_use]
295 pub fn with_config(config: LpgStoreConfig) -> Self {
296 let backward_adj = if config.backward_edges {
297 Some(ChunkedAdjacency::new())
298 } else {
299 None
300 };
301
302 Self {
303 #[cfg(not(feature = "tiered-storage"))]
304 nodes: RwLock::new(FxHashMap::default()),
305 #[cfg(not(feature = "tiered-storage"))]
306 edges: RwLock::new(FxHashMap::default()),
307 #[cfg(feature = "tiered-storage")]
308 arena_allocator: Arc::new(ArenaAllocator::new()),
309 #[cfg(feature = "tiered-storage")]
310 node_versions: RwLock::new(FxHashMap::default()),
311 #[cfg(feature = "tiered-storage")]
312 edge_versions: RwLock::new(FxHashMap::default()),
313 #[cfg(feature = "tiered-storage")]
314 epoch_store: Arc::new(EpochStore::new()),
315 node_properties: PropertyStorage::new(),
316 edge_properties: PropertyStorage::new(),
317 label_to_id: RwLock::new(FxHashMap::default()),
318 id_to_label: RwLock::new(Vec::new()),
319 edge_type_to_id: RwLock::new(FxHashMap::default()),
320 id_to_edge_type: RwLock::new(Vec::new()),
321 forward_adj: ChunkedAdjacency::new(),
322 backward_adj,
323 label_index: RwLock::new(Vec::new()),
324 node_labels: RwLock::new(FxHashMap::default()),
325 property_indexes: RwLock::new(FxHashMap::default()),
326 #[cfg(feature = "vector-index")]
327 vector_indexes: RwLock::new(FxHashMap::default()),
328 next_node_id: AtomicU64::new(0),
329 next_edge_id: AtomicU64::new(0),
330 current_epoch: AtomicU64::new(0),
331 statistics: RwLock::new(Statistics::new()),
332 needs_stats_recompute: AtomicBool::new(true),
333 config,
334 }
335 }
336
337 #[must_use]
339 pub fn current_epoch(&self) -> EpochId {
340 EpochId::new(self.current_epoch.load(Ordering::Acquire))
341 }
342
343 pub fn new_epoch(&self) -> EpochId {
345 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346 EpochId::new(id)
347 }
348
349 pub fn create_node(&self, labels: &[&str]) -> NodeId {
355 self.needs_stats_recompute.store(true, Ordering::Relaxed);
356 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357 }
358
359 #[cfg(not(feature = "tiered-storage"))]
361 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364 let mut record = NodeRecord::new(id, epoch);
365 record.set_label_count(labels.len() as u16);
366
367 let mut node_label_set = FxHashSet::default();
369 for label in labels {
370 let label_id = self.get_or_create_label_id(*label);
371 node_label_set.insert(label_id);
372
373 let mut index = self.label_index.write();
375 while index.len() <= label_id as usize {
376 index.push(FxHashMap::default());
377 }
378 index[label_id as usize].insert(id, ());
379 }
380
381 self.node_labels.write().insert(id, node_label_set);
383
384 let chain = VersionChain::with_initial(record, epoch, tx_id);
386 self.nodes.write().insert(id, chain);
387 id
388 }
389
390 #[cfg(feature = "tiered-storage")]
393 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396 let mut record = NodeRecord::new(id, epoch);
397 record.set_label_count(labels.len() as u16);
398
399 let mut node_label_set = FxHashSet::default();
401 for label in labels {
402 let label_id = self.get_or_create_label_id(*label);
403 node_label_set.insert(label_id);
404
405 let mut index = self.label_index.write();
407 while index.len() <= label_id as usize {
408 index.push(FxHashMap::default());
409 }
410 index[label_id as usize].insert(id, ());
411 }
412
413 self.node_labels.write().insert(id, node_label_set);
415
416 let arena = self.arena_allocator.arena_or_create(epoch);
418 let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423 let mut versions = self.node_versions.write();
425 if let Some(index) = versions.get_mut(&id) {
426 index.add_hot(hot_ref);
427 } else {
428 versions.insert(id, VersionIndex::with_initial(hot_ref));
429 }
430
431 id
432 }
433
434 pub fn create_node_with_props(
436 &self,
437 labels: &[&str],
438 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439 ) -> NodeId {
440 self.create_node_with_props_versioned(
441 labels,
442 properties,
443 self.current_epoch(),
444 TxId::SYSTEM,
445 )
446 }
447
448 #[cfg(not(feature = "tiered-storage"))]
450 pub fn create_node_with_props_versioned(
451 &self,
452 labels: &[&str],
453 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454 epoch: EpochId,
455 tx_id: TxId,
456 ) -> NodeId {
457 let id = self.create_node_versioned(labels, epoch, tx_id);
458
459 for (key, value) in properties {
460 self.node_properties.set(id, key.into(), value.into());
461 }
462
463 let count = self.node_properties.get_all(id).len() as u16;
465 if let Some(chain) = self.nodes.write().get_mut(&id) {
466 if let Some(record) = chain.latest_mut() {
467 record.props_count = count;
468 }
469 }
470
471 id
472 }
473
474 #[cfg(feature = "tiered-storage")]
477 pub fn create_node_with_props_versioned(
478 &self,
479 labels: &[&str],
480 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
481 epoch: EpochId,
482 tx_id: TxId,
483 ) -> NodeId {
484 let id = self.create_node_versioned(labels, epoch, tx_id);
485
486 for (key, value) in properties {
487 self.node_properties.set(id, key.into(), value.into());
488 }
489
490 id
494 }
495
496 #[must_use]
498 pub fn get_node(&self, id: NodeId) -> Option<Node> {
499 self.get_node_at_epoch(id, self.current_epoch())
500 }
501
502 #[must_use]
504 #[cfg(not(feature = "tiered-storage"))]
505 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
506 let nodes = self.nodes.read();
507 let chain = nodes.get(&id)?;
508 let record = chain.visible_at(epoch)?;
509
510 if record.is_deleted() {
511 return None;
512 }
513
514 let mut node = Node::new(id);
515
516 let id_to_label = self.id_to_label.read();
518 let node_labels = self.node_labels.read();
519 if let Some(label_ids) = node_labels.get(&id) {
520 for &label_id in label_ids {
521 if let Some(label) = id_to_label.get(label_id as usize) {
522 node.labels.push(label.clone());
523 }
524 }
525 }
526
527 node.properties = self.node_properties.get_all(id).into_iter().collect();
529
530 Some(node)
531 }
532
533 #[must_use]
536 #[cfg(feature = "tiered-storage")]
537 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
538 let versions = self.node_versions.read();
539 let index = versions.get(&id)?;
540 let version_ref = index.visible_at(epoch)?;
541
542 let record = self.read_node_record(&version_ref)?;
544
545 if record.is_deleted() {
546 return None;
547 }
548
549 let mut node = Node::new(id);
550
551 let id_to_label = self.id_to_label.read();
553 let node_labels = self.node_labels.read();
554 if let Some(label_ids) = node_labels.get(&id) {
555 for &label_id in label_ids {
556 if let Some(label) = id_to_label.get(label_id as usize) {
557 node.labels.push(label.clone());
558 }
559 }
560 }
561
562 node.properties = self.node_properties.get_all(id).into_iter().collect();
564
565 Some(node)
566 }
567
568 #[must_use]
570 #[cfg(not(feature = "tiered-storage"))]
571 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
572 let nodes = self.nodes.read();
573 let chain = nodes.get(&id)?;
574 let record = chain.visible_to(epoch, tx_id)?;
575
576 if record.is_deleted() {
577 return None;
578 }
579
580 let mut node = Node::new(id);
581
582 let id_to_label = self.id_to_label.read();
584 let node_labels = self.node_labels.read();
585 if let Some(label_ids) = node_labels.get(&id) {
586 for &label_id in label_ids {
587 if let Some(label) = id_to_label.get(label_id as usize) {
588 node.labels.push(label.clone());
589 }
590 }
591 }
592
593 node.properties = self.node_properties.get_all(id).into_iter().collect();
595
596 Some(node)
597 }
598
599 #[must_use]
602 #[cfg(feature = "tiered-storage")]
603 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
604 let versions = self.node_versions.read();
605 let index = versions.get(&id)?;
606 let version_ref = index.visible_to(epoch, tx_id)?;
607
608 let record = self.read_node_record(&version_ref)?;
610
611 if record.is_deleted() {
612 return None;
613 }
614
615 let mut node = Node::new(id);
616
617 let id_to_label = self.id_to_label.read();
619 let node_labels = self.node_labels.read();
620 if let Some(label_ids) = node_labels.get(&id) {
621 for &label_id in label_ids {
622 if let Some(label) = id_to_label.get(label_id as usize) {
623 node.labels.push(label.clone());
624 }
625 }
626 }
627
628 node.properties = self.node_properties.get_all(id).into_iter().collect();
630
631 Some(node)
632 }
633
634 #[cfg(feature = "tiered-storage")]
636 #[allow(unsafe_code)]
637 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
638 match version_ref {
639 VersionRef::Hot(hot_ref) => {
640 let arena = self.arena_allocator.arena(hot_ref.epoch);
641 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
643 Some(*record)
644 }
645 VersionRef::Cold(cold_ref) => {
646 self.epoch_store
648 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
649 }
650 }
651 }
652
653 pub fn delete_node(&self, id: NodeId) -> bool {
655 self.needs_stats_recompute.store(true, Ordering::Relaxed);
656 self.delete_node_at_epoch(id, self.current_epoch())
657 }
658
659 #[cfg(not(feature = "tiered-storage"))]
661 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
662 let mut nodes = self.nodes.write();
663 if let Some(chain) = nodes.get_mut(&id) {
664 if let Some(record) = chain.visible_at(epoch) {
666 if record.is_deleted() {
667 return false;
668 }
669 } else {
670 return false;
672 }
673
674 chain.mark_deleted(epoch);
676
677 let mut index = self.label_index.write();
679 let mut node_labels = self.node_labels.write();
680 if let Some(label_ids) = node_labels.remove(&id) {
681 for label_id in label_ids {
682 if let Some(set) = index.get_mut(label_id as usize) {
683 set.remove(&id);
684 }
685 }
686 }
687
688 drop(nodes); drop(index);
691 drop(node_labels);
692 self.node_properties.remove_all(id);
693
694 true
697 } else {
698 false
699 }
700 }
701
702 #[cfg(feature = "tiered-storage")]
705 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
706 let mut versions = self.node_versions.write();
707 if let Some(index) = versions.get_mut(&id) {
708 if let Some(version_ref) = index.visible_at(epoch) {
710 if let Some(record) = self.read_node_record(&version_ref) {
711 if record.is_deleted() {
712 return false;
713 }
714 } else {
715 return false;
716 }
717 } else {
718 return false;
719 }
720
721 index.mark_deleted(epoch);
723
724 let mut label_index = self.label_index.write();
726 let mut node_labels = self.node_labels.write();
727 if let Some(label_ids) = node_labels.remove(&id) {
728 for label_id in label_ids {
729 if let Some(set) = label_index.get_mut(label_id as usize) {
730 set.remove(&id);
731 }
732 }
733 }
734
735 drop(versions);
737 drop(label_index);
738 drop(node_labels);
739 self.node_properties.remove_all(id);
740
741 true
742 } else {
743 false
744 }
745 }
746
747 #[cfg(not(feature = "tiered-storage"))]
752 pub fn delete_node_edges(&self, node_id: NodeId) {
753 let outgoing: Vec<EdgeId> = self
755 .forward_adj
756 .edges_from(node_id)
757 .into_iter()
758 .map(|(_, edge_id)| edge_id)
759 .collect();
760
761 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
763 backward
764 .edges_from(node_id)
765 .into_iter()
766 .map(|(_, edge_id)| edge_id)
767 .collect()
768 } else {
769 let epoch = self.current_epoch();
771 self.edges
772 .read()
773 .iter()
774 .filter_map(|(id, chain)| {
775 chain.visible_at(epoch).and_then(|r| {
776 if !r.is_deleted() && r.dst == node_id {
777 Some(*id)
778 } else {
779 None
780 }
781 })
782 })
783 .collect()
784 };
785
786 for edge_id in outgoing.into_iter().chain(incoming) {
788 self.delete_edge(edge_id);
789 }
790 }
791
792 #[cfg(feature = "tiered-storage")]
795 pub fn delete_node_edges(&self, node_id: NodeId) {
796 let outgoing: Vec<EdgeId> = self
798 .forward_adj
799 .edges_from(node_id)
800 .into_iter()
801 .map(|(_, edge_id)| edge_id)
802 .collect();
803
804 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
806 backward
807 .edges_from(node_id)
808 .into_iter()
809 .map(|(_, edge_id)| edge_id)
810 .collect()
811 } else {
812 let epoch = self.current_epoch();
814 let versions = self.edge_versions.read();
815 versions
816 .iter()
817 .filter_map(|(id, index)| {
818 index.visible_at(epoch).and_then(|vref| {
819 self.read_edge_record(&vref).and_then(|r| {
820 if !r.is_deleted() && r.dst == node_id {
821 Some(*id)
822 } else {
823 None
824 }
825 })
826 })
827 })
828 .collect()
829 };
830
831 for edge_id in outgoing.into_iter().chain(incoming) {
833 self.delete_edge(edge_id);
834 }
835 }
836
837 #[cfg(not(feature = "tiered-storage"))]
839 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
840 let prop_key: PropertyKey = key.into();
841
842 self.update_property_index_on_set(id, &prop_key, &value);
844
845 self.node_properties.set(id, prop_key, value);
846
847 let count = self.node_properties.get_all(id).len() as u16;
849 if let Some(chain) = self.nodes.write().get_mut(&id) {
850 if let Some(record) = chain.latest_mut() {
851 record.props_count = count;
852 }
853 }
854 }
855
856 #[cfg(feature = "tiered-storage")]
859 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
860 let prop_key: PropertyKey = key.into();
861
862 self.update_property_index_on_set(id, &prop_key, &value);
864
865 self.node_properties.set(id, prop_key, value);
866 }
870
871 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
873 self.edge_properties.set(id, key.into(), value);
874 }
875
876 #[cfg(not(feature = "tiered-storage"))]
880 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
881 let prop_key: PropertyKey = key.into();
882
883 self.update_property_index_on_remove(id, &prop_key);
885
886 let result = self.node_properties.remove(id, &prop_key);
887
888 let count = self.node_properties.get_all(id).len() as u16;
890 if let Some(chain) = self.nodes.write().get_mut(&id) {
891 if let Some(record) = chain.latest_mut() {
892 record.props_count = count;
893 }
894 }
895
896 result
897 }
898
899 #[cfg(feature = "tiered-storage")]
902 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
903 let prop_key: PropertyKey = key.into();
904
905 self.update_property_index_on_remove(id, &prop_key);
907
908 self.node_properties.remove(id, &prop_key)
909 }
911
912 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
916 self.edge_properties.remove(id, &key.into())
917 }
918
919 #[must_use]
934 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
935 self.node_properties.get(id, key)
936 }
937
938 #[must_use]
942 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
943 self.edge_properties.get(id, key)
944 }
945
946 #[must_use]
969 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
970 self.node_properties.get_batch(ids, key)
971 }
972
973 #[must_use]
978 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
979 self.node_properties.get_all_batch(ids)
980 }
981
982 #[must_use]
1010 pub fn get_nodes_properties_selective_batch(
1011 &self,
1012 ids: &[NodeId],
1013 keys: &[PropertyKey],
1014 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1015 self.node_properties.get_selective_batch(ids, keys)
1016 }
1017
1018 #[must_use]
1022 pub fn get_edges_properties_selective_batch(
1023 &self,
1024 ids: &[EdgeId],
1025 keys: &[PropertyKey],
1026 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1027 self.edge_properties.get_selective_batch(ids, keys)
1028 }
1029
1030 #[must_use]
1066 pub fn find_nodes_in_range(
1067 &self,
1068 property: &str,
1069 min: Option<&Value>,
1070 max: Option<&Value>,
1071 min_inclusive: bool,
1072 max_inclusive: bool,
1073 ) -> Vec<NodeId> {
1074 let key = PropertyKey::new(property);
1075
1076 if !self
1078 .node_properties
1079 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1080 {
1081 return Vec::new();
1082 }
1083
1084 self.node_ids()
1086 .into_iter()
1087 .filter(|&node_id| {
1088 self.node_properties
1089 .get(node_id, &key)
1090 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1091 })
1092 .collect()
1093 }
1094
1095 #[must_use]
1120 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1121 if conditions.is_empty() {
1122 return self.node_ids();
1123 }
1124
1125 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1128 let indexes = self.property_indexes.read();
1129
1130 for (i, (prop, value)) in conditions.iter().enumerate() {
1131 let key = PropertyKey::new(*prop);
1132 let hv = HashableValue::new(value.clone());
1133
1134 if let Some(index) = indexes.get(&key) {
1135 let matches: Vec<NodeId> = index
1136 .get(&hv)
1137 .map(|nodes| nodes.iter().copied().collect())
1138 .unwrap_or_default();
1139
1140 if matches.is_empty() {
1142 return Vec::new();
1143 }
1144
1145 if best_start
1147 .as_ref()
1148 .is_none_or(|(_, best)| matches.len() < best.len())
1149 {
1150 best_start = Some((i, matches));
1151 }
1152 }
1153 }
1154 drop(indexes);
1155
1156 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1158 let (prop, value) = &conditions[0];
1160 (0, self.find_nodes_by_property(prop, value))
1161 });
1162
1163 for (i, (prop, value)) in conditions.iter().enumerate() {
1165 if i == start_idx {
1166 continue;
1167 }
1168
1169 let key = PropertyKey::new(*prop);
1170 candidates.retain(|&node_id| {
1171 self.node_properties
1172 .get(node_id, &key)
1173 .is_some_and(|v| v == *value)
1174 });
1175
1176 if candidates.is_empty() {
1178 return Vec::new();
1179 }
1180 }
1181
1182 candidates
1183 }
1184
1185 pub fn create_property_index(&self, property: &str) {
1213 let key = PropertyKey::new(property);
1214
1215 let mut indexes = self.property_indexes.write();
1216 if indexes.contains_key(&key) {
1217 return; }
1219
1220 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1222
1223 for node_id in self.node_ids() {
1225 if let Some(value) = self.node_properties.get(node_id, &key) {
1226 let hv = HashableValue::new(value);
1227 index
1228 .entry(hv)
1229 .or_insert_with(FxHashSet::default)
1230 .insert(node_id);
1231 }
1232 }
1233
1234 indexes.insert(key, index);
1235 }
1236
1237 pub fn drop_property_index(&self, property: &str) -> bool {
1241 let key = PropertyKey::new(property);
1242 self.property_indexes.write().remove(&key).is_some()
1243 }
1244
1245 #[must_use]
1247 pub fn has_property_index(&self, property: &str) -> bool {
1248 let key = PropertyKey::new(property);
1249 self.property_indexes.read().contains_key(&key)
1250 }
1251
1252 #[cfg(feature = "vector-index")]
1254 pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1255 let key = format!("{label}:{property}");
1256 self.vector_indexes.write().insert(key, index);
1257 }
1258
1259 #[cfg(feature = "vector-index")]
1261 #[must_use]
1262 pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1263 let key = format!("{label}:{property}");
1264 self.vector_indexes.read().get(&key).cloned()
1265 }
1266
1267 #[must_use]
1290 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1291 let key = PropertyKey::new(property);
1292 let hv = HashableValue::new(value.clone());
1293
1294 let indexes = self.property_indexes.read();
1296 if let Some(index) = indexes.get(&key) {
1297 if let Some(nodes) = index.get(&hv) {
1298 return nodes.iter().copied().collect();
1299 }
1300 return Vec::new();
1301 }
1302 drop(indexes);
1303
1304 self.node_ids()
1306 .into_iter()
1307 .filter(|&node_id| {
1308 self.node_properties
1309 .get(node_id, &key)
1310 .is_some_and(|v| v == *value)
1311 })
1312 .collect()
1313 }
1314
1315 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1317 let indexes = self.property_indexes.read();
1318 if let Some(index) = indexes.get(key) {
1319 if let Some(old_value) = self.node_properties.get(node_id, key) {
1321 let old_hv = HashableValue::new(old_value);
1322 if let Some(mut nodes) = index.get_mut(&old_hv) {
1323 nodes.remove(&node_id);
1324 if nodes.is_empty() {
1325 drop(nodes);
1326 index.remove(&old_hv);
1327 }
1328 }
1329 }
1330
1331 let new_hv = HashableValue::new(new_value.clone());
1333 index
1334 .entry(new_hv)
1335 .or_insert_with(FxHashSet::default)
1336 .insert(node_id);
1337 }
1338 }
1339
1340 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1342 let indexes = self.property_indexes.read();
1343 if let Some(index) = indexes.get(key) {
1344 if let Some(old_value) = self.node_properties.get(node_id, key) {
1346 let old_hv = HashableValue::new(old_value);
1347 if let Some(mut nodes) = index.get_mut(&old_hv) {
1348 nodes.remove(&node_id);
1349 if nodes.is_empty() {
1350 drop(nodes);
1351 index.remove(&old_hv);
1352 }
1353 }
1354 }
1355 }
1356 }
1357
1358 #[cfg(not(feature = "tiered-storage"))]
1363 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1364 let epoch = self.current_epoch();
1365
1366 let nodes = self.nodes.read();
1368 if let Some(chain) = nodes.get(&node_id) {
1369 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1370 return false;
1371 }
1372 } else {
1373 return false;
1374 }
1375 drop(nodes);
1376
1377 let label_id = self.get_or_create_label_id(label);
1379
1380 let mut node_labels = self.node_labels.write();
1382 let label_set = node_labels
1383 .entry(node_id)
1384 .or_insert_with(FxHashSet::default);
1385
1386 if label_set.contains(&label_id) {
1387 return false; }
1389
1390 label_set.insert(label_id);
1391 drop(node_labels);
1392
1393 let mut index = self.label_index.write();
1395 if (label_id as usize) >= index.len() {
1396 index.resize(label_id as usize + 1, FxHashMap::default());
1397 }
1398 index[label_id as usize].insert(node_id, ());
1399
1400 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1402 if let Some(record) = chain.latest_mut() {
1403 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1404 record.set_label_count(count as u16);
1405 }
1406 }
1407
1408 true
1409 }
1410
1411 #[cfg(feature = "tiered-storage")]
1414 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1415 let epoch = self.current_epoch();
1416
1417 let versions = self.node_versions.read();
1419 if let Some(index) = versions.get(&node_id) {
1420 if let Some(vref) = index.visible_at(epoch) {
1421 if let Some(record) = self.read_node_record(&vref) {
1422 if record.is_deleted() {
1423 return false;
1424 }
1425 } else {
1426 return false;
1427 }
1428 } else {
1429 return false;
1430 }
1431 } else {
1432 return false;
1433 }
1434 drop(versions);
1435
1436 let label_id = self.get_or_create_label_id(label);
1438
1439 let mut node_labels = self.node_labels.write();
1441 let label_set = node_labels
1442 .entry(node_id)
1443 .or_insert_with(FxHashSet::default);
1444
1445 if label_set.contains(&label_id) {
1446 return false; }
1448
1449 label_set.insert(label_id);
1450 drop(node_labels);
1451
1452 let mut index = self.label_index.write();
1454 if (label_id as usize) >= index.len() {
1455 index.resize(label_id as usize + 1, FxHashMap::default());
1456 }
1457 index[label_id as usize].insert(node_id, ());
1458
1459 true
1463 }
1464
1465 #[cfg(not(feature = "tiered-storage"))]
1470 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1471 let epoch = self.current_epoch();
1472
1473 let nodes = self.nodes.read();
1475 if let Some(chain) = nodes.get(&node_id) {
1476 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1477 return false;
1478 }
1479 } else {
1480 return false;
1481 }
1482 drop(nodes);
1483
1484 let label_id = {
1486 let label_ids = self.label_to_id.read();
1487 match label_ids.get(label) {
1488 Some(&id) => id,
1489 None => return false, }
1491 };
1492
1493 let mut node_labels = self.node_labels.write();
1495 if let Some(label_set) = node_labels.get_mut(&node_id) {
1496 if !label_set.remove(&label_id) {
1497 return false; }
1499 } else {
1500 return false;
1501 }
1502 drop(node_labels);
1503
1504 let mut index = self.label_index.write();
1506 if (label_id as usize) < index.len() {
1507 index[label_id as usize].remove(&node_id);
1508 }
1509
1510 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1512 if let Some(record) = chain.latest_mut() {
1513 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1514 record.set_label_count(count as u16);
1515 }
1516 }
1517
1518 true
1519 }
1520
1521 #[cfg(feature = "tiered-storage")]
1524 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1525 let epoch = self.current_epoch();
1526
1527 let versions = self.node_versions.read();
1529 if let Some(index) = versions.get(&node_id) {
1530 if let Some(vref) = index.visible_at(epoch) {
1531 if let Some(record) = self.read_node_record(&vref) {
1532 if record.is_deleted() {
1533 return false;
1534 }
1535 } else {
1536 return false;
1537 }
1538 } else {
1539 return false;
1540 }
1541 } else {
1542 return false;
1543 }
1544 drop(versions);
1545
1546 let label_id = {
1548 let label_ids = self.label_to_id.read();
1549 match label_ids.get(label) {
1550 Some(&id) => id,
1551 None => return false, }
1553 };
1554
1555 let mut node_labels = self.node_labels.write();
1557 if let Some(label_set) = node_labels.get_mut(&node_id) {
1558 if !label_set.remove(&label_id) {
1559 return false; }
1561 } else {
1562 return false;
1563 }
1564 drop(node_labels);
1565
1566 let mut index = self.label_index.write();
1568 if (label_id as usize) < index.len() {
1569 index[label_id as usize].remove(&node_id);
1570 }
1571
1572 true
1575 }
1576
1577 #[must_use]
1579 #[cfg(not(feature = "tiered-storage"))]
1580 pub fn node_count(&self) -> usize {
1581 let epoch = self.current_epoch();
1582 self.nodes
1583 .read()
1584 .values()
1585 .filter_map(|chain| chain.visible_at(epoch))
1586 .filter(|r| !r.is_deleted())
1587 .count()
1588 }
1589
1590 #[must_use]
1593 #[cfg(feature = "tiered-storage")]
1594 pub fn node_count(&self) -> usize {
1595 let epoch = self.current_epoch();
1596 let versions = self.node_versions.read();
1597 versions
1598 .iter()
1599 .filter(|(_, index)| {
1600 index.visible_at(epoch).map_or(false, |vref| {
1601 self.read_node_record(&vref)
1602 .map_or(false, |r| !r.is_deleted())
1603 })
1604 })
1605 .count()
1606 }
1607
1608 #[must_use]
1614 #[cfg(not(feature = "tiered-storage"))]
1615 pub fn node_ids(&self) -> Vec<NodeId> {
1616 let epoch = self.current_epoch();
1617 let mut ids: Vec<NodeId> = self
1618 .nodes
1619 .read()
1620 .iter()
1621 .filter_map(|(id, chain)| {
1622 chain
1623 .visible_at(epoch)
1624 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1625 })
1626 .collect();
1627 ids.sort_unstable();
1628 ids
1629 }
1630
1631 #[must_use]
1634 #[cfg(feature = "tiered-storage")]
1635 pub fn node_ids(&self) -> Vec<NodeId> {
1636 let epoch = self.current_epoch();
1637 let versions = self.node_versions.read();
1638 let mut ids: Vec<NodeId> = versions
1639 .iter()
1640 .filter_map(|(id, index)| {
1641 index.visible_at(epoch).and_then(|vref| {
1642 self.read_node_record(&vref)
1643 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1644 })
1645 })
1646 .collect();
1647 ids.sort_unstable();
1648 ids
1649 }
1650
1651 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1655 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1656 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1657 }
1658
1659 #[cfg(not(feature = "tiered-storage"))]
1661 pub fn create_edge_versioned(
1662 &self,
1663 src: NodeId,
1664 dst: NodeId,
1665 edge_type: &str,
1666 epoch: EpochId,
1667 tx_id: TxId,
1668 ) -> EdgeId {
1669 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1670 let type_id = self.get_or_create_edge_type_id(edge_type);
1671
1672 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1673 let chain = VersionChain::with_initial(record, epoch, tx_id);
1674 self.edges.write().insert(id, chain);
1675
1676 self.forward_adj.add_edge(src, dst, id);
1678 if let Some(ref backward) = self.backward_adj {
1679 backward.add_edge(dst, src, id);
1680 }
1681
1682 id
1683 }
1684
1685 #[cfg(feature = "tiered-storage")]
1688 pub fn create_edge_versioned(
1689 &self,
1690 src: NodeId,
1691 dst: NodeId,
1692 edge_type: &str,
1693 epoch: EpochId,
1694 tx_id: TxId,
1695 ) -> EdgeId {
1696 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1697 let type_id = self.get_or_create_edge_type_id(edge_type);
1698
1699 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1700
1701 let arena = self.arena_allocator.arena_or_create(epoch);
1703 let (offset, _stored) = arena.alloc_value_with_offset(record);
1704
1705 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1707
1708 let mut versions = self.edge_versions.write();
1710 if let Some(index) = versions.get_mut(&id) {
1711 index.add_hot(hot_ref);
1712 } else {
1713 versions.insert(id, VersionIndex::with_initial(hot_ref));
1714 }
1715
1716 self.forward_adj.add_edge(src, dst, id);
1718 if let Some(ref backward) = self.backward_adj {
1719 backward.add_edge(dst, src, id);
1720 }
1721
1722 id
1723 }
1724
1725 pub fn create_edge_with_props(
1727 &self,
1728 src: NodeId,
1729 dst: NodeId,
1730 edge_type: &str,
1731 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1732 ) -> EdgeId {
1733 let id = self.create_edge(src, dst, edge_type);
1734
1735 for (key, value) in properties {
1736 self.edge_properties.set(id, key.into(), value.into());
1737 }
1738
1739 id
1740 }
1741
1742 #[must_use]
1744 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1745 self.get_edge_at_epoch(id, self.current_epoch())
1746 }
1747
1748 #[must_use]
1750 #[cfg(not(feature = "tiered-storage"))]
1751 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1752 let edges = self.edges.read();
1753 let chain = edges.get(&id)?;
1754 let record = chain.visible_at(epoch)?;
1755
1756 if record.is_deleted() {
1757 return None;
1758 }
1759
1760 let edge_type = {
1761 let id_to_type = self.id_to_edge_type.read();
1762 id_to_type.get(record.type_id as usize)?.clone()
1763 };
1764
1765 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1766
1767 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1769
1770 Some(edge)
1771 }
1772
1773 #[must_use]
1776 #[cfg(feature = "tiered-storage")]
1777 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1778 let versions = self.edge_versions.read();
1779 let index = versions.get(&id)?;
1780 let version_ref = index.visible_at(epoch)?;
1781
1782 let record = self.read_edge_record(&version_ref)?;
1783
1784 if record.is_deleted() {
1785 return None;
1786 }
1787
1788 let edge_type = {
1789 let id_to_type = self.id_to_edge_type.read();
1790 id_to_type.get(record.type_id as usize)?.clone()
1791 };
1792
1793 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1794
1795 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1797
1798 Some(edge)
1799 }
1800
1801 #[must_use]
1803 #[cfg(not(feature = "tiered-storage"))]
1804 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1805 let edges = self.edges.read();
1806 let chain = edges.get(&id)?;
1807 let record = chain.visible_to(epoch, tx_id)?;
1808
1809 if record.is_deleted() {
1810 return None;
1811 }
1812
1813 let edge_type = {
1814 let id_to_type = self.id_to_edge_type.read();
1815 id_to_type.get(record.type_id as usize)?.clone()
1816 };
1817
1818 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1819
1820 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1822
1823 Some(edge)
1824 }
1825
1826 #[must_use]
1829 #[cfg(feature = "tiered-storage")]
1830 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1831 let versions = self.edge_versions.read();
1832 let index = versions.get(&id)?;
1833 let version_ref = index.visible_to(epoch, tx_id)?;
1834
1835 let record = self.read_edge_record(&version_ref)?;
1836
1837 if record.is_deleted() {
1838 return None;
1839 }
1840
1841 let edge_type = {
1842 let id_to_type = self.id_to_edge_type.read();
1843 id_to_type.get(record.type_id as usize)?.clone()
1844 };
1845
1846 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1847
1848 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1850
1851 Some(edge)
1852 }
1853
1854 #[cfg(feature = "tiered-storage")]
1856 #[allow(unsafe_code)]
1857 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1858 match version_ref {
1859 VersionRef::Hot(hot_ref) => {
1860 let arena = self.arena_allocator.arena(hot_ref.epoch);
1861 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1863 Some(*record)
1864 }
1865 VersionRef::Cold(cold_ref) => {
1866 self.epoch_store
1868 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1869 }
1870 }
1871 }
1872
1873 pub fn delete_edge(&self, id: EdgeId) -> bool {
1875 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1876 self.delete_edge_at_epoch(id, self.current_epoch())
1877 }
1878
1879 #[cfg(not(feature = "tiered-storage"))]
1881 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1882 let mut edges = self.edges.write();
1883 if let Some(chain) = edges.get_mut(&id) {
1884 let (src, dst) = {
1886 match chain.visible_at(epoch) {
1887 Some(record) => {
1888 if record.is_deleted() {
1889 return false;
1890 }
1891 (record.src, record.dst)
1892 }
1893 None => return false, }
1895 };
1896
1897 chain.mark_deleted(epoch);
1899
1900 drop(edges); self.forward_adj.mark_deleted(src, id);
1904 if let Some(ref backward) = self.backward_adj {
1905 backward.mark_deleted(dst, id);
1906 }
1907
1908 self.edge_properties.remove_all(id);
1910
1911 true
1912 } else {
1913 false
1914 }
1915 }
1916
1917 #[cfg(feature = "tiered-storage")]
1920 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1921 let mut versions = self.edge_versions.write();
1922 if let Some(index) = versions.get_mut(&id) {
1923 let (src, dst) = {
1925 match index.visible_at(epoch) {
1926 Some(version_ref) => {
1927 if let Some(record) = self.read_edge_record(&version_ref) {
1928 if record.is_deleted() {
1929 return false;
1930 }
1931 (record.src, record.dst)
1932 } else {
1933 return false;
1934 }
1935 }
1936 None => return false,
1937 }
1938 };
1939
1940 index.mark_deleted(epoch);
1942
1943 drop(versions); self.forward_adj.mark_deleted(src, id);
1947 if let Some(ref backward) = self.backward_adj {
1948 backward.mark_deleted(dst, id);
1949 }
1950
1951 self.edge_properties.remove_all(id);
1953
1954 true
1955 } else {
1956 false
1957 }
1958 }
1959
1960 #[must_use]
1962 #[cfg(not(feature = "tiered-storage"))]
1963 pub fn edge_count(&self) -> usize {
1964 let epoch = self.current_epoch();
1965 self.edges
1966 .read()
1967 .values()
1968 .filter_map(|chain| chain.visible_at(epoch))
1969 .filter(|r| !r.is_deleted())
1970 .count()
1971 }
1972
1973 #[must_use]
1976 #[cfg(feature = "tiered-storage")]
1977 pub fn edge_count(&self) -> usize {
1978 let epoch = self.current_epoch();
1979 let versions = self.edge_versions.read();
1980 versions
1981 .iter()
1982 .filter(|(_, index)| {
1983 index.visible_at(epoch).map_or(false, |vref| {
1984 self.read_edge_record(&vref)
1985 .map_or(false, |r| !r.is_deleted())
1986 })
1987 })
1988 .count()
1989 }
1990
1991 #[cfg(not(feature = "tiered-storage"))]
1996 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1997 {
1999 let mut nodes = self.nodes.write();
2000 for chain in nodes.values_mut() {
2001 chain.remove_versions_by(tx_id);
2002 }
2003 nodes.retain(|_, chain| !chain.is_empty());
2005 }
2006
2007 {
2009 let mut edges = self.edges.write();
2010 for chain in edges.values_mut() {
2011 chain.remove_versions_by(tx_id);
2012 }
2013 edges.retain(|_, chain| !chain.is_empty());
2015 }
2016 }
2017
2018 #[cfg(feature = "tiered-storage")]
2021 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2022 {
2024 let mut versions = self.node_versions.write();
2025 for index in versions.values_mut() {
2026 index.remove_versions_by(tx_id);
2027 }
2028 versions.retain(|_, index| !index.is_empty());
2030 }
2031
2032 {
2034 let mut versions = self.edge_versions.write();
2035 for index in versions.values_mut() {
2036 index.remove_versions_by(tx_id);
2037 }
2038 versions.retain(|_, index| !index.is_empty());
2040 }
2041 }
2042
2043 #[cfg(feature = "tiered-storage")]
2062 #[allow(unsafe_code)]
2063 pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2064 let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2066 let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2067
2068 {
2069 let versions = self.node_versions.read();
2070 for (node_id, index) in versions.iter() {
2071 for hot_ref in index.hot_refs_for_epoch(epoch) {
2072 let arena = self.arena_allocator.arena(hot_ref.epoch);
2073 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2075 node_records.push((node_id.as_u64(), *record));
2076 node_hot_refs.push((*node_id, *hot_ref));
2077 }
2078 }
2079 }
2080
2081 let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2083 let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2084
2085 {
2086 let versions = self.edge_versions.read();
2087 for (edge_id, index) in versions.iter() {
2088 for hot_ref in index.hot_refs_for_epoch(epoch) {
2089 let arena = self.arena_allocator.arena(hot_ref.epoch);
2090 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2092 edge_records.push((edge_id.as_u64(), *record));
2093 edge_hot_refs.push((*edge_id, *hot_ref));
2094 }
2095 }
2096 }
2097
2098 let total_frozen = node_records.len() + edge_records.len();
2099
2100 if total_frozen == 0 {
2101 return 0;
2102 }
2103
2104 let (node_entries, edge_entries) =
2106 self.epoch_store
2107 .freeze_epoch(epoch, node_records, edge_records);
2108
2109 let node_entry_map: FxHashMap<u64, _> = node_entries
2111 .iter()
2112 .map(|e| (e.entity_id, (e.offset, e.length)))
2113 .collect();
2114 let edge_entry_map: FxHashMap<u64, _> = edge_entries
2115 .iter()
2116 .map(|e| (e.entity_id, (e.offset, e.length)))
2117 .collect();
2118
2119 {
2121 let mut versions = self.node_versions.write();
2122 for (node_id, hot_ref) in &node_hot_refs {
2123 if let Some(index) = versions.get_mut(node_id) {
2124 if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
2125 let cold_ref = ColdVersionRef {
2126 epoch,
2127 block_offset: offset,
2128 length,
2129 created_by: hot_ref.created_by,
2130 deleted_epoch: hot_ref.deleted_epoch,
2131 };
2132 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2133 }
2134 }
2135 }
2136 }
2137
2138 {
2139 let mut versions = self.edge_versions.write();
2140 for (edge_id, hot_ref) in &edge_hot_refs {
2141 if let Some(index) = versions.get_mut(edge_id) {
2142 if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2143 let cold_ref = ColdVersionRef {
2144 epoch,
2145 block_offset: offset,
2146 length,
2147 created_by: hot_ref.created_by,
2148 deleted_epoch: hot_ref.deleted_epoch,
2149 };
2150 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2151 }
2152 }
2153 }
2154 }
2155
2156 total_frozen
2157 }
2158
2159 #[cfg(feature = "tiered-storage")]
2161 #[must_use]
2162 pub fn epoch_store(&self) -> &EpochStore {
2163 &self.epoch_store
2164 }
2165
2166 #[must_use]
2168 pub fn label_count(&self) -> usize {
2169 self.id_to_label.read().len()
2170 }
2171
2172 #[must_use]
2176 pub fn property_key_count(&self) -> usize {
2177 let node_keys = self.node_properties.column_count();
2178 let edge_keys = self.edge_properties.column_count();
2179 node_keys + edge_keys
2183 }
2184
2185 #[must_use]
2187 pub fn edge_type_count(&self) -> usize {
2188 self.id_to_edge_type.read().len()
2189 }
2190
2191 pub fn neighbors(
2198 &self,
2199 node: NodeId,
2200 direction: Direction,
2201 ) -> impl Iterator<Item = NodeId> + '_ {
2202 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2203 Direction::Outgoing | Direction::Both => {
2204 Box::new(self.forward_adj.neighbors(node).into_iter())
2205 }
2206 Direction::Incoming => Box::new(std::iter::empty()),
2207 };
2208
2209 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2210 Direction::Incoming | Direction::Both => {
2211 if let Some(ref adj) = self.backward_adj {
2212 Box::new(adj.neighbors(node).into_iter())
2213 } else {
2214 Box::new(std::iter::empty())
2215 }
2216 }
2217 Direction::Outgoing => Box::new(std::iter::empty()),
2218 };
2219
2220 forward.chain(backward)
2221 }
2222
2223 pub fn edges_from(
2227 &self,
2228 node: NodeId,
2229 direction: Direction,
2230 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2231 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2232 Direction::Outgoing | Direction::Both => {
2233 Box::new(self.forward_adj.edges_from(node).into_iter())
2234 }
2235 Direction::Incoming => Box::new(std::iter::empty()),
2236 };
2237
2238 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2239 Direction::Incoming | Direction::Both => {
2240 if let Some(ref adj) = self.backward_adj {
2241 Box::new(adj.edges_from(node).into_iter())
2242 } else {
2243 Box::new(std::iter::empty())
2244 }
2245 }
2246 Direction::Outgoing => Box::new(std::iter::empty()),
2247 };
2248
2249 forward.chain(backward)
2250 }
2251
2252 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2265 if let Some(ref backward) = self.backward_adj {
2266 backward.edges_from(node)
2267 } else {
2268 self.all_edges()
2270 .filter_map(|edge| {
2271 if edge.dst == node {
2272 Some((edge.src, edge.id))
2273 } else {
2274 None
2275 }
2276 })
2277 .collect()
2278 }
2279 }
2280
2281 #[must_use]
2285 pub fn out_degree(&self, node: NodeId) -> usize {
2286 self.forward_adj.out_degree(node)
2287 }
2288
2289 #[must_use]
2294 pub fn in_degree(&self, node: NodeId) -> usize {
2295 if let Some(ref backward) = self.backward_adj {
2296 backward.in_degree(node)
2297 } else {
2298 self.all_edges().filter(|edge| edge.dst == node).count()
2300 }
2301 }
2302
2303 #[must_use]
2305 #[cfg(not(feature = "tiered-storage"))]
2306 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2307 let edges = self.edges.read();
2308 let chain = edges.get(&id)?;
2309 let epoch = self.current_epoch();
2310 let record = chain.visible_at(epoch)?;
2311 let id_to_type = self.id_to_edge_type.read();
2312 id_to_type.get(record.type_id as usize).cloned()
2313 }
2314
2315 #[must_use]
2318 #[cfg(feature = "tiered-storage")]
2319 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2320 let versions = self.edge_versions.read();
2321 let index = versions.get(&id)?;
2322 let epoch = self.current_epoch();
2323 let vref = index.visible_at(epoch)?;
2324 let record = self.read_edge_record(&vref)?;
2325 let id_to_type = self.id_to_edge_type.read();
2326 id_to_type.get(record.type_id as usize).cloned()
2327 }
2328
2329 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2335 let label_to_id = self.label_to_id.read();
2336 if let Some(&label_id) = label_to_id.get(label) {
2337 let index = self.label_index.read();
2338 if let Some(set) = index.get(label_id as usize) {
2339 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2340 ids.sort_unstable();
2341 return ids;
2342 }
2343 }
2344 Vec::new()
2345 }
2346
2347 #[cfg(not(feature = "tiered-storage"))]
2354 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2355 let epoch = self.current_epoch();
2356 let node_ids: Vec<NodeId> = self
2357 .nodes
2358 .read()
2359 .iter()
2360 .filter_map(|(id, chain)| {
2361 chain
2362 .visible_at(epoch)
2363 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2364 })
2365 .collect();
2366
2367 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2368 }
2369
2370 #[cfg(feature = "tiered-storage")]
2373 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2374 let node_ids = self.node_ids();
2375 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2376 }
2377
2378 #[cfg(not(feature = "tiered-storage"))]
2383 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2384 let epoch = self.current_epoch();
2385 let edge_ids: Vec<EdgeId> = self
2386 .edges
2387 .read()
2388 .iter()
2389 .filter_map(|(id, chain)| {
2390 chain
2391 .visible_at(epoch)
2392 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2393 })
2394 .collect();
2395
2396 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2397 }
2398
2399 #[cfg(feature = "tiered-storage")]
2402 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2403 let epoch = self.current_epoch();
2404 let versions = self.edge_versions.read();
2405 let edge_ids: Vec<EdgeId> = versions
2406 .iter()
2407 .filter_map(|(id, index)| {
2408 index.visible_at(epoch).and_then(|vref| {
2409 self.read_edge_record(&vref)
2410 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2411 })
2412 })
2413 .collect();
2414
2415 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2416 }
2417
2418 pub fn all_labels(&self) -> Vec<String> {
2420 self.id_to_label
2421 .read()
2422 .iter()
2423 .map(|s| s.to_string())
2424 .collect()
2425 }
2426
2427 pub fn all_edge_types(&self) -> Vec<String> {
2429 self.id_to_edge_type
2430 .read()
2431 .iter()
2432 .map(|s| s.to_string())
2433 .collect()
2434 }
2435
2436 pub fn all_property_keys(&self) -> Vec<String> {
2438 let mut keys = std::collections::HashSet::new();
2439 for key in self.node_properties.keys() {
2440 keys.insert(key.to_string());
2441 }
2442 for key in self.edge_properties.keys() {
2443 keys.insert(key.to_string());
2444 }
2445 keys.into_iter().collect()
2446 }
2447
2448 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2450 let node_ids = self.nodes_by_label(label);
2451 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2452 }
2453
2454 #[cfg(not(feature = "tiered-storage"))]
2456 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2457 let epoch = self.current_epoch();
2458 let type_to_id = self.edge_type_to_id.read();
2459
2460 if let Some(&type_id) = type_to_id.get(edge_type) {
2461 let edge_ids: Vec<EdgeId> = self
2462 .edges
2463 .read()
2464 .iter()
2465 .filter_map(|(id, chain)| {
2466 chain.visible_at(epoch).and_then(|r| {
2467 if !r.is_deleted() && r.type_id == type_id {
2468 Some(*id)
2469 } else {
2470 None
2471 }
2472 })
2473 })
2474 .collect();
2475
2476 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2478 as Box<dyn Iterator<Item = Edge> + 'a>
2479 } else {
2480 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2482 }
2483 }
2484
2485 #[cfg(feature = "tiered-storage")]
2488 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2489 let epoch = self.current_epoch();
2490 let type_to_id = self.edge_type_to_id.read();
2491
2492 if let Some(&type_id) = type_to_id.get(edge_type) {
2493 let versions = self.edge_versions.read();
2494 let edge_ids: Vec<EdgeId> = versions
2495 .iter()
2496 .filter_map(|(id, index)| {
2497 index.visible_at(epoch).and_then(|vref| {
2498 self.read_edge_record(&vref).and_then(|r| {
2499 if !r.is_deleted() && r.type_id == type_id {
2500 Some(*id)
2501 } else {
2502 None
2503 }
2504 })
2505 })
2506 })
2507 .collect();
2508
2509 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2510 as Box<dyn Iterator<Item = Edge> + 'a>
2511 } else {
2512 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2513 }
2514 }
2515
2516 #[must_use]
2523 pub fn node_property_might_match(
2524 &self,
2525 property: &PropertyKey,
2526 op: CompareOp,
2527 value: &Value,
2528 ) -> bool {
2529 self.node_properties.might_match(property, op, value)
2530 }
2531
2532 #[must_use]
2534 pub fn edge_property_might_match(
2535 &self,
2536 property: &PropertyKey,
2537 op: CompareOp,
2538 value: &Value,
2539 ) -> bool {
2540 self.edge_properties.might_match(property, op, value)
2541 }
2542
2543 #[must_use]
2545 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2546 self.node_properties.zone_map(property)
2547 }
2548
2549 #[must_use]
2551 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2552 self.edge_properties.zone_map(property)
2553 }
2554
2555 pub fn rebuild_zone_maps(&self) {
2557 self.node_properties.rebuild_zone_maps();
2558 self.edge_properties.rebuild_zone_maps();
2559 }
2560
2561 #[must_use]
2565 pub fn statistics(&self) -> Statistics {
2566 self.statistics.read().clone()
2567 }
2568
2569 pub fn ensure_statistics_fresh(&self) {
2574 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2575 self.compute_statistics();
2576 }
2577 }
2578
2579 #[cfg(not(feature = "tiered-storage"))]
2584 pub fn compute_statistics(&self) {
2585 let mut stats = Statistics::new();
2586
2587 stats.total_nodes = self.node_count() as u64;
2589 stats.total_edges = self.edge_count() as u64;
2590
2591 let id_to_label = self.id_to_label.read();
2593 let label_index = self.label_index.read();
2594
2595 for (label_id, label_name) in id_to_label.iter().enumerate() {
2596 let node_count = label_index
2597 .get(label_id)
2598 .map(|set| set.len() as u64)
2599 .unwrap_or(0);
2600
2601 if node_count > 0 {
2602 let avg_out_degree = if stats.total_nodes > 0 {
2604 stats.total_edges as f64 / stats.total_nodes as f64
2605 } else {
2606 0.0
2607 };
2608
2609 let label_stats =
2610 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2611
2612 stats.update_label(label_name.as_ref(), label_stats);
2613 }
2614 }
2615
2616 let id_to_edge_type = self.id_to_edge_type.read();
2618 let edges = self.edges.read();
2619 let epoch = self.current_epoch();
2620
2621 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2622 for chain in edges.values() {
2623 if let Some(record) = chain.visible_at(epoch) {
2624 if !record.is_deleted() {
2625 *edge_type_counts.entry(record.type_id).or_default() += 1;
2626 }
2627 }
2628 }
2629
2630 for (type_id, count) in edge_type_counts {
2631 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2632 let avg_degree = if stats.total_nodes > 0 {
2633 count as f64 / stats.total_nodes as f64
2634 } else {
2635 0.0
2636 };
2637
2638 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2639 stats.update_edge_type(type_name.as_ref(), edge_stats);
2640 }
2641 }
2642
2643 *self.statistics.write() = stats;
2644 }
2645
2646 #[cfg(feature = "tiered-storage")]
2649 pub fn compute_statistics(&self) {
2650 let mut stats = Statistics::new();
2651
2652 stats.total_nodes = self.node_count() as u64;
2654 stats.total_edges = self.edge_count() as u64;
2655
2656 let id_to_label = self.id_to_label.read();
2658 let label_index = self.label_index.read();
2659
2660 for (label_id, label_name) in id_to_label.iter().enumerate() {
2661 let node_count = label_index
2662 .get(label_id)
2663 .map(|set| set.len() as u64)
2664 .unwrap_or(0);
2665
2666 if node_count > 0 {
2667 let avg_out_degree = if stats.total_nodes > 0 {
2668 stats.total_edges as f64 / stats.total_nodes as f64
2669 } else {
2670 0.0
2671 };
2672
2673 let label_stats =
2674 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2675
2676 stats.update_label(label_name.as_ref(), label_stats);
2677 }
2678 }
2679
2680 let id_to_edge_type = self.id_to_edge_type.read();
2682 let versions = self.edge_versions.read();
2683 let epoch = self.current_epoch();
2684
2685 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2686 for index in versions.values() {
2687 if let Some(vref) = index.visible_at(epoch) {
2688 if let Some(record) = self.read_edge_record(&vref) {
2689 if !record.is_deleted() {
2690 *edge_type_counts.entry(record.type_id).or_default() += 1;
2691 }
2692 }
2693 }
2694 }
2695
2696 for (type_id, count) in edge_type_counts {
2697 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2698 let avg_degree = if stats.total_nodes > 0 {
2699 count as f64 / stats.total_nodes as f64
2700 } else {
2701 0.0
2702 };
2703
2704 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2705 stats.update_edge_type(type_name.as_ref(), edge_stats);
2706 }
2707 }
2708
2709 *self.statistics.write() = stats;
2710 }
2711
2712 #[must_use]
2714 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2715 self.statistics.read().estimate_label_cardinality(label)
2716 }
2717
2718 #[must_use]
2720 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2721 self.statistics
2722 .read()
2723 .estimate_avg_degree(edge_type, outgoing)
2724 }
2725
2726 fn get_or_create_label_id(&self, label: &str) -> u32 {
2729 {
2730 let label_to_id = self.label_to_id.read();
2731 if let Some(&id) = label_to_id.get(label) {
2732 return id;
2733 }
2734 }
2735
2736 let mut label_to_id = self.label_to_id.write();
2737 let mut id_to_label = self.id_to_label.write();
2738
2739 if let Some(&id) = label_to_id.get(label) {
2741 return id;
2742 }
2743
2744 let id = id_to_label.len() as u32;
2745
2746 let label: ArcStr = label.into();
2747 label_to_id.insert(label.clone(), id);
2748 id_to_label.push(label);
2749
2750 id
2751 }
2752
2753 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2754 {
2755 let type_to_id = self.edge_type_to_id.read();
2756 if let Some(&id) = type_to_id.get(edge_type) {
2757 return id;
2758 }
2759 }
2760
2761 let mut type_to_id = self.edge_type_to_id.write();
2762 let mut id_to_type = self.id_to_edge_type.write();
2763
2764 if let Some(&id) = type_to_id.get(edge_type) {
2766 return id;
2767 }
2768
2769 let id = id_to_type.len() as u32;
2770 let edge_type: ArcStr = edge_type.into();
2771 type_to_id.insert(edge_type.clone(), id);
2772 id_to_type.push(edge_type);
2773
2774 id
2775 }
2776
2777 #[cfg(not(feature = "tiered-storage"))]
2784 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2785 let epoch = self.current_epoch();
2786 let mut record = NodeRecord::new(id, epoch);
2787 record.set_label_count(labels.len() as u16);
2788
2789 let mut node_label_set = FxHashSet::default();
2791 for label in labels {
2792 let label_id = self.get_or_create_label_id(*label);
2793 node_label_set.insert(label_id);
2794
2795 let mut index = self.label_index.write();
2797 while index.len() <= label_id as usize {
2798 index.push(FxHashMap::default());
2799 }
2800 index[label_id as usize].insert(id, ());
2801 }
2802
2803 self.node_labels.write().insert(id, node_label_set);
2805
2806 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2808 self.nodes.write().insert(id, chain);
2809
2810 let id_val = id.as_u64();
2812 let _ = self
2813 .next_node_id
2814 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2815 if id_val >= current {
2816 Some(id_val + 1)
2817 } else {
2818 None
2819 }
2820 });
2821 }
2822
2823 #[cfg(feature = "tiered-storage")]
2826 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2827 let epoch = self.current_epoch();
2828 let mut record = NodeRecord::new(id, epoch);
2829 record.set_label_count(labels.len() as u16);
2830
2831 let mut node_label_set = FxHashSet::default();
2833 for label in labels {
2834 let label_id = self.get_or_create_label_id(*label);
2835 node_label_set.insert(label_id);
2836
2837 let mut index = self.label_index.write();
2839 while index.len() <= label_id as usize {
2840 index.push(FxHashMap::default());
2841 }
2842 index[label_id as usize].insert(id, ());
2843 }
2844
2845 self.node_labels.write().insert(id, node_label_set);
2847
2848 let arena = self.arena_allocator.arena_or_create(epoch);
2850 let (offset, _stored) = arena.alloc_value_with_offset(record);
2851
2852 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2854 let mut versions = self.node_versions.write();
2855 versions.insert(id, VersionIndex::with_initial(hot_ref));
2856
2857 let id_val = id.as_u64();
2859 let _ = self
2860 .next_node_id
2861 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2862 if id_val >= current {
2863 Some(id_val + 1)
2864 } else {
2865 None
2866 }
2867 });
2868 }
2869
2870 #[cfg(not(feature = "tiered-storage"))]
2874 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2875 let epoch = self.current_epoch();
2876 let type_id = self.get_or_create_edge_type_id(edge_type);
2877
2878 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2879 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2880 self.edges.write().insert(id, chain);
2881
2882 self.forward_adj.add_edge(src, dst, id);
2884 if let Some(ref backward) = self.backward_adj {
2885 backward.add_edge(dst, src, id);
2886 }
2887
2888 let id_val = id.as_u64();
2890 let _ = self
2891 .next_edge_id
2892 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2893 if id_val >= current {
2894 Some(id_val + 1)
2895 } else {
2896 None
2897 }
2898 });
2899 }
2900
2901 #[cfg(feature = "tiered-storage")]
2904 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2905 let epoch = self.current_epoch();
2906 let type_id = self.get_or_create_edge_type_id(edge_type);
2907
2908 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2909
2910 let arena = self.arena_allocator.arena_or_create(epoch);
2912 let (offset, _stored) = arena.alloc_value_with_offset(record);
2913
2914 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2916 let mut versions = self.edge_versions.write();
2917 versions.insert(id, VersionIndex::with_initial(hot_ref));
2918
2919 self.forward_adj.add_edge(src, dst, id);
2921 if let Some(ref backward) = self.backward_adj {
2922 backward.add_edge(dst, src, id);
2923 }
2924
2925 let id_val = id.as_u64();
2927 let _ = self
2928 .next_edge_id
2929 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2930 if id_val >= current {
2931 Some(id_val + 1)
2932 } else {
2933 None
2934 }
2935 });
2936 }
2937
2938 pub fn set_epoch(&self, epoch: EpochId) {
2940 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2941 }
2942}
2943
2944impl Default for LpgStore {
2945 fn default() -> Self {
2946 Self::new()
2947 }
2948}
2949
2950#[cfg(test)]
2951mod tests {
2952 use super::*;
2953
2954 #[test]
2955 fn test_create_node() {
2956 let store = LpgStore::new();
2957
2958 let id = store.create_node(&["Person"]);
2959 assert!(id.is_valid());
2960
2961 let node = store.get_node(id).unwrap();
2962 assert!(node.has_label("Person"));
2963 assert!(!node.has_label("Animal"));
2964 }
2965
2966 #[test]
2967 fn test_create_node_with_props() {
2968 let store = LpgStore::new();
2969
2970 let id = store.create_node_with_props(
2971 &["Person"],
2972 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2973 );
2974
2975 let node = store.get_node(id).unwrap();
2976 assert_eq!(
2977 node.get_property("name").and_then(|v| v.as_str()),
2978 Some("Alice")
2979 );
2980 assert_eq!(
2981 node.get_property("age").and_then(|v| v.as_int64()),
2982 Some(30)
2983 );
2984 }
2985
2986 #[test]
2987 fn test_delete_node() {
2988 let store = LpgStore::new();
2989
2990 let id = store.create_node(&["Person"]);
2991 assert_eq!(store.node_count(), 1);
2992
2993 assert!(store.delete_node(id));
2994 assert_eq!(store.node_count(), 0);
2995 assert!(store.get_node(id).is_none());
2996
2997 assert!(!store.delete_node(id));
2999 }
3000
3001 #[test]
3002 fn test_create_edge() {
3003 let store = LpgStore::new();
3004
3005 let alice = store.create_node(&["Person"]);
3006 let bob = store.create_node(&["Person"]);
3007
3008 let edge_id = store.create_edge(alice, bob, "KNOWS");
3009 assert!(edge_id.is_valid());
3010
3011 let edge = store.get_edge(edge_id).unwrap();
3012 assert_eq!(edge.src, alice);
3013 assert_eq!(edge.dst, bob);
3014 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3015 }
3016
3017 #[test]
3018 fn test_neighbors() {
3019 let store = LpgStore::new();
3020
3021 let a = store.create_node(&["Person"]);
3022 let b = store.create_node(&["Person"]);
3023 let c = store.create_node(&["Person"]);
3024
3025 store.create_edge(a, b, "KNOWS");
3026 store.create_edge(a, c, "KNOWS");
3027
3028 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3029 assert_eq!(outgoing.len(), 2);
3030 assert!(outgoing.contains(&b));
3031 assert!(outgoing.contains(&c));
3032
3033 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3034 assert_eq!(incoming.len(), 1);
3035 assert!(incoming.contains(&a));
3036 }
3037
3038 #[test]
3039 fn test_nodes_by_label() {
3040 let store = LpgStore::new();
3041
3042 let p1 = store.create_node(&["Person"]);
3043 let p2 = store.create_node(&["Person"]);
3044 let _a = store.create_node(&["Animal"]);
3045
3046 let persons = store.nodes_by_label("Person");
3047 assert_eq!(persons.len(), 2);
3048 assert!(persons.contains(&p1));
3049 assert!(persons.contains(&p2));
3050
3051 let animals = store.nodes_by_label("Animal");
3052 assert_eq!(animals.len(), 1);
3053 }
3054
3055 #[test]
3056 fn test_delete_edge() {
3057 let store = LpgStore::new();
3058
3059 let a = store.create_node(&["Person"]);
3060 let b = store.create_node(&["Person"]);
3061 let edge_id = store.create_edge(a, b, "KNOWS");
3062
3063 assert_eq!(store.edge_count(), 1);
3064
3065 assert!(store.delete_edge(edge_id));
3066 assert_eq!(store.edge_count(), 0);
3067 assert!(store.get_edge(edge_id).is_none());
3068 }
3069
3070 #[test]
3073 fn test_lpg_store_config() {
3074 let config = LpgStoreConfig {
3076 backward_edges: false,
3077 initial_node_capacity: 100,
3078 initial_edge_capacity: 200,
3079 };
3080 let store = LpgStore::with_config(config);
3081
3082 let a = store.create_node(&["Person"]);
3084 let b = store.create_node(&["Person"]);
3085 store.create_edge(a, b, "KNOWS");
3086
3087 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3089 assert_eq!(outgoing.len(), 1);
3090
3091 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3093 assert_eq!(incoming.len(), 0);
3094 }
3095
3096 #[test]
3097 fn test_epoch_management() {
3098 let store = LpgStore::new();
3099
3100 let epoch0 = store.current_epoch();
3101 assert_eq!(epoch0.as_u64(), 0);
3102
3103 let epoch1 = store.new_epoch();
3104 assert_eq!(epoch1.as_u64(), 1);
3105
3106 let current = store.current_epoch();
3107 assert_eq!(current.as_u64(), 1);
3108 }
3109
3110 #[test]
3111 fn test_node_properties() {
3112 let store = LpgStore::new();
3113 let id = store.create_node(&["Person"]);
3114
3115 store.set_node_property(id, "name", Value::from("Alice"));
3117 let name = store.get_node_property(id, &"name".into());
3118 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3119
3120 store.set_node_property(id, "name", Value::from("Bob"));
3122 let name = store.get_node_property(id, &"name".into());
3123 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3124
3125 let old = store.remove_node_property(id, "name");
3127 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3128
3129 let name = store.get_node_property(id, &"name".into());
3131 assert!(name.is_none());
3132
3133 let none = store.remove_node_property(id, "nonexistent");
3135 assert!(none.is_none());
3136 }
3137
3138 #[test]
3139 fn test_edge_properties() {
3140 let store = LpgStore::new();
3141 let a = store.create_node(&["Person"]);
3142 let b = store.create_node(&["Person"]);
3143 let edge_id = store.create_edge(a, b, "KNOWS");
3144
3145 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3147 let since = store.get_edge_property(edge_id, &"since".into());
3148 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3149
3150 let old = store.remove_edge_property(edge_id, "since");
3152 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3153
3154 let since = store.get_edge_property(edge_id, &"since".into());
3155 assert!(since.is_none());
3156 }
3157
3158 #[test]
3159 fn test_add_remove_label() {
3160 let store = LpgStore::new();
3161 let id = store.create_node(&["Person"]);
3162
3163 assert!(store.add_label(id, "Employee"));
3165
3166 let node = store.get_node(id).unwrap();
3167 assert!(node.has_label("Person"));
3168 assert!(node.has_label("Employee"));
3169
3170 assert!(!store.add_label(id, "Employee"));
3172
3173 assert!(store.remove_label(id, "Employee"));
3175
3176 let node = store.get_node(id).unwrap();
3177 assert!(node.has_label("Person"));
3178 assert!(!node.has_label("Employee"));
3179
3180 assert!(!store.remove_label(id, "Employee"));
3182 assert!(!store.remove_label(id, "NonExistent"));
3183 }
3184
3185 #[test]
3186 fn test_add_label_to_nonexistent_node() {
3187 let store = LpgStore::new();
3188 let fake_id = NodeId::new(999);
3189 assert!(!store.add_label(fake_id, "Label"));
3190 }
3191
3192 #[test]
3193 fn test_remove_label_from_nonexistent_node() {
3194 let store = LpgStore::new();
3195 let fake_id = NodeId::new(999);
3196 assert!(!store.remove_label(fake_id, "Label"));
3197 }
3198
3199 #[test]
3200 fn test_node_ids() {
3201 let store = LpgStore::new();
3202
3203 let n1 = store.create_node(&["Person"]);
3204 let n2 = store.create_node(&["Person"]);
3205 let n3 = store.create_node(&["Person"]);
3206
3207 let ids = store.node_ids();
3208 assert_eq!(ids.len(), 3);
3209 assert!(ids.contains(&n1));
3210 assert!(ids.contains(&n2));
3211 assert!(ids.contains(&n3));
3212
3213 store.delete_node(n2);
3215 let ids = store.node_ids();
3216 assert_eq!(ids.len(), 2);
3217 assert!(!ids.contains(&n2));
3218 }
3219
3220 #[test]
3221 fn test_delete_node_nonexistent() {
3222 let store = LpgStore::new();
3223 let fake_id = NodeId::new(999);
3224 assert!(!store.delete_node(fake_id));
3225 }
3226
3227 #[test]
3228 fn test_delete_edge_nonexistent() {
3229 let store = LpgStore::new();
3230 let fake_id = EdgeId::new(999);
3231 assert!(!store.delete_edge(fake_id));
3232 }
3233
3234 #[test]
3235 fn test_delete_edge_double() {
3236 let store = LpgStore::new();
3237 let a = store.create_node(&["Person"]);
3238 let b = store.create_node(&["Person"]);
3239 let edge_id = store.create_edge(a, b, "KNOWS");
3240
3241 assert!(store.delete_edge(edge_id));
3242 assert!(!store.delete_edge(edge_id)); }
3244
3245 #[test]
3246 fn test_create_edge_with_props() {
3247 let store = LpgStore::new();
3248 let a = store.create_node(&["Person"]);
3249 let b = store.create_node(&["Person"]);
3250
3251 let edge_id = store.create_edge_with_props(
3252 a,
3253 b,
3254 "KNOWS",
3255 [
3256 ("since", Value::from(2020i64)),
3257 ("weight", Value::from(1.0)),
3258 ],
3259 );
3260
3261 let edge = store.get_edge(edge_id).unwrap();
3262 assert_eq!(
3263 edge.get_property("since").and_then(|v| v.as_int64()),
3264 Some(2020)
3265 );
3266 assert_eq!(
3267 edge.get_property("weight").and_then(|v| v.as_float64()),
3268 Some(1.0)
3269 );
3270 }
3271
3272 #[test]
3273 fn test_delete_node_edges() {
3274 let store = LpgStore::new();
3275
3276 let a = store.create_node(&["Person"]);
3277 let b = store.create_node(&["Person"]);
3278 let c = store.create_node(&["Person"]);
3279
3280 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3284
3285 store.delete_node_edges(a);
3287
3288 assert_eq!(store.edge_count(), 0);
3289 }
3290
3291 #[test]
3292 fn test_neighbors_both_directions() {
3293 let store = LpgStore::new();
3294
3295 let a = store.create_node(&["Person"]);
3296 let b = store.create_node(&["Person"]);
3297 let c = store.create_node(&["Person"]);
3298
3299 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3304 assert_eq!(neighbors.len(), 2);
3305 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3308
3309 #[test]
3310 fn test_edges_from() {
3311 let store = LpgStore::new();
3312
3313 let a = store.create_node(&["Person"]);
3314 let b = store.create_node(&["Person"]);
3315 let c = store.create_node(&["Person"]);
3316
3317 let e1 = store.create_edge(a, b, "KNOWS");
3318 let e2 = store.create_edge(a, c, "KNOWS");
3319
3320 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3321 assert_eq!(edges.len(), 2);
3322 assert!(edges.iter().any(|(_, e)| *e == e1));
3323 assert!(edges.iter().any(|(_, e)| *e == e2));
3324
3325 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3327 assert_eq!(incoming.len(), 1);
3328 assert_eq!(incoming[0].1, e1);
3329 }
3330
3331 #[test]
3332 fn test_edges_to() {
3333 let store = LpgStore::new();
3334
3335 let a = store.create_node(&["Person"]);
3336 let b = store.create_node(&["Person"]);
3337 let c = store.create_node(&["Person"]);
3338
3339 let e1 = store.create_edge(a, b, "KNOWS");
3340 let e2 = store.create_edge(c, b, "KNOWS");
3341
3342 let to_b = store.edges_to(b);
3344 assert_eq!(to_b.len(), 2);
3345 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3346 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3347 }
3348
3349 #[test]
3350 fn test_out_degree_in_degree() {
3351 let store = LpgStore::new();
3352
3353 let a = store.create_node(&["Person"]);
3354 let b = store.create_node(&["Person"]);
3355 let c = store.create_node(&["Person"]);
3356
3357 store.create_edge(a, b, "KNOWS");
3358 store.create_edge(a, c, "KNOWS");
3359 store.create_edge(c, b, "KNOWS");
3360
3361 assert_eq!(store.out_degree(a), 2);
3362 assert_eq!(store.out_degree(b), 0);
3363 assert_eq!(store.out_degree(c), 1);
3364
3365 assert_eq!(store.in_degree(a), 0);
3366 assert_eq!(store.in_degree(b), 2);
3367 assert_eq!(store.in_degree(c), 1);
3368 }
3369
3370 #[test]
3371 fn test_edge_type() {
3372 let store = LpgStore::new();
3373
3374 let a = store.create_node(&["Person"]);
3375 let b = store.create_node(&["Person"]);
3376 let edge_id = store.create_edge(a, b, "KNOWS");
3377
3378 let edge_type = store.edge_type(edge_id);
3379 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3380
3381 let fake_id = EdgeId::new(999);
3383 assert!(store.edge_type(fake_id).is_none());
3384 }
3385
3386 #[test]
3387 fn test_count_methods() {
3388 let store = LpgStore::new();
3389
3390 assert_eq!(store.label_count(), 0);
3391 assert_eq!(store.edge_type_count(), 0);
3392 assert_eq!(store.property_key_count(), 0);
3393
3394 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3395 let b = store.create_node(&["Company"]);
3396 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3397
3398 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3402
3403 #[test]
3404 fn test_all_nodes_and_edges() {
3405 let store = LpgStore::new();
3406
3407 let a = store.create_node(&["Person"]);
3408 let b = store.create_node(&["Person"]);
3409 store.create_edge(a, b, "KNOWS");
3410
3411 let nodes: Vec<_> = store.all_nodes().collect();
3412 assert_eq!(nodes.len(), 2);
3413
3414 let edges: Vec<_> = store.all_edges().collect();
3415 assert_eq!(edges.len(), 1);
3416 }
3417
3418 #[test]
3419 fn test_all_labels_and_edge_types() {
3420 let store = LpgStore::new();
3421
3422 store.create_node(&["Person"]);
3423 store.create_node(&["Company"]);
3424 let a = store.create_node(&["Animal"]);
3425 let b = store.create_node(&["Animal"]);
3426 store.create_edge(a, b, "EATS");
3427
3428 let labels = store.all_labels();
3429 assert_eq!(labels.len(), 3);
3430 assert!(labels.contains(&"Person".to_string()));
3431 assert!(labels.contains(&"Company".to_string()));
3432 assert!(labels.contains(&"Animal".to_string()));
3433
3434 let edge_types = store.all_edge_types();
3435 assert_eq!(edge_types.len(), 1);
3436 assert!(edge_types.contains(&"EATS".to_string()));
3437 }
3438
3439 #[test]
3440 fn test_all_property_keys() {
3441 let store = LpgStore::new();
3442
3443 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3444 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3445 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3446
3447 let keys = store.all_property_keys();
3448 assert!(keys.contains(&"name".to_string()));
3449 assert!(keys.contains(&"age".to_string()));
3450 assert!(keys.contains(&"since".to_string()));
3451 }
3452
3453 #[test]
3454 fn test_nodes_with_label() {
3455 let store = LpgStore::new();
3456
3457 store.create_node(&["Person"]);
3458 store.create_node(&["Person"]);
3459 store.create_node(&["Company"]);
3460
3461 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3462 assert_eq!(persons.len(), 2);
3463
3464 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3465 assert_eq!(companies.len(), 1);
3466
3467 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3468 assert_eq!(none.len(), 0);
3469 }
3470
3471 #[test]
3472 fn test_edges_with_type() {
3473 let store = LpgStore::new();
3474
3475 let a = store.create_node(&["Person"]);
3476 let b = store.create_node(&["Person"]);
3477 let c = store.create_node(&["Company"]);
3478
3479 store.create_edge(a, b, "KNOWS");
3480 store.create_edge(a, c, "WORKS_AT");
3481
3482 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3483 assert_eq!(knows.len(), 1);
3484
3485 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3486 assert_eq!(works_at.len(), 1);
3487
3488 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3489 assert_eq!(none.len(), 0);
3490 }
3491
3492 #[test]
3493 fn test_nodes_by_label_nonexistent() {
3494 let store = LpgStore::new();
3495 store.create_node(&["Person"]);
3496
3497 let empty = store.nodes_by_label("NonExistent");
3498 assert!(empty.is_empty());
3499 }
3500
3501 #[test]
3502 fn test_statistics() {
3503 let store = LpgStore::new();
3504
3505 let a = store.create_node(&["Person"]);
3506 let b = store.create_node(&["Person"]);
3507 let c = store.create_node(&["Company"]);
3508
3509 store.create_edge(a, b, "KNOWS");
3510 store.create_edge(a, c, "WORKS_AT");
3511
3512 store.compute_statistics();
3513 let stats = store.statistics();
3514
3515 assert_eq!(stats.total_nodes, 3);
3516 assert_eq!(stats.total_edges, 2);
3517
3518 let person_card = store.estimate_label_cardinality("Person");
3520 assert!(person_card > 0.0);
3521
3522 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3523 assert!(avg_degree >= 0.0);
3524 }
3525
3526 #[test]
3527 fn test_zone_maps() {
3528 let store = LpgStore::new();
3529
3530 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3531 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3532
3533 let might_match =
3535 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3536 assert!(might_match);
3538
3539 let zone = store.node_property_zone_map(&"age".into());
3540 assert!(zone.is_some());
3541
3542 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3544 assert!(no_zone.is_none());
3545
3546 let a = store.create_node(&["A"]);
3548 let b = store.create_node(&["B"]);
3549 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3550
3551 let edge_zone = store.edge_property_zone_map(&"weight".into());
3552 assert!(edge_zone.is_some());
3553 }
3554
3555 #[test]
3556 fn test_rebuild_zone_maps() {
3557 let store = LpgStore::new();
3558 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3559
3560 store.rebuild_zone_maps();
3562 }
3563
3564 #[test]
3565 fn test_create_node_with_id() {
3566 let store = LpgStore::new();
3567
3568 let specific_id = NodeId::new(100);
3569 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3570
3571 let node = store.get_node(specific_id).unwrap();
3572 assert!(node.has_label("Person"));
3573 assert!(node.has_label("Employee"));
3574
3575 let next = store.create_node(&["Other"]);
3577 assert!(next.as_u64() > 100);
3578 }
3579
3580 #[test]
3581 fn test_create_edge_with_id() {
3582 let store = LpgStore::new();
3583
3584 let a = store.create_node(&["A"]);
3585 let b = store.create_node(&["B"]);
3586
3587 let specific_id = EdgeId::new(500);
3588 store.create_edge_with_id(specific_id, a, b, "REL");
3589
3590 let edge = store.get_edge(specific_id).unwrap();
3591 assert_eq!(edge.src, a);
3592 assert_eq!(edge.dst, b);
3593 assert_eq!(edge.edge_type.as_str(), "REL");
3594
3595 let next = store.create_edge(a, b, "OTHER");
3597 assert!(next.as_u64() > 500);
3598 }
3599
3600 #[test]
3601 fn test_set_epoch() {
3602 let store = LpgStore::new();
3603
3604 assert_eq!(store.current_epoch().as_u64(), 0);
3605
3606 store.set_epoch(EpochId::new(42));
3607 assert_eq!(store.current_epoch().as_u64(), 42);
3608 }
3609
3610 #[test]
3611 fn test_get_node_nonexistent() {
3612 let store = LpgStore::new();
3613 let fake_id = NodeId::new(999);
3614 assert!(store.get_node(fake_id).is_none());
3615 }
3616
3617 #[test]
3618 fn test_get_edge_nonexistent() {
3619 let store = LpgStore::new();
3620 let fake_id = EdgeId::new(999);
3621 assert!(store.get_edge(fake_id).is_none());
3622 }
3623
3624 #[test]
3625 fn test_multiple_labels() {
3626 let store = LpgStore::new();
3627
3628 let id = store.create_node(&["Person", "Employee", "Manager"]);
3629 let node = store.get_node(id).unwrap();
3630
3631 assert!(node.has_label("Person"));
3632 assert!(node.has_label("Employee"));
3633 assert!(node.has_label("Manager"));
3634 assert!(!node.has_label("Other"));
3635 }
3636
3637 #[test]
3638 fn test_default_impl() {
3639 let store: LpgStore = Default::default();
3640 assert_eq!(store.node_count(), 0);
3641 assert_eq!(store.edge_count(), 0);
3642 }
3643
3644 #[test]
3645 fn test_edges_from_both_directions() {
3646 let store = LpgStore::new();
3647
3648 let a = store.create_node(&["A"]);
3649 let b = store.create_node(&["B"]);
3650 let c = store.create_node(&["C"]);
3651
3652 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3657 assert_eq!(edges.len(), 2);
3658 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3661
3662 #[test]
3663 fn test_no_backward_adj_in_degree() {
3664 let config = LpgStoreConfig {
3665 backward_edges: false,
3666 initial_node_capacity: 10,
3667 initial_edge_capacity: 10,
3668 };
3669 let store = LpgStore::with_config(config);
3670
3671 let a = store.create_node(&["A"]);
3672 let b = store.create_node(&["B"]);
3673 store.create_edge(a, b, "R");
3674
3675 let degree = store.in_degree(b);
3677 assert_eq!(degree, 1);
3678 }
3679
3680 #[test]
3681 fn test_no_backward_adj_edges_to() {
3682 let config = LpgStoreConfig {
3683 backward_edges: false,
3684 initial_node_capacity: 10,
3685 initial_edge_capacity: 10,
3686 };
3687 let store = LpgStore::with_config(config);
3688
3689 let a = store.create_node(&["A"]);
3690 let b = store.create_node(&["B"]);
3691 let e = store.create_edge(a, b, "R");
3692
3693 let edges = store.edges_to(b);
3695 assert_eq!(edges.len(), 1);
3696 assert_eq!(edges[0].1, e);
3697 }
3698
3699 #[test]
3700 fn test_node_versioned_creation() {
3701 let store = LpgStore::new();
3702
3703 let epoch = store.new_epoch();
3704 let tx_id = TxId::new(1);
3705
3706 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3707 assert!(store.get_node(id).is_some());
3708 }
3709
3710 #[test]
3711 fn test_edge_versioned_creation() {
3712 let store = LpgStore::new();
3713
3714 let a = store.create_node(&["A"]);
3715 let b = store.create_node(&["B"]);
3716
3717 let epoch = store.new_epoch();
3718 let tx_id = TxId::new(1);
3719
3720 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3721 assert!(store.get_edge(edge_id).is_some());
3722 }
3723
3724 #[test]
3725 fn test_node_with_props_versioned() {
3726 let store = LpgStore::new();
3727
3728 let epoch = store.new_epoch();
3729 let tx_id = TxId::new(1);
3730
3731 let id = store.create_node_with_props_versioned(
3732 &["Person"],
3733 [("name", Value::from("Alice"))],
3734 epoch,
3735 tx_id,
3736 );
3737
3738 let node = store.get_node(id).unwrap();
3739 assert_eq!(
3740 node.get_property("name").and_then(|v| v.as_str()),
3741 Some("Alice")
3742 );
3743 }
3744
3745 #[test]
3746 fn test_discard_uncommitted_versions() {
3747 let store = LpgStore::new();
3748
3749 let epoch = store.new_epoch();
3750 let tx_id = TxId::new(42);
3751
3752 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3754 assert!(store.get_node(node_id).is_some());
3755
3756 store.discard_uncommitted_versions(tx_id);
3758
3759 assert!(store.get_node(node_id).is_none());
3761 }
3762
3763 #[test]
3766 fn test_property_index_create_and_lookup() {
3767 let store = LpgStore::new();
3768
3769 let alice = store.create_node(&["Person"]);
3771 let bob = store.create_node(&["Person"]);
3772 let charlie = store.create_node(&["Person"]);
3773
3774 store.set_node_property(alice, "city", Value::from("NYC"));
3775 store.set_node_property(bob, "city", Value::from("NYC"));
3776 store.set_node_property(charlie, "city", Value::from("LA"));
3777
3778 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3780 assert_eq!(nyc_people.len(), 2);
3781
3782 store.create_property_index("city");
3784 assert!(store.has_property_index("city"));
3785
3786 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3788 assert_eq!(nyc_people.len(), 2);
3789 assert!(nyc_people.contains(&alice));
3790 assert!(nyc_people.contains(&bob));
3791
3792 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3793 assert_eq!(la_people.len(), 1);
3794 assert!(la_people.contains(&charlie));
3795 }
3796
3797 #[test]
3798 fn test_property_index_maintained_on_update() {
3799 let store = LpgStore::new();
3800
3801 store.create_property_index("status");
3803
3804 let node = store.create_node(&["Task"]);
3805 store.set_node_property(node, "status", Value::from("pending"));
3806
3807 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3809 assert_eq!(pending.len(), 1);
3810 assert!(pending.contains(&node));
3811
3812 store.set_node_property(node, "status", Value::from("done"));
3814
3815 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3817 assert!(pending.is_empty());
3818
3819 let done = store.find_nodes_by_property("status", &Value::from("done"));
3821 assert_eq!(done.len(), 1);
3822 assert!(done.contains(&node));
3823 }
3824
3825 #[test]
3826 fn test_property_index_maintained_on_remove() {
3827 let store = LpgStore::new();
3828
3829 store.create_property_index("tag");
3830
3831 let node = store.create_node(&["Item"]);
3832 store.set_node_property(node, "tag", Value::from("important"));
3833
3834 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3836 assert_eq!(found.len(), 1);
3837
3838 store.remove_node_property(node, "tag");
3840
3841 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3843 assert!(found.is_empty());
3844 }
3845
3846 #[test]
3847 fn test_property_index_drop() {
3848 let store = LpgStore::new();
3849
3850 store.create_property_index("key");
3851 assert!(store.has_property_index("key"));
3852
3853 assert!(store.drop_property_index("key"));
3854 assert!(!store.has_property_index("key"));
3855
3856 assert!(!store.drop_property_index("key"));
3858 }
3859
3860 #[test]
3861 fn test_property_index_multiple_values() {
3862 let store = LpgStore::new();
3863
3864 store.create_property_index("age");
3865
3866 let n1 = store.create_node(&["Person"]);
3868 let n2 = store.create_node(&["Person"]);
3869 let n3 = store.create_node(&["Person"]);
3870 let n4 = store.create_node(&["Person"]);
3871
3872 store.set_node_property(n1, "age", Value::from(25i64));
3873 store.set_node_property(n2, "age", Value::from(25i64));
3874 store.set_node_property(n3, "age", Value::from(30i64));
3875 store.set_node_property(n4, "age", Value::from(25i64));
3876
3877 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3878 assert_eq!(age_25.len(), 3);
3879
3880 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3881 assert_eq!(age_30.len(), 1);
3882
3883 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3884 assert!(age_40.is_empty());
3885 }
3886
3887 #[test]
3888 fn test_property_index_builds_from_existing_data() {
3889 let store = LpgStore::new();
3890
3891 let n1 = store.create_node(&["Person"]);
3893 let n2 = store.create_node(&["Person"]);
3894 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3895 store.set_node_property(n2, "email", Value::from("bob@example.com"));
3896
3897 store.create_property_index("email");
3899
3900 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3902 assert_eq!(alice.len(), 1);
3903 assert!(alice.contains(&n1));
3904
3905 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3906 assert_eq!(bob.len(), 1);
3907 assert!(bob.contains(&n2));
3908 }
3909
3910 #[test]
3911 fn test_get_node_property_batch() {
3912 let store = LpgStore::new();
3913
3914 let n1 = store.create_node(&["Person"]);
3915 let n2 = store.create_node(&["Person"]);
3916 let n3 = store.create_node(&["Person"]);
3917
3918 store.set_node_property(n1, "age", Value::from(25i64));
3919 store.set_node_property(n2, "age", Value::from(30i64));
3920 let age_key = PropertyKey::new("age");
3923 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3924
3925 assert_eq!(values.len(), 3);
3926 assert_eq!(values[0], Some(Value::from(25i64)));
3927 assert_eq!(values[1], Some(Value::from(30i64)));
3928 assert_eq!(values[2], None);
3929 }
3930
3931 #[test]
3932 fn test_get_node_property_batch_empty() {
3933 let store = LpgStore::new();
3934 let key = PropertyKey::new("any");
3935
3936 let values = store.get_node_property_batch(&[], &key);
3937 assert!(values.is_empty());
3938 }
3939
3940 #[test]
3941 fn test_get_nodes_properties_batch() {
3942 let store = LpgStore::new();
3943
3944 let n1 = store.create_node(&["Person"]);
3945 let n2 = store.create_node(&["Person"]);
3946 let n3 = store.create_node(&["Person"]);
3947
3948 store.set_node_property(n1, "name", Value::from("Alice"));
3949 store.set_node_property(n1, "age", Value::from(25i64));
3950 store.set_node_property(n2, "name", Value::from("Bob"));
3951 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3954
3955 assert_eq!(all_props.len(), 3);
3956 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
3961 all_props[0].get(&PropertyKey::new("name")),
3962 Some(&Value::from("Alice"))
3963 );
3964 assert_eq!(
3965 all_props[1].get(&PropertyKey::new("name")),
3966 Some(&Value::from("Bob"))
3967 );
3968 }
3969
3970 #[test]
3971 fn test_get_nodes_properties_batch_empty() {
3972 let store = LpgStore::new();
3973
3974 let all_props = store.get_nodes_properties_batch(&[]);
3975 assert!(all_props.is_empty());
3976 }
3977
3978 #[test]
3979 fn test_get_nodes_properties_selective_batch() {
3980 let store = LpgStore::new();
3981
3982 let n1 = store.create_node(&["Person"]);
3983 let n2 = store.create_node(&["Person"]);
3984
3985 store.set_node_property(n1, "name", Value::from("Alice"));
3987 store.set_node_property(n1, "age", Value::from(25i64));
3988 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3989 store.set_node_property(n2, "name", Value::from("Bob"));
3990 store.set_node_property(n2, "age", Value::from(30i64));
3991 store.set_node_property(n2, "city", Value::from("NYC"));
3992
3993 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
3995 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
3996
3997 assert_eq!(props.len(), 2);
3998
3999 assert_eq!(props[0].len(), 2);
4001 assert_eq!(
4002 props[0].get(&PropertyKey::new("name")),
4003 Some(&Value::from("Alice"))
4004 );
4005 assert_eq!(
4006 props[0].get(&PropertyKey::new("age")),
4007 Some(&Value::from(25i64))
4008 );
4009 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4010
4011 assert_eq!(props[1].len(), 2);
4013 assert_eq!(
4014 props[1].get(&PropertyKey::new("name")),
4015 Some(&Value::from("Bob"))
4016 );
4017 assert_eq!(
4018 props[1].get(&PropertyKey::new("age")),
4019 Some(&Value::from(30i64))
4020 );
4021 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4022 }
4023
4024 #[test]
4025 fn test_get_nodes_properties_selective_batch_empty_keys() {
4026 let store = LpgStore::new();
4027
4028 let n1 = store.create_node(&["Person"]);
4029 store.set_node_property(n1, "name", Value::from("Alice"));
4030
4031 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4033
4034 assert_eq!(props.len(), 1);
4035 assert!(props[0].is_empty()); }
4037
4038 #[test]
4039 fn test_get_nodes_properties_selective_batch_missing_keys() {
4040 let store = LpgStore::new();
4041
4042 let n1 = store.create_node(&["Person"]);
4043 store.set_node_property(n1, "name", Value::from("Alice"));
4044
4045 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4047 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4048
4049 assert_eq!(props.len(), 1);
4050 assert_eq!(props[0].len(), 1); assert_eq!(
4052 props[0].get(&PropertyKey::new("name")),
4053 Some(&Value::from("Alice"))
4054 );
4055 }
4056}