1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35 match (a, b) {
36 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40 _ => None,
41 }
42}
43
44fn value_in_range(
46 value: &Value,
47 min: Option<&Value>,
48 max: Option<&Value>,
49 min_inclusive: bool,
50 max_inclusive: bool,
51) -> bool {
52 if let Some(min_val) = min {
54 match compare_values_for_range(value, min_val) {
55 Some(CmpOrdering::Less) => return false,
56 Some(CmpOrdering::Equal) if !min_inclusive => return false,
57 None => return false, _ => {}
59 }
60 }
61
62 if let Some(max_val) = max {
64 match compare_values_for_range(value, max_val) {
65 Some(CmpOrdering::Greater) => return false,
66 Some(CmpOrdering::Equal) if !max_inclusive => return false,
67 None => return false,
68 _ => {}
69 }
70 }
71
72 true
73}
74
75#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
83#[derive(Debug, Clone)]
89pub struct LpgStoreConfig {
90 pub backward_edges: bool,
93 pub initial_node_capacity: usize,
95 pub initial_edge_capacity: usize,
97}
98
99impl Default for LpgStoreConfig {
100 fn default() -> Self {
101 Self {
102 backward_edges: true,
103 initial_node_capacity: 1024,
104 initial_edge_capacity: 4096,
105 }
106 }
107}
108
109pub struct LpgStore {
168 #[allow(dead_code)]
170 config: LpgStoreConfig,
171
172 #[cfg(not(feature = "tiered-storage"))]
176 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178 #[cfg(not(feature = "tiered-storage"))]
182 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184 #[cfg(feature = "tiered-storage")]
198 arena_allocator: Arc<ArenaAllocator>,
199
200 #[cfg(feature = "tiered-storage")]
204 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206 #[cfg(feature = "tiered-storage")]
210 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212 #[cfg(feature = "tiered-storage")]
215 epoch_store: Arc<EpochStore>,
216
217 node_properties: PropertyStorage<NodeId>,
219
220 edge_properties: PropertyStorage<EdgeId>,
222
223 label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227 id_to_label: RwLock<Vec<ArcStr>>,
230
231 edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235 id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239 forward_adj: ChunkedAdjacency,
241
242 backward_adj: Option<ChunkedAdjacency>,
245
246 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255 property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262 #[cfg(feature = "vector-index")]
267 vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269 next_node_id: AtomicU64,
271
272 next_edge_id: AtomicU64,
274
275 current_epoch: AtomicU64,
277
278 statistics: RwLock<Statistics>,
281
282 needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287 #[must_use]
289 pub fn new() -> Self {
290 Self::with_config(LpgStoreConfig::default())
291 }
292
293 #[must_use]
295 pub fn with_config(config: LpgStoreConfig) -> Self {
296 let backward_adj = if config.backward_edges {
297 Some(ChunkedAdjacency::new())
298 } else {
299 None
300 };
301
302 Self {
303 #[cfg(not(feature = "tiered-storage"))]
304 nodes: RwLock::new(FxHashMap::default()),
305 #[cfg(not(feature = "tiered-storage"))]
306 edges: RwLock::new(FxHashMap::default()),
307 #[cfg(feature = "tiered-storage")]
308 arena_allocator: Arc::new(ArenaAllocator::new()),
309 #[cfg(feature = "tiered-storage")]
310 node_versions: RwLock::new(FxHashMap::default()),
311 #[cfg(feature = "tiered-storage")]
312 edge_versions: RwLock::new(FxHashMap::default()),
313 #[cfg(feature = "tiered-storage")]
314 epoch_store: Arc::new(EpochStore::new()),
315 node_properties: PropertyStorage::new(),
316 edge_properties: PropertyStorage::new(),
317 label_to_id: RwLock::new(FxHashMap::default()),
318 id_to_label: RwLock::new(Vec::new()),
319 edge_type_to_id: RwLock::new(FxHashMap::default()),
320 id_to_edge_type: RwLock::new(Vec::new()),
321 forward_adj: ChunkedAdjacency::new(),
322 backward_adj,
323 label_index: RwLock::new(Vec::new()),
324 node_labels: RwLock::new(FxHashMap::default()),
325 property_indexes: RwLock::new(FxHashMap::default()),
326 #[cfg(feature = "vector-index")]
327 vector_indexes: RwLock::new(FxHashMap::default()),
328 next_node_id: AtomicU64::new(0),
329 next_edge_id: AtomicU64::new(0),
330 current_epoch: AtomicU64::new(0),
331 statistics: RwLock::new(Statistics::new()),
332 needs_stats_recompute: AtomicBool::new(true),
333 config,
334 }
335 }
336
337 #[must_use]
339 pub fn current_epoch(&self) -> EpochId {
340 EpochId::new(self.current_epoch.load(Ordering::Acquire))
341 }
342
343 pub fn new_epoch(&self) -> EpochId {
345 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346 EpochId::new(id)
347 }
348
349 pub fn create_node(&self, labels: &[&str]) -> NodeId {
355 self.needs_stats_recompute.store(true, Ordering::Relaxed);
356 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357 }
358
359 #[cfg(not(feature = "tiered-storage"))]
361 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364 let mut record = NodeRecord::new(id, epoch);
365 record.set_label_count(labels.len() as u16);
366
367 let mut node_label_set = FxHashSet::default();
369 for label in labels {
370 let label_id = self.get_or_create_label_id(*label);
371 node_label_set.insert(label_id);
372
373 let mut index = self.label_index.write();
375 while index.len() <= label_id as usize {
376 index.push(FxHashMap::default());
377 }
378 index[label_id as usize].insert(id, ());
379 }
380
381 self.node_labels.write().insert(id, node_label_set);
383
384 let chain = VersionChain::with_initial(record, epoch, tx_id);
386 self.nodes.write().insert(id, chain);
387 id
388 }
389
390 #[cfg(feature = "tiered-storage")]
393 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396 let mut record = NodeRecord::new(id, epoch);
397 record.set_label_count(labels.len() as u16);
398
399 let mut node_label_set = FxHashSet::default();
401 for label in labels {
402 let label_id = self.get_or_create_label_id(*label);
403 node_label_set.insert(label_id);
404
405 let mut index = self.label_index.write();
407 while index.len() <= label_id as usize {
408 index.push(FxHashMap::default());
409 }
410 index[label_id as usize].insert(id, ());
411 }
412
413 self.node_labels.write().insert(id, node_label_set);
415
416 let arena = self.arena_allocator.arena_or_create(epoch);
418 let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423 let mut versions = self.node_versions.write();
425 if let Some(index) = versions.get_mut(&id) {
426 index.add_hot(hot_ref);
427 } else {
428 versions.insert(id, VersionIndex::with_initial(hot_ref));
429 }
430
431 id
432 }
433
434 pub fn create_node_with_props(
436 &self,
437 labels: &[&str],
438 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439 ) -> NodeId {
440 self.create_node_with_props_versioned(
441 labels,
442 properties,
443 self.current_epoch(),
444 TxId::SYSTEM,
445 )
446 }
447
448 #[cfg(not(feature = "tiered-storage"))]
450 pub fn create_node_with_props_versioned(
451 &self,
452 labels: &[&str],
453 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454 epoch: EpochId,
455 tx_id: TxId,
456 ) -> NodeId {
457 let id = self.create_node_versioned(labels, epoch, tx_id);
458
459 for (key, value) in properties {
460 let prop_key: PropertyKey = key.into();
461 let prop_value: Value = value.into();
462 self.update_property_index_on_set(id, &prop_key, &prop_value);
464 self.node_properties.set(id, prop_key, prop_value);
465 }
466
467 let count = self.node_properties.get_all(id).len() as u16;
469 if let Some(chain) = self.nodes.write().get_mut(&id)
470 && let Some(record) = chain.latest_mut()
471 {
472 record.props_count = count;
473 }
474
475 id
476 }
477
478 #[cfg(feature = "tiered-storage")]
481 pub fn create_node_with_props_versioned(
482 &self,
483 labels: &[&str],
484 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485 epoch: EpochId,
486 tx_id: TxId,
487 ) -> NodeId {
488 let id = self.create_node_versioned(labels, epoch, tx_id);
489
490 for (key, value) in properties {
491 let prop_key: PropertyKey = key.into();
492 let prop_value: Value = value.into();
493 self.update_property_index_on_set(id, &prop_key, &prop_value);
495 self.node_properties.set(id, prop_key, prop_value);
496 }
497
498 id
502 }
503
504 #[must_use]
506 pub fn get_node(&self, id: NodeId) -> Option<Node> {
507 self.get_node_at_epoch(id, self.current_epoch())
508 }
509
510 #[must_use]
512 #[cfg(not(feature = "tiered-storage"))]
513 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514 let nodes = self.nodes.read();
515 let chain = nodes.get(&id)?;
516 let record = chain.visible_at(epoch)?;
517
518 if record.is_deleted() {
519 return None;
520 }
521
522 let mut node = Node::new(id);
523
524 let id_to_label = self.id_to_label.read();
526 let node_labels = self.node_labels.read();
527 if let Some(label_ids) = node_labels.get(&id) {
528 for &label_id in label_ids {
529 if let Some(label) = id_to_label.get(label_id as usize) {
530 node.labels.push(label.clone());
531 }
532 }
533 }
534
535 node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538 Some(node)
539 }
540
541 #[must_use]
544 #[cfg(feature = "tiered-storage")]
545 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546 let versions = self.node_versions.read();
547 let index = versions.get(&id)?;
548 let version_ref = index.visible_at(epoch)?;
549
550 let record = self.read_node_record(&version_ref)?;
552
553 if record.is_deleted() {
554 return None;
555 }
556
557 let mut node = Node::new(id);
558
559 let id_to_label = self.id_to_label.read();
561 let node_labels = self.node_labels.read();
562 if let Some(label_ids) = node_labels.get(&id) {
563 for &label_id in label_ids {
564 if let Some(label) = id_to_label.get(label_id as usize) {
565 node.labels.push(label.clone());
566 }
567 }
568 }
569
570 node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573 Some(node)
574 }
575
576 #[must_use]
578 #[cfg(not(feature = "tiered-storage"))]
579 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580 let nodes = self.nodes.read();
581 let chain = nodes.get(&id)?;
582 let record = chain.visible_to(epoch, tx_id)?;
583
584 if record.is_deleted() {
585 return None;
586 }
587
588 let mut node = Node::new(id);
589
590 let id_to_label = self.id_to_label.read();
592 let node_labels = self.node_labels.read();
593 if let Some(label_ids) = node_labels.get(&id) {
594 for &label_id in label_ids {
595 if let Some(label) = id_to_label.get(label_id as usize) {
596 node.labels.push(label.clone());
597 }
598 }
599 }
600
601 node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604 Some(node)
605 }
606
607 #[must_use]
610 #[cfg(feature = "tiered-storage")]
611 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612 let versions = self.node_versions.read();
613 let index = versions.get(&id)?;
614 let version_ref = index.visible_to(epoch, tx_id)?;
615
616 let record = self.read_node_record(&version_ref)?;
618
619 if record.is_deleted() {
620 return None;
621 }
622
623 let mut node = Node::new(id);
624
625 let id_to_label = self.id_to_label.read();
627 let node_labels = self.node_labels.read();
628 if let Some(label_ids) = node_labels.get(&id) {
629 for &label_id in label_ids {
630 if let Some(label) = id_to_label.get(label_id as usize) {
631 node.labels.push(label.clone());
632 }
633 }
634 }
635
636 node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639 Some(node)
640 }
641
642 #[cfg(feature = "tiered-storage")]
644 #[allow(unsafe_code)]
645 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646 match version_ref {
647 VersionRef::Hot(hot_ref) => {
648 let arena = self.arena_allocator.arena(hot_ref.epoch);
649 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651 Some(*record)
652 }
653 VersionRef::Cold(cold_ref) => {
654 self.epoch_store
656 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657 }
658 }
659 }
660
661 pub fn delete_node(&self, id: NodeId) -> bool {
663 self.needs_stats_recompute.store(true, Ordering::Relaxed);
664 self.delete_node_at_epoch(id, self.current_epoch())
665 }
666
667 #[cfg(not(feature = "tiered-storage"))]
669 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670 let mut nodes = self.nodes.write();
671 if let Some(chain) = nodes.get_mut(&id) {
672 if let Some(record) = chain.visible_at(epoch) {
674 if record.is_deleted() {
675 return false;
676 }
677 } else {
678 return false;
680 }
681
682 chain.mark_deleted(epoch);
684
685 let mut index = self.label_index.write();
687 let mut node_labels = self.node_labels.write();
688 if let Some(label_ids) = node_labels.remove(&id) {
689 for label_id in label_ids {
690 if let Some(set) = index.get_mut(label_id as usize) {
691 set.remove(&id);
692 }
693 }
694 }
695
696 drop(nodes); drop(index);
699 drop(node_labels);
700 self.node_properties.remove_all(id);
701
702 true
705 } else {
706 false
707 }
708 }
709
710 #[cfg(feature = "tiered-storage")]
713 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714 let mut versions = self.node_versions.write();
715 if let Some(index) = versions.get_mut(&id) {
716 if let Some(version_ref) = index.visible_at(epoch) {
718 if let Some(record) = self.read_node_record(&version_ref) {
719 if record.is_deleted() {
720 return false;
721 }
722 } else {
723 return false;
724 }
725 } else {
726 return false;
727 }
728
729 index.mark_deleted(epoch);
731
732 let mut label_index = self.label_index.write();
734 let mut node_labels = self.node_labels.write();
735 if let Some(label_ids) = node_labels.remove(&id) {
736 for label_id in label_ids {
737 if let Some(set) = label_index.get_mut(label_id as usize) {
738 set.remove(&id);
739 }
740 }
741 }
742
743 drop(versions);
745 drop(label_index);
746 drop(node_labels);
747 self.node_properties.remove_all(id);
748
749 true
750 } else {
751 false
752 }
753 }
754
755 #[cfg(not(feature = "tiered-storage"))]
760 pub fn delete_node_edges(&self, node_id: NodeId) {
761 let outgoing: Vec<EdgeId> = self
763 .forward_adj
764 .edges_from(node_id)
765 .into_iter()
766 .map(|(_, edge_id)| edge_id)
767 .collect();
768
769 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771 backward
772 .edges_from(node_id)
773 .into_iter()
774 .map(|(_, edge_id)| edge_id)
775 .collect()
776 } else {
777 let epoch = self.current_epoch();
779 self.edges
780 .read()
781 .iter()
782 .filter_map(|(id, chain)| {
783 chain.visible_at(epoch).and_then(|r| {
784 if !r.is_deleted() && r.dst == node_id {
785 Some(*id)
786 } else {
787 None
788 }
789 })
790 })
791 .collect()
792 };
793
794 for edge_id in outgoing.into_iter().chain(incoming) {
796 self.delete_edge(edge_id);
797 }
798 }
799
800 #[cfg(feature = "tiered-storage")]
803 pub fn delete_node_edges(&self, node_id: NodeId) {
804 let outgoing: Vec<EdgeId> = self
806 .forward_adj
807 .edges_from(node_id)
808 .into_iter()
809 .map(|(_, edge_id)| edge_id)
810 .collect();
811
812 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814 backward
815 .edges_from(node_id)
816 .into_iter()
817 .map(|(_, edge_id)| edge_id)
818 .collect()
819 } else {
820 let epoch = self.current_epoch();
822 let versions = self.edge_versions.read();
823 versions
824 .iter()
825 .filter_map(|(id, index)| {
826 index.visible_at(epoch).and_then(|vref| {
827 self.read_edge_record(&vref).and_then(|r| {
828 if !r.is_deleted() && r.dst == node_id {
829 Some(*id)
830 } else {
831 None
832 }
833 })
834 })
835 })
836 .collect()
837 };
838
839 for edge_id in outgoing.into_iter().chain(incoming) {
841 self.delete_edge(edge_id);
842 }
843 }
844
845 #[cfg(not(feature = "tiered-storage"))]
847 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848 let prop_key: PropertyKey = key.into();
849
850 self.update_property_index_on_set(id, &prop_key, &value);
852
853 self.node_properties.set(id, prop_key, value);
854
855 let count = self.node_properties.get_all(id).len() as u16;
857 if let Some(chain) = self.nodes.write().get_mut(&id)
858 && let Some(record) = chain.latest_mut()
859 {
860 record.props_count = count;
861 }
862 }
863
864 #[cfg(feature = "tiered-storage")]
867 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868 let prop_key: PropertyKey = key.into();
869
870 self.update_property_index_on_set(id, &prop_key, &value);
872
873 self.node_properties.set(id, prop_key, value);
874 }
878
879 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881 self.edge_properties.set(id, key.into(), value);
882 }
883
884 #[cfg(not(feature = "tiered-storage"))]
888 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889 let prop_key: PropertyKey = key.into();
890
891 self.update_property_index_on_remove(id, &prop_key);
893
894 let result = self.node_properties.remove(id, &prop_key);
895
896 let count = self.node_properties.get_all(id).len() as u16;
898 if let Some(chain) = self.nodes.write().get_mut(&id)
899 && let Some(record) = chain.latest_mut()
900 {
901 record.props_count = count;
902 }
903
904 result
905 }
906
907 #[cfg(feature = "tiered-storage")]
910 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911 let prop_key: PropertyKey = key.into();
912
913 self.update_property_index_on_remove(id, &prop_key);
915
916 self.node_properties.remove(id, &prop_key)
917 }
919
920 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924 self.edge_properties.remove(id, &key.into())
925 }
926
927 #[must_use]
942 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943 self.node_properties.get(id, key)
944 }
945
946 #[must_use]
950 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951 self.edge_properties.get(id, key)
952 }
953
954 #[must_use]
977 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978 self.node_properties.get_batch(ids, key)
979 }
980
981 #[must_use]
986 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987 self.node_properties.get_all_batch(ids)
988 }
989
990 #[must_use]
1018 pub fn get_nodes_properties_selective_batch(
1019 &self,
1020 ids: &[NodeId],
1021 keys: &[PropertyKey],
1022 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023 self.node_properties.get_selective_batch(ids, keys)
1024 }
1025
1026 #[must_use]
1030 pub fn get_edges_properties_selective_batch(
1031 &self,
1032 ids: &[EdgeId],
1033 keys: &[PropertyKey],
1034 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035 self.edge_properties.get_selective_batch(ids, keys)
1036 }
1037
1038 #[must_use]
1074 pub fn find_nodes_in_range(
1075 &self,
1076 property: &str,
1077 min: Option<&Value>,
1078 max: Option<&Value>,
1079 min_inclusive: bool,
1080 max_inclusive: bool,
1081 ) -> Vec<NodeId> {
1082 let key = PropertyKey::new(property);
1083
1084 if !self
1086 .node_properties
1087 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088 {
1089 return Vec::new();
1090 }
1091
1092 self.node_ids()
1094 .into_iter()
1095 .filter(|&node_id| {
1096 self.node_properties
1097 .get(node_id, &key)
1098 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099 })
1100 .collect()
1101 }
1102
1103 #[must_use]
1128 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129 if conditions.is_empty() {
1130 return self.node_ids();
1131 }
1132
1133 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136 let indexes = self.property_indexes.read();
1137
1138 for (i, (prop, value)) in conditions.iter().enumerate() {
1139 let key = PropertyKey::new(*prop);
1140 let hv = HashableValue::new(value.clone());
1141
1142 if let Some(index) = indexes.get(&key) {
1143 let matches: Vec<NodeId> = index
1144 .get(&hv)
1145 .map(|nodes| nodes.iter().copied().collect())
1146 .unwrap_or_default();
1147
1148 if matches.is_empty() {
1150 return Vec::new();
1151 }
1152
1153 if best_start
1155 .as_ref()
1156 .is_none_or(|(_, best)| matches.len() < best.len())
1157 {
1158 best_start = Some((i, matches));
1159 }
1160 }
1161 }
1162 drop(indexes);
1163
1164 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166 let (prop, value) = &conditions[0];
1168 (0, self.find_nodes_by_property(prop, value))
1169 });
1170
1171 for (i, (prop, value)) in conditions.iter().enumerate() {
1173 if i == start_idx {
1174 continue;
1175 }
1176
1177 let key = PropertyKey::new(*prop);
1178 candidates.retain(|&node_id| {
1179 self.node_properties
1180 .get(node_id, &key)
1181 .is_some_and(|v| v == *value)
1182 });
1183
1184 if candidates.is_empty() {
1186 return Vec::new();
1187 }
1188 }
1189
1190 candidates
1191 }
1192
1193 pub fn create_property_index(&self, property: &str) {
1221 let key = PropertyKey::new(property);
1222
1223 let mut indexes = self.property_indexes.write();
1224 if indexes.contains_key(&key) {
1225 return; }
1227
1228 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231 for node_id in self.node_ids() {
1233 if let Some(value) = self.node_properties.get(node_id, &key) {
1234 let hv = HashableValue::new(value);
1235 index.entry(hv).or_default().insert(node_id);
1236 }
1237 }
1238
1239 indexes.insert(key, index);
1240 }
1241
1242 pub fn drop_property_index(&self, property: &str) -> bool {
1246 let key = PropertyKey::new(property);
1247 self.property_indexes.write().remove(&key).is_some()
1248 }
1249
1250 #[must_use]
1252 pub fn has_property_index(&self, property: &str) -> bool {
1253 let key = PropertyKey::new(property);
1254 self.property_indexes.read().contains_key(&key)
1255 }
1256
1257 #[cfg(feature = "vector-index")]
1259 pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1260 let key = format!("{label}:{property}");
1261 self.vector_indexes.write().insert(key, index);
1262 }
1263
1264 #[cfg(feature = "vector-index")]
1266 #[must_use]
1267 pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1268 let key = format!("{label}:{property}");
1269 self.vector_indexes.read().get(&key).cloned()
1270 }
1271
1272 #[cfg(feature = "vector-index")]
1276 pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1277 let key = format!("{label}:{property}");
1278 self.vector_indexes.write().remove(&key).is_some()
1279 }
1280
1281 #[cfg(feature = "vector-index")]
1285 #[must_use]
1286 pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1287 self.vector_indexes
1288 .read()
1289 .iter()
1290 .map(|(k, v)| (k.clone(), v.clone()))
1291 .collect()
1292 }
1293
1294 #[must_use]
1317 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1318 let key = PropertyKey::new(property);
1319 let hv = HashableValue::new(value.clone());
1320
1321 let indexes = self.property_indexes.read();
1323 if let Some(index) = indexes.get(&key) {
1324 if let Some(nodes) = index.get(&hv) {
1325 return nodes.iter().copied().collect();
1326 }
1327 return Vec::new();
1328 }
1329 drop(indexes);
1330
1331 self.node_ids()
1333 .into_iter()
1334 .filter(|&node_id| {
1335 self.node_properties
1336 .get(node_id, &key)
1337 .is_some_and(|v| v == *value)
1338 })
1339 .collect()
1340 }
1341
1342 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1344 let indexes = self.property_indexes.read();
1345 if let Some(index) = indexes.get(key) {
1346 if let Some(old_value) = self.node_properties.get(node_id, key) {
1348 let old_hv = HashableValue::new(old_value);
1349 if let Some(mut nodes) = index.get_mut(&old_hv) {
1350 nodes.remove(&node_id);
1351 if nodes.is_empty() {
1352 drop(nodes);
1353 index.remove(&old_hv);
1354 }
1355 }
1356 }
1357
1358 let new_hv = HashableValue::new(new_value.clone());
1360 index
1361 .entry(new_hv)
1362 .or_insert_with(FxHashSet::default)
1363 .insert(node_id);
1364 }
1365 }
1366
1367 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1369 let indexes = self.property_indexes.read();
1370 if let Some(index) = indexes.get(key) {
1371 if let Some(old_value) = self.node_properties.get(node_id, key) {
1373 let old_hv = HashableValue::new(old_value);
1374 if let Some(mut nodes) = index.get_mut(&old_hv) {
1375 nodes.remove(&node_id);
1376 if nodes.is_empty() {
1377 drop(nodes);
1378 index.remove(&old_hv);
1379 }
1380 }
1381 }
1382 }
1383 }
1384
1385 #[cfg(not(feature = "tiered-storage"))]
1390 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1391 let epoch = self.current_epoch();
1392
1393 let nodes = self.nodes.read();
1395 if let Some(chain) = nodes.get(&node_id) {
1396 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1397 return false;
1398 }
1399 } else {
1400 return false;
1401 }
1402 drop(nodes);
1403
1404 let label_id = self.get_or_create_label_id(label);
1406
1407 let mut node_labels = self.node_labels.write();
1409 let label_set = node_labels.entry(node_id).or_default();
1410
1411 if label_set.contains(&label_id) {
1412 return false; }
1414
1415 label_set.insert(label_id);
1416 drop(node_labels);
1417
1418 let mut index = self.label_index.write();
1420 if (label_id as usize) >= index.len() {
1421 index.resize(label_id as usize + 1, FxHashMap::default());
1422 }
1423 index[label_id as usize].insert(node_id, ());
1424
1425 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1427 && let Some(record) = chain.latest_mut()
1428 {
1429 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1430 record.set_label_count(count as u16);
1431 }
1432
1433 true
1434 }
1435
1436 #[cfg(feature = "tiered-storage")]
1439 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1440 let epoch = self.current_epoch();
1441
1442 let versions = self.node_versions.read();
1444 if let Some(index) = versions.get(&node_id) {
1445 if let Some(vref) = index.visible_at(epoch) {
1446 if let Some(record) = self.read_node_record(&vref) {
1447 if record.is_deleted() {
1448 return false;
1449 }
1450 } else {
1451 return false;
1452 }
1453 } else {
1454 return false;
1455 }
1456 } else {
1457 return false;
1458 }
1459 drop(versions);
1460
1461 let label_id = self.get_or_create_label_id(label);
1463
1464 let mut node_labels = self.node_labels.write();
1466 let label_set = node_labels.entry(node_id).or_default();
1467
1468 if label_set.contains(&label_id) {
1469 return false; }
1471
1472 label_set.insert(label_id);
1473 drop(node_labels);
1474
1475 let mut index = self.label_index.write();
1477 if (label_id as usize) >= index.len() {
1478 index.resize(label_id as usize + 1, FxHashMap::default());
1479 }
1480 index[label_id as usize].insert(node_id, ());
1481
1482 true
1486 }
1487
1488 #[cfg(not(feature = "tiered-storage"))]
1493 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1494 let epoch = self.current_epoch();
1495
1496 let nodes = self.nodes.read();
1498 if let Some(chain) = nodes.get(&node_id) {
1499 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1500 return false;
1501 }
1502 } else {
1503 return false;
1504 }
1505 drop(nodes);
1506
1507 let label_id = {
1509 let label_ids = self.label_to_id.read();
1510 match label_ids.get(label) {
1511 Some(&id) => id,
1512 None => return false, }
1514 };
1515
1516 let mut node_labels = self.node_labels.write();
1518 if let Some(label_set) = node_labels.get_mut(&node_id) {
1519 if !label_set.remove(&label_id) {
1520 return false; }
1522 } else {
1523 return false;
1524 }
1525 drop(node_labels);
1526
1527 let mut index = self.label_index.write();
1529 if (label_id as usize) < index.len() {
1530 index[label_id as usize].remove(&node_id);
1531 }
1532
1533 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1535 && let Some(record) = chain.latest_mut()
1536 {
1537 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1538 record.set_label_count(count as u16);
1539 }
1540
1541 true
1542 }
1543
1544 #[cfg(feature = "tiered-storage")]
1547 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1548 let epoch = self.current_epoch();
1549
1550 let versions = self.node_versions.read();
1552 if let Some(index) = versions.get(&node_id) {
1553 if let Some(vref) = index.visible_at(epoch) {
1554 if let Some(record) = self.read_node_record(&vref) {
1555 if record.is_deleted() {
1556 return false;
1557 }
1558 } else {
1559 return false;
1560 }
1561 } else {
1562 return false;
1563 }
1564 } else {
1565 return false;
1566 }
1567 drop(versions);
1568
1569 let label_id = {
1571 let label_ids = self.label_to_id.read();
1572 match label_ids.get(label) {
1573 Some(&id) => id,
1574 None => return false, }
1576 };
1577
1578 let mut node_labels = self.node_labels.write();
1580 if let Some(label_set) = node_labels.get_mut(&node_id) {
1581 if !label_set.remove(&label_id) {
1582 return false; }
1584 } else {
1585 return false;
1586 }
1587 drop(node_labels);
1588
1589 let mut index = self.label_index.write();
1591 if (label_id as usize) < index.len() {
1592 index[label_id as usize].remove(&node_id);
1593 }
1594
1595 true
1598 }
1599
1600 #[must_use]
1602 #[cfg(not(feature = "tiered-storage"))]
1603 pub fn node_count(&self) -> usize {
1604 let epoch = self.current_epoch();
1605 self.nodes
1606 .read()
1607 .values()
1608 .filter_map(|chain| chain.visible_at(epoch))
1609 .filter(|r| !r.is_deleted())
1610 .count()
1611 }
1612
1613 #[must_use]
1616 #[cfg(feature = "tiered-storage")]
1617 pub fn node_count(&self) -> usize {
1618 let epoch = self.current_epoch();
1619 let versions = self.node_versions.read();
1620 versions
1621 .iter()
1622 .filter(|(_, index)| {
1623 index.visible_at(epoch).map_or(false, |vref| {
1624 self.read_node_record(&vref)
1625 .map_or(false, |r| !r.is_deleted())
1626 })
1627 })
1628 .count()
1629 }
1630
1631 #[must_use]
1637 #[cfg(not(feature = "tiered-storage"))]
1638 pub fn node_ids(&self) -> Vec<NodeId> {
1639 let epoch = self.current_epoch();
1640 let mut ids: Vec<NodeId> = self
1641 .nodes
1642 .read()
1643 .iter()
1644 .filter_map(|(id, chain)| {
1645 chain
1646 .visible_at(epoch)
1647 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1648 })
1649 .collect();
1650 ids.sort_unstable();
1651 ids
1652 }
1653
1654 #[must_use]
1657 #[cfg(feature = "tiered-storage")]
1658 pub fn node_ids(&self) -> Vec<NodeId> {
1659 let epoch = self.current_epoch();
1660 let versions = self.node_versions.read();
1661 let mut ids: Vec<NodeId> = versions
1662 .iter()
1663 .filter_map(|(id, index)| {
1664 index.visible_at(epoch).and_then(|vref| {
1665 self.read_node_record(&vref)
1666 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1667 })
1668 })
1669 .collect();
1670 ids.sort_unstable();
1671 ids
1672 }
1673
1674 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1678 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1679 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1680 }
1681
1682 #[cfg(not(feature = "tiered-storage"))]
1684 pub fn create_edge_versioned(
1685 &self,
1686 src: NodeId,
1687 dst: NodeId,
1688 edge_type: &str,
1689 epoch: EpochId,
1690 tx_id: TxId,
1691 ) -> EdgeId {
1692 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1693 let type_id = self.get_or_create_edge_type_id(edge_type);
1694
1695 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1696 let chain = VersionChain::with_initial(record, epoch, tx_id);
1697 self.edges.write().insert(id, chain);
1698
1699 self.forward_adj.add_edge(src, dst, id);
1701 if let Some(ref backward) = self.backward_adj {
1702 backward.add_edge(dst, src, id);
1703 }
1704
1705 id
1706 }
1707
1708 #[cfg(feature = "tiered-storage")]
1711 pub fn create_edge_versioned(
1712 &self,
1713 src: NodeId,
1714 dst: NodeId,
1715 edge_type: &str,
1716 epoch: EpochId,
1717 tx_id: TxId,
1718 ) -> EdgeId {
1719 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1720 let type_id = self.get_or_create_edge_type_id(edge_type);
1721
1722 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1723
1724 let arena = self.arena_allocator.arena_or_create(epoch);
1726 let (offset, _stored) = arena.alloc_value_with_offset(record);
1727
1728 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1730
1731 let mut versions = self.edge_versions.write();
1733 if let Some(index) = versions.get_mut(&id) {
1734 index.add_hot(hot_ref);
1735 } else {
1736 versions.insert(id, VersionIndex::with_initial(hot_ref));
1737 }
1738
1739 self.forward_adj.add_edge(src, dst, id);
1741 if let Some(ref backward) = self.backward_adj {
1742 backward.add_edge(dst, src, id);
1743 }
1744
1745 id
1746 }
1747
1748 pub fn create_edge_with_props(
1750 &self,
1751 src: NodeId,
1752 dst: NodeId,
1753 edge_type: &str,
1754 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1755 ) -> EdgeId {
1756 let id = self.create_edge(src, dst, edge_type);
1757
1758 for (key, value) in properties {
1759 self.edge_properties.set(id, key.into(), value.into());
1760 }
1761
1762 id
1763 }
1764
1765 #[must_use]
1767 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1768 self.get_edge_at_epoch(id, self.current_epoch())
1769 }
1770
1771 #[must_use]
1773 #[cfg(not(feature = "tiered-storage"))]
1774 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1775 let edges = self.edges.read();
1776 let chain = edges.get(&id)?;
1777 let record = chain.visible_at(epoch)?;
1778
1779 if record.is_deleted() {
1780 return None;
1781 }
1782
1783 let edge_type = {
1784 let id_to_type = self.id_to_edge_type.read();
1785 id_to_type.get(record.type_id as usize)?.clone()
1786 };
1787
1788 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1789
1790 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1792
1793 Some(edge)
1794 }
1795
1796 #[must_use]
1799 #[cfg(feature = "tiered-storage")]
1800 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1801 let versions = self.edge_versions.read();
1802 let index = versions.get(&id)?;
1803 let version_ref = index.visible_at(epoch)?;
1804
1805 let record = self.read_edge_record(&version_ref)?;
1806
1807 if record.is_deleted() {
1808 return None;
1809 }
1810
1811 let edge_type = {
1812 let id_to_type = self.id_to_edge_type.read();
1813 id_to_type.get(record.type_id as usize)?.clone()
1814 };
1815
1816 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1817
1818 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1820
1821 Some(edge)
1822 }
1823
1824 #[must_use]
1826 #[cfg(not(feature = "tiered-storage"))]
1827 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1828 let edges = self.edges.read();
1829 let chain = edges.get(&id)?;
1830 let record = chain.visible_to(epoch, tx_id)?;
1831
1832 if record.is_deleted() {
1833 return None;
1834 }
1835
1836 let edge_type = {
1837 let id_to_type = self.id_to_edge_type.read();
1838 id_to_type.get(record.type_id as usize)?.clone()
1839 };
1840
1841 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1842
1843 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1845
1846 Some(edge)
1847 }
1848
1849 #[must_use]
1852 #[cfg(feature = "tiered-storage")]
1853 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1854 let versions = self.edge_versions.read();
1855 let index = versions.get(&id)?;
1856 let version_ref = index.visible_to(epoch, tx_id)?;
1857
1858 let record = self.read_edge_record(&version_ref)?;
1859
1860 if record.is_deleted() {
1861 return None;
1862 }
1863
1864 let edge_type = {
1865 let id_to_type = self.id_to_edge_type.read();
1866 id_to_type.get(record.type_id as usize)?.clone()
1867 };
1868
1869 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1870
1871 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1873
1874 Some(edge)
1875 }
1876
1877 #[cfg(feature = "tiered-storage")]
1879 #[allow(unsafe_code)]
1880 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1881 match version_ref {
1882 VersionRef::Hot(hot_ref) => {
1883 let arena = self.arena_allocator.arena(hot_ref.epoch);
1884 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1886 Some(*record)
1887 }
1888 VersionRef::Cold(cold_ref) => {
1889 self.epoch_store
1891 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1892 }
1893 }
1894 }
1895
1896 pub fn delete_edge(&self, id: EdgeId) -> bool {
1898 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1899 self.delete_edge_at_epoch(id, self.current_epoch())
1900 }
1901
1902 #[cfg(not(feature = "tiered-storage"))]
1904 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1905 let mut edges = self.edges.write();
1906 if let Some(chain) = edges.get_mut(&id) {
1907 let (src, dst) = {
1909 match chain.visible_at(epoch) {
1910 Some(record) => {
1911 if record.is_deleted() {
1912 return false;
1913 }
1914 (record.src, record.dst)
1915 }
1916 None => return false, }
1918 };
1919
1920 chain.mark_deleted(epoch);
1922
1923 drop(edges); self.forward_adj.mark_deleted(src, id);
1927 if let Some(ref backward) = self.backward_adj {
1928 backward.mark_deleted(dst, id);
1929 }
1930
1931 self.edge_properties.remove_all(id);
1933
1934 true
1935 } else {
1936 false
1937 }
1938 }
1939
1940 #[cfg(feature = "tiered-storage")]
1943 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1944 let mut versions = self.edge_versions.write();
1945 if let Some(index) = versions.get_mut(&id) {
1946 let (src, dst) = {
1948 match index.visible_at(epoch) {
1949 Some(version_ref) => {
1950 if let Some(record) = self.read_edge_record(&version_ref) {
1951 if record.is_deleted() {
1952 return false;
1953 }
1954 (record.src, record.dst)
1955 } else {
1956 return false;
1957 }
1958 }
1959 None => return false,
1960 }
1961 };
1962
1963 index.mark_deleted(epoch);
1965
1966 drop(versions); self.forward_adj.mark_deleted(src, id);
1970 if let Some(ref backward) = self.backward_adj {
1971 backward.mark_deleted(dst, id);
1972 }
1973
1974 self.edge_properties.remove_all(id);
1976
1977 true
1978 } else {
1979 false
1980 }
1981 }
1982
1983 #[must_use]
1985 #[cfg(not(feature = "tiered-storage"))]
1986 pub fn edge_count(&self) -> usize {
1987 let epoch = self.current_epoch();
1988 self.edges
1989 .read()
1990 .values()
1991 .filter_map(|chain| chain.visible_at(epoch))
1992 .filter(|r| !r.is_deleted())
1993 .count()
1994 }
1995
1996 #[must_use]
1999 #[cfg(feature = "tiered-storage")]
2000 pub fn edge_count(&self) -> usize {
2001 let epoch = self.current_epoch();
2002 let versions = self.edge_versions.read();
2003 versions
2004 .iter()
2005 .filter(|(_, index)| {
2006 index.visible_at(epoch).map_or(false, |vref| {
2007 self.read_edge_record(&vref)
2008 .map_or(false, |r| !r.is_deleted())
2009 })
2010 })
2011 .count()
2012 }
2013
2014 #[cfg(not(feature = "tiered-storage"))]
2019 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2020 {
2022 let mut nodes = self.nodes.write();
2023 for chain in nodes.values_mut() {
2024 chain.remove_versions_by(tx_id);
2025 }
2026 nodes.retain(|_, chain| !chain.is_empty());
2028 }
2029
2030 {
2032 let mut edges = self.edges.write();
2033 for chain in edges.values_mut() {
2034 chain.remove_versions_by(tx_id);
2035 }
2036 edges.retain(|_, chain| !chain.is_empty());
2038 }
2039 }
2040
2041 #[cfg(feature = "tiered-storage")]
2044 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2045 {
2047 let mut versions = self.node_versions.write();
2048 for index in versions.values_mut() {
2049 index.remove_versions_by(tx_id);
2050 }
2051 versions.retain(|_, index| !index.is_empty());
2053 }
2054
2055 {
2057 let mut versions = self.edge_versions.write();
2058 for index in versions.values_mut() {
2059 index.remove_versions_by(tx_id);
2060 }
2061 versions.retain(|_, index| !index.is_empty());
2063 }
2064 }
2065
2066 #[cfg(feature = "tiered-storage")]
2085 #[allow(unsafe_code)]
2086 pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2087 let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2089 let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2090
2091 {
2092 let versions = self.node_versions.read();
2093 for (node_id, index) in versions.iter() {
2094 for hot_ref in index.hot_refs_for_epoch(epoch) {
2095 let arena = self.arena_allocator.arena(hot_ref.epoch);
2096 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2098 node_records.push((node_id.as_u64(), *record));
2099 node_hot_refs.push((*node_id, *hot_ref));
2100 }
2101 }
2102 }
2103
2104 let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2106 let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2107
2108 {
2109 let versions = self.edge_versions.read();
2110 for (edge_id, index) in versions.iter() {
2111 for hot_ref in index.hot_refs_for_epoch(epoch) {
2112 let arena = self.arena_allocator.arena(hot_ref.epoch);
2113 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2115 edge_records.push((edge_id.as_u64(), *record));
2116 edge_hot_refs.push((*edge_id, *hot_ref));
2117 }
2118 }
2119 }
2120
2121 let total_frozen = node_records.len() + edge_records.len();
2122
2123 if total_frozen == 0 {
2124 return 0;
2125 }
2126
2127 let (node_entries, edge_entries) =
2129 self.epoch_store
2130 .freeze_epoch(epoch, node_records, edge_records);
2131
2132 let node_entry_map: FxHashMap<u64, _> = node_entries
2134 .iter()
2135 .map(|e| (e.entity_id, (e.offset, e.length)))
2136 .collect();
2137 let edge_entry_map: FxHashMap<u64, _> = edge_entries
2138 .iter()
2139 .map(|e| (e.entity_id, (e.offset, e.length)))
2140 .collect();
2141
2142 {
2144 let mut versions = self.node_versions.write();
2145 for (node_id, hot_ref) in &node_hot_refs {
2146 if let Some(index) = versions.get_mut(node_id)
2147 && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2148 {
2149 let cold_ref = ColdVersionRef {
2150 epoch,
2151 block_offset: offset,
2152 length,
2153 created_by: hot_ref.created_by,
2154 deleted_epoch: hot_ref.deleted_epoch,
2155 };
2156 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2157 }
2158 }
2159 }
2160
2161 {
2162 let mut versions = self.edge_versions.write();
2163 for (edge_id, hot_ref) in &edge_hot_refs {
2164 if let Some(index) = versions.get_mut(edge_id)
2165 && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2166 {
2167 let cold_ref = ColdVersionRef {
2168 epoch,
2169 block_offset: offset,
2170 length,
2171 created_by: hot_ref.created_by,
2172 deleted_epoch: hot_ref.deleted_epoch,
2173 };
2174 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2175 }
2176 }
2177 }
2178
2179 total_frozen
2180 }
2181
2182 #[cfg(feature = "tiered-storage")]
2184 #[must_use]
2185 pub fn epoch_store(&self) -> &EpochStore {
2186 &self.epoch_store
2187 }
2188
2189 #[must_use]
2191 pub fn label_count(&self) -> usize {
2192 self.id_to_label.read().len()
2193 }
2194
2195 #[must_use]
2199 pub fn property_key_count(&self) -> usize {
2200 let node_keys = self.node_properties.column_count();
2201 let edge_keys = self.edge_properties.column_count();
2202 node_keys + edge_keys
2206 }
2207
2208 #[must_use]
2210 pub fn edge_type_count(&self) -> usize {
2211 self.id_to_edge_type.read().len()
2212 }
2213
2214 pub fn neighbors(
2221 &self,
2222 node: NodeId,
2223 direction: Direction,
2224 ) -> impl Iterator<Item = NodeId> + '_ {
2225 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2226 Direction::Outgoing | Direction::Both => {
2227 Box::new(self.forward_adj.neighbors(node).into_iter())
2228 }
2229 Direction::Incoming => Box::new(std::iter::empty()),
2230 };
2231
2232 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2233 Direction::Incoming | Direction::Both => {
2234 if let Some(ref adj) = self.backward_adj {
2235 Box::new(adj.neighbors(node).into_iter())
2236 } else {
2237 Box::new(std::iter::empty())
2238 }
2239 }
2240 Direction::Outgoing => Box::new(std::iter::empty()),
2241 };
2242
2243 forward.chain(backward)
2244 }
2245
2246 pub fn edges_from(
2250 &self,
2251 node: NodeId,
2252 direction: Direction,
2253 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2254 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2255 Direction::Outgoing | Direction::Both => {
2256 Box::new(self.forward_adj.edges_from(node).into_iter())
2257 }
2258 Direction::Incoming => Box::new(std::iter::empty()),
2259 };
2260
2261 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2262 Direction::Incoming | Direction::Both => {
2263 if let Some(ref adj) = self.backward_adj {
2264 Box::new(adj.edges_from(node).into_iter())
2265 } else {
2266 Box::new(std::iter::empty())
2267 }
2268 }
2269 Direction::Outgoing => Box::new(std::iter::empty()),
2270 };
2271
2272 forward.chain(backward)
2273 }
2274
2275 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2288 if let Some(ref backward) = self.backward_adj {
2289 backward.edges_from(node)
2290 } else {
2291 self.all_edges()
2293 .filter_map(|edge| {
2294 if edge.dst == node {
2295 Some((edge.src, edge.id))
2296 } else {
2297 None
2298 }
2299 })
2300 .collect()
2301 }
2302 }
2303
2304 #[must_use]
2308 pub fn out_degree(&self, node: NodeId) -> usize {
2309 self.forward_adj.out_degree(node)
2310 }
2311
2312 #[must_use]
2317 pub fn in_degree(&self, node: NodeId) -> usize {
2318 if let Some(ref backward) = self.backward_adj {
2319 backward.in_degree(node)
2320 } else {
2321 self.all_edges().filter(|edge| edge.dst == node).count()
2323 }
2324 }
2325
2326 #[must_use]
2328 #[cfg(not(feature = "tiered-storage"))]
2329 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2330 let edges = self.edges.read();
2331 let chain = edges.get(&id)?;
2332 let epoch = self.current_epoch();
2333 let record = chain.visible_at(epoch)?;
2334 let id_to_type = self.id_to_edge_type.read();
2335 id_to_type.get(record.type_id as usize).cloned()
2336 }
2337
2338 #[must_use]
2341 #[cfg(feature = "tiered-storage")]
2342 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2343 let versions = self.edge_versions.read();
2344 let index = versions.get(&id)?;
2345 let epoch = self.current_epoch();
2346 let vref = index.visible_at(epoch)?;
2347 let record = self.read_edge_record(&vref)?;
2348 let id_to_type = self.id_to_edge_type.read();
2349 id_to_type.get(record.type_id as usize).cloned()
2350 }
2351
2352 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2358 let label_to_id = self.label_to_id.read();
2359 if let Some(&label_id) = label_to_id.get(label) {
2360 let index = self.label_index.read();
2361 if let Some(set) = index.get(label_id as usize) {
2362 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2363 ids.sort_unstable();
2364 return ids;
2365 }
2366 }
2367 Vec::new()
2368 }
2369
2370 #[cfg(not(feature = "tiered-storage"))]
2377 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2378 let epoch = self.current_epoch();
2379 let node_ids: Vec<NodeId> = self
2380 .nodes
2381 .read()
2382 .iter()
2383 .filter_map(|(id, chain)| {
2384 chain
2385 .visible_at(epoch)
2386 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2387 })
2388 .collect();
2389
2390 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2391 }
2392
2393 #[cfg(feature = "tiered-storage")]
2396 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2397 let node_ids = self.node_ids();
2398 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2399 }
2400
2401 #[cfg(not(feature = "tiered-storage"))]
2406 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2407 let epoch = self.current_epoch();
2408 let edge_ids: Vec<EdgeId> = self
2409 .edges
2410 .read()
2411 .iter()
2412 .filter_map(|(id, chain)| {
2413 chain
2414 .visible_at(epoch)
2415 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2416 })
2417 .collect();
2418
2419 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2420 }
2421
2422 #[cfg(feature = "tiered-storage")]
2425 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2426 let epoch = self.current_epoch();
2427 let versions = self.edge_versions.read();
2428 let edge_ids: Vec<EdgeId> = versions
2429 .iter()
2430 .filter_map(|(id, index)| {
2431 index.visible_at(epoch).and_then(|vref| {
2432 self.read_edge_record(&vref)
2433 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2434 })
2435 })
2436 .collect();
2437
2438 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2439 }
2440
2441 pub fn all_labels(&self) -> Vec<String> {
2443 self.id_to_label
2444 .read()
2445 .iter()
2446 .map(|s| s.to_string())
2447 .collect()
2448 }
2449
2450 pub fn all_edge_types(&self) -> Vec<String> {
2452 self.id_to_edge_type
2453 .read()
2454 .iter()
2455 .map(|s| s.to_string())
2456 .collect()
2457 }
2458
2459 pub fn all_property_keys(&self) -> Vec<String> {
2461 let mut keys = std::collections::HashSet::new();
2462 for key in self.node_properties.keys() {
2463 keys.insert(key.to_string());
2464 }
2465 for key in self.edge_properties.keys() {
2466 keys.insert(key.to_string());
2467 }
2468 keys.into_iter().collect()
2469 }
2470
2471 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2473 let node_ids = self.nodes_by_label(label);
2474 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2475 }
2476
2477 #[cfg(not(feature = "tiered-storage"))]
2479 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2480 let epoch = self.current_epoch();
2481 let type_to_id = self.edge_type_to_id.read();
2482
2483 if let Some(&type_id) = type_to_id.get(edge_type) {
2484 let edge_ids: Vec<EdgeId> = self
2485 .edges
2486 .read()
2487 .iter()
2488 .filter_map(|(id, chain)| {
2489 chain.visible_at(epoch).and_then(|r| {
2490 if !r.is_deleted() && r.type_id == type_id {
2491 Some(*id)
2492 } else {
2493 None
2494 }
2495 })
2496 })
2497 .collect();
2498
2499 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2501 as Box<dyn Iterator<Item = Edge> + 'a>
2502 } else {
2503 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2505 }
2506 }
2507
2508 #[cfg(feature = "tiered-storage")]
2511 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2512 let epoch = self.current_epoch();
2513 let type_to_id = self.edge_type_to_id.read();
2514
2515 if let Some(&type_id) = type_to_id.get(edge_type) {
2516 let versions = self.edge_versions.read();
2517 let edge_ids: Vec<EdgeId> = versions
2518 .iter()
2519 .filter_map(|(id, index)| {
2520 index.visible_at(epoch).and_then(|vref| {
2521 self.read_edge_record(&vref).and_then(|r| {
2522 if !r.is_deleted() && r.type_id == type_id {
2523 Some(*id)
2524 } else {
2525 None
2526 }
2527 })
2528 })
2529 })
2530 .collect();
2531
2532 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2533 as Box<dyn Iterator<Item = Edge> + 'a>
2534 } else {
2535 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2536 }
2537 }
2538
2539 #[must_use]
2546 pub fn node_property_might_match(
2547 &self,
2548 property: &PropertyKey,
2549 op: CompareOp,
2550 value: &Value,
2551 ) -> bool {
2552 self.node_properties.might_match(property, op, value)
2553 }
2554
2555 #[must_use]
2557 pub fn edge_property_might_match(
2558 &self,
2559 property: &PropertyKey,
2560 op: CompareOp,
2561 value: &Value,
2562 ) -> bool {
2563 self.edge_properties.might_match(property, op, value)
2564 }
2565
2566 #[must_use]
2568 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2569 self.node_properties.zone_map(property)
2570 }
2571
2572 #[must_use]
2574 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2575 self.edge_properties.zone_map(property)
2576 }
2577
2578 pub fn rebuild_zone_maps(&self) {
2580 self.node_properties.rebuild_zone_maps();
2581 self.edge_properties.rebuild_zone_maps();
2582 }
2583
2584 #[must_use]
2588 pub fn statistics(&self) -> Statistics {
2589 self.statistics.read().clone()
2590 }
2591
2592 pub fn ensure_statistics_fresh(&self) {
2597 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2598 self.compute_statistics();
2599 }
2600 }
2601
2602 #[cfg(not(feature = "tiered-storage"))]
2607 pub fn compute_statistics(&self) {
2608 let mut stats = Statistics::new();
2609
2610 stats.total_nodes = self.node_count() as u64;
2612 stats.total_edges = self.edge_count() as u64;
2613
2614 let id_to_label = self.id_to_label.read();
2616 let label_index = self.label_index.read();
2617
2618 for (label_id, label_name) in id_to_label.iter().enumerate() {
2619 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2620
2621 if node_count > 0 {
2622 let avg_out_degree = if stats.total_nodes > 0 {
2624 stats.total_edges as f64 / stats.total_nodes as f64
2625 } else {
2626 0.0
2627 };
2628
2629 let label_stats =
2630 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2631
2632 stats.update_label(label_name.as_ref(), label_stats);
2633 }
2634 }
2635
2636 let id_to_edge_type = self.id_to_edge_type.read();
2638 let edges = self.edges.read();
2639 let epoch = self.current_epoch();
2640
2641 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2642 for chain in edges.values() {
2643 if let Some(record) = chain.visible_at(epoch)
2644 && !record.is_deleted()
2645 {
2646 *edge_type_counts.entry(record.type_id).or_default() += 1;
2647 }
2648 }
2649
2650 for (type_id, count) in edge_type_counts {
2651 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2652 let avg_degree = if stats.total_nodes > 0 {
2653 count as f64 / stats.total_nodes as f64
2654 } else {
2655 0.0
2656 };
2657
2658 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2659 stats.update_edge_type(type_name.as_ref(), edge_stats);
2660 }
2661 }
2662
2663 *self.statistics.write() = stats;
2664 }
2665
2666 #[cfg(feature = "tiered-storage")]
2669 pub fn compute_statistics(&self) {
2670 let mut stats = Statistics::new();
2671
2672 stats.total_nodes = self.node_count() as u64;
2674 stats.total_edges = self.edge_count() as u64;
2675
2676 let id_to_label = self.id_to_label.read();
2678 let label_index = self.label_index.read();
2679
2680 for (label_id, label_name) in id_to_label.iter().enumerate() {
2681 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2682
2683 if node_count > 0 {
2684 let avg_out_degree = if stats.total_nodes > 0 {
2685 stats.total_edges as f64 / stats.total_nodes as f64
2686 } else {
2687 0.0
2688 };
2689
2690 let label_stats =
2691 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2692
2693 stats.update_label(label_name.as_ref(), label_stats);
2694 }
2695 }
2696
2697 let id_to_edge_type = self.id_to_edge_type.read();
2699 let versions = self.edge_versions.read();
2700 let epoch = self.current_epoch();
2701
2702 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2703 for index in versions.values() {
2704 if let Some(vref) = index.visible_at(epoch)
2705 && let Some(record) = self.read_edge_record(&vref)
2706 && !record.is_deleted()
2707 {
2708 *edge_type_counts.entry(record.type_id).or_default() += 1;
2709 }
2710 }
2711
2712 for (type_id, count) in edge_type_counts {
2713 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2714 let avg_degree = if stats.total_nodes > 0 {
2715 count as f64 / stats.total_nodes as f64
2716 } else {
2717 0.0
2718 };
2719
2720 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2721 stats.update_edge_type(type_name.as_ref(), edge_stats);
2722 }
2723 }
2724
2725 *self.statistics.write() = stats;
2726 }
2727
2728 #[must_use]
2730 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2731 self.statistics.read().estimate_label_cardinality(label)
2732 }
2733
2734 #[must_use]
2736 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2737 self.statistics
2738 .read()
2739 .estimate_avg_degree(edge_type, outgoing)
2740 }
2741
2742 fn get_or_create_label_id(&self, label: &str) -> u32 {
2745 {
2746 let label_to_id = self.label_to_id.read();
2747 if let Some(&id) = label_to_id.get(label) {
2748 return id;
2749 }
2750 }
2751
2752 let mut label_to_id = self.label_to_id.write();
2753 let mut id_to_label = self.id_to_label.write();
2754
2755 if let Some(&id) = label_to_id.get(label) {
2757 return id;
2758 }
2759
2760 let id = id_to_label.len() as u32;
2761
2762 let label: ArcStr = label.into();
2763 label_to_id.insert(label.clone(), id);
2764 id_to_label.push(label);
2765
2766 id
2767 }
2768
2769 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2770 {
2771 let type_to_id = self.edge_type_to_id.read();
2772 if let Some(&id) = type_to_id.get(edge_type) {
2773 return id;
2774 }
2775 }
2776
2777 let mut type_to_id = self.edge_type_to_id.write();
2778 let mut id_to_type = self.id_to_edge_type.write();
2779
2780 if let Some(&id) = type_to_id.get(edge_type) {
2782 return id;
2783 }
2784
2785 let id = id_to_type.len() as u32;
2786 let edge_type: ArcStr = edge_type.into();
2787 type_to_id.insert(edge_type.clone(), id);
2788 id_to_type.push(edge_type);
2789
2790 id
2791 }
2792
2793 #[cfg(not(feature = "tiered-storage"))]
2800 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2801 let epoch = self.current_epoch();
2802 let mut record = NodeRecord::new(id, epoch);
2803 record.set_label_count(labels.len() as u16);
2804
2805 let mut node_label_set = FxHashSet::default();
2807 for label in labels {
2808 let label_id = self.get_or_create_label_id(*label);
2809 node_label_set.insert(label_id);
2810
2811 let mut index = self.label_index.write();
2813 while index.len() <= label_id as usize {
2814 index.push(FxHashMap::default());
2815 }
2816 index[label_id as usize].insert(id, ());
2817 }
2818
2819 self.node_labels.write().insert(id, node_label_set);
2821
2822 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2824 self.nodes.write().insert(id, chain);
2825
2826 let id_val = id.as_u64();
2828 let _ = self
2829 .next_node_id
2830 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2831 if id_val >= current {
2832 Some(id_val + 1)
2833 } else {
2834 None
2835 }
2836 });
2837 }
2838
2839 #[cfg(feature = "tiered-storage")]
2842 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2843 let epoch = self.current_epoch();
2844 let mut record = NodeRecord::new(id, epoch);
2845 record.set_label_count(labels.len() as u16);
2846
2847 let mut node_label_set = FxHashSet::default();
2849 for label in labels {
2850 let label_id = self.get_or_create_label_id(*label);
2851 node_label_set.insert(label_id);
2852
2853 let mut index = self.label_index.write();
2855 while index.len() <= label_id as usize {
2856 index.push(FxHashMap::default());
2857 }
2858 index[label_id as usize].insert(id, ());
2859 }
2860
2861 self.node_labels.write().insert(id, node_label_set);
2863
2864 let arena = self.arena_allocator.arena_or_create(epoch);
2866 let (offset, _stored) = arena.alloc_value_with_offset(record);
2867
2868 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2870 let mut versions = self.node_versions.write();
2871 versions.insert(id, VersionIndex::with_initial(hot_ref));
2872
2873 let id_val = id.as_u64();
2875 let _ = self
2876 .next_node_id
2877 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2878 if id_val >= current {
2879 Some(id_val + 1)
2880 } else {
2881 None
2882 }
2883 });
2884 }
2885
2886 #[cfg(not(feature = "tiered-storage"))]
2890 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2891 let epoch = self.current_epoch();
2892 let type_id = self.get_or_create_edge_type_id(edge_type);
2893
2894 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2895 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2896 self.edges.write().insert(id, chain);
2897
2898 self.forward_adj.add_edge(src, dst, id);
2900 if let Some(ref backward) = self.backward_adj {
2901 backward.add_edge(dst, src, id);
2902 }
2903
2904 let id_val = id.as_u64();
2906 let _ = self
2907 .next_edge_id
2908 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2909 if id_val >= current {
2910 Some(id_val + 1)
2911 } else {
2912 None
2913 }
2914 });
2915 }
2916
2917 #[cfg(feature = "tiered-storage")]
2920 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2921 let epoch = self.current_epoch();
2922 let type_id = self.get_or_create_edge_type_id(edge_type);
2923
2924 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2925
2926 let arena = self.arena_allocator.arena_or_create(epoch);
2928 let (offset, _stored) = arena.alloc_value_with_offset(record);
2929
2930 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2932 let mut versions = self.edge_versions.write();
2933 versions.insert(id, VersionIndex::with_initial(hot_ref));
2934
2935 self.forward_adj.add_edge(src, dst, id);
2937 if let Some(ref backward) = self.backward_adj {
2938 backward.add_edge(dst, src, id);
2939 }
2940
2941 let id_val = id.as_u64();
2943 let _ = self
2944 .next_edge_id
2945 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2946 if id_val >= current {
2947 Some(id_val + 1)
2948 } else {
2949 None
2950 }
2951 });
2952 }
2953
2954 pub fn set_epoch(&self, epoch: EpochId) {
2956 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2957 }
2958}
2959
2960impl Default for LpgStore {
2961 fn default() -> Self {
2962 Self::new()
2963 }
2964}
2965
2966#[cfg(test)]
2967mod tests {
2968 use super::*;
2969
2970 #[test]
2971 fn test_create_node() {
2972 let store = LpgStore::new();
2973
2974 let id = store.create_node(&["Person"]);
2975 assert!(id.is_valid());
2976
2977 let node = store.get_node(id).unwrap();
2978 assert!(node.has_label("Person"));
2979 assert!(!node.has_label("Animal"));
2980 }
2981
2982 #[test]
2983 fn test_create_node_with_props() {
2984 let store = LpgStore::new();
2985
2986 let id = store.create_node_with_props(
2987 &["Person"],
2988 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2989 );
2990
2991 let node = store.get_node(id).unwrap();
2992 assert_eq!(
2993 node.get_property("name").and_then(|v| v.as_str()),
2994 Some("Alice")
2995 );
2996 assert_eq!(
2997 node.get_property("age").and_then(|v| v.as_int64()),
2998 Some(30)
2999 );
3000 }
3001
3002 #[test]
3003 fn test_delete_node() {
3004 let store = LpgStore::new();
3005
3006 let id = store.create_node(&["Person"]);
3007 assert_eq!(store.node_count(), 1);
3008
3009 assert!(store.delete_node(id));
3010 assert_eq!(store.node_count(), 0);
3011 assert!(store.get_node(id).is_none());
3012
3013 assert!(!store.delete_node(id));
3015 }
3016
3017 #[test]
3018 fn test_create_edge() {
3019 let store = LpgStore::new();
3020
3021 let alice = store.create_node(&["Person"]);
3022 let bob = store.create_node(&["Person"]);
3023
3024 let edge_id = store.create_edge(alice, bob, "KNOWS");
3025 assert!(edge_id.is_valid());
3026
3027 let edge = store.get_edge(edge_id).unwrap();
3028 assert_eq!(edge.src, alice);
3029 assert_eq!(edge.dst, bob);
3030 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3031 }
3032
3033 #[test]
3034 fn test_neighbors() {
3035 let store = LpgStore::new();
3036
3037 let a = store.create_node(&["Person"]);
3038 let b = store.create_node(&["Person"]);
3039 let c = store.create_node(&["Person"]);
3040
3041 store.create_edge(a, b, "KNOWS");
3042 store.create_edge(a, c, "KNOWS");
3043
3044 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3045 assert_eq!(outgoing.len(), 2);
3046 assert!(outgoing.contains(&b));
3047 assert!(outgoing.contains(&c));
3048
3049 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3050 assert_eq!(incoming.len(), 1);
3051 assert!(incoming.contains(&a));
3052 }
3053
3054 #[test]
3055 fn test_nodes_by_label() {
3056 let store = LpgStore::new();
3057
3058 let p1 = store.create_node(&["Person"]);
3059 let p2 = store.create_node(&["Person"]);
3060 let _a = store.create_node(&["Animal"]);
3061
3062 let persons = store.nodes_by_label("Person");
3063 assert_eq!(persons.len(), 2);
3064 assert!(persons.contains(&p1));
3065 assert!(persons.contains(&p2));
3066
3067 let animals = store.nodes_by_label("Animal");
3068 assert_eq!(animals.len(), 1);
3069 }
3070
3071 #[test]
3072 fn test_delete_edge() {
3073 let store = LpgStore::new();
3074
3075 let a = store.create_node(&["Person"]);
3076 let b = store.create_node(&["Person"]);
3077 let edge_id = store.create_edge(a, b, "KNOWS");
3078
3079 assert_eq!(store.edge_count(), 1);
3080
3081 assert!(store.delete_edge(edge_id));
3082 assert_eq!(store.edge_count(), 0);
3083 assert!(store.get_edge(edge_id).is_none());
3084 }
3085
3086 #[test]
3089 fn test_lpg_store_config() {
3090 let config = LpgStoreConfig {
3092 backward_edges: false,
3093 initial_node_capacity: 100,
3094 initial_edge_capacity: 200,
3095 };
3096 let store = LpgStore::with_config(config);
3097
3098 let a = store.create_node(&["Person"]);
3100 let b = store.create_node(&["Person"]);
3101 store.create_edge(a, b, "KNOWS");
3102
3103 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3105 assert_eq!(outgoing.len(), 1);
3106
3107 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3109 assert_eq!(incoming.len(), 0);
3110 }
3111
3112 #[test]
3113 fn test_epoch_management() {
3114 let store = LpgStore::new();
3115
3116 let epoch0 = store.current_epoch();
3117 assert_eq!(epoch0.as_u64(), 0);
3118
3119 let epoch1 = store.new_epoch();
3120 assert_eq!(epoch1.as_u64(), 1);
3121
3122 let current = store.current_epoch();
3123 assert_eq!(current.as_u64(), 1);
3124 }
3125
3126 #[test]
3127 fn test_node_properties() {
3128 let store = LpgStore::new();
3129 let id = store.create_node(&["Person"]);
3130
3131 store.set_node_property(id, "name", Value::from("Alice"));
3133 let name = store.get_node_property(id, &"name".into());
3134 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3135
3136 store.set_node_property(id, "name", Value::from("Bob"));
3138 let name = store.get_node_property(id, &"name".into());
3139 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3140
3141 let old = store.remove_node_property(id, "name");
3143 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3144
3145 let name = store.get_node_property(id, &"name".into());
3147 assert!(name.is_none());
3148
3149 let none = store.remove_node_property(id, "nonexistent");
3151 assert!(none.is_none());
3152 }
3153
3154 #[test]
3155 fn test_edge_properties() {
3156 let store = LpgStore::new();
3157 let a = store.create_node(&["Person"]);
3158 let b = store.create_node(&["Person"]);
3159 let edge_id = store.create_edge(a, b, "KNOWS");
3160
3161 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3163 let since = store.get_edge_property(edge_id, &"since".into());
3164 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3165
3166 let old = store.remove_edge_property(edge_id, "since");
3168 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3169
3170 let since = store.get_edge_property(edge_id, &"since".into());
3171 assert!(since.is_none());
3172 }
3173
3174 #[test]
3175 fn test_add_remove_label() {
3176 let store = LpgStore::new();
3177 let id = store.create_node(&["Person"]);
3178
3179 assert!(store.add_label(id, "Employee"));
3181
3182 let node = store.get_node(id).unwrap();
3183 assert!(node.has_label("Person"));
3184 assert!(node.has_label("Employee"));
3185
3186 assert!(!store.add_label(id, "Employee"));
3188
3189 assert!(store.remove_label(id, "Employee"));
3191
3192 let node = store.get_node(id).unwrap();
3193 assert!(node.has_label("Person"));
3194 assert!(!node.has_label("Employee"));
3195
3196 assert!(!store.remove_label(id, "Employee"));
3198 assert!(!store.remove_label(id, "NonExistent"));
3199 }
3200
3201 #[test]
3202 fn test_add_label_to_nonexistent_node() {
3203 let store = LpgStore::new();
3204 let fake_id = NodeId::new(999);
3205 assert!(!store.add_label(fake_id, "Label"));
3206 }
3207
3208 #[test]
3209 fn test_remove_label_from_nonexistent_node() {
3210 let store = LpgStore::new();
3211 let fake_id = NodeId::new(999);
3212 assert!(!store.remove_label(fake_id, "Label"));
3213 }
3214
3215 #[test]
3216 fn test_node_ids() {
3217 let store = LpgStore::new();
3218
3219 let n1 = store.create_node(&["Person"]);
3220 let n2 = store.create_node(&["Person"]);
3221 let n3 = store.create_node(&["Person"]);
3222
3223 let ids = store.node_ids();
3224 assert_eq!(ids.len(), 3);
3225 assert!(ids.contains(&n1));
3226 assert!(ids.contains(&n2));
3227 assert!(ids.contains(&n3));
3228
3229 store.delete_node(n2);
3231 let ids = store.node_ids();
3232 assert_eq!(ids.len(), 2);
3233 assert!(!ids.contains(&n2));
3234 }
3235
3236 #[test]
3237 fn test_delete_node_nonexistent() {
3238 let store = LpgStore::new();
3239 let fake_id = NodeId::new(999);
3240 assert!(!store.delete_node(fake_id));
3241 }
3242
3243 #[test]
3244 fn test_delete_edge_nonexistent() {
3245 let store = LpgStore::new();
3246 let fake_id = EdgeId::new(999);
3247 assert!(!store.delete_edge(fake_id));
3248 }
3249
3250 #[test]
3251 fn test_delete_edge_double() {
3252 let store = LpgStore::new();
3253 let a = store.create_node(&["Person"]);
3254 let b = store.create_node(&["Person"]);
3255 let edge_id = store.create_edge(a, b, "KNOWS");
3256
3257 assert!(store.delete_edge(edge_id));
3258 assert!(!store.delete_edge(edge_id)); }
3260
3261 #[test]
3262 fn test_create_edge_with_props() {
3263 let store = LpgStore::new();
3264 let a = store.create_node(&["Person"]);
3265 let b = store.create_node(&["Person"]);
3266
3267 let edge_id = store.create_edge_with_props(
3268 a,
3269 b,
3270 "KNOWS",
3271 [
3272 ("since", Value::from(2020i64)),
3273 ("weight", Value::from(1.0)),
3274 ],
3275 );
3276
3277 let edge = store.get_edge(edge_id).unwrap();
3278 assert_eq!(
3279 edge.get_property("since").and_then(|v| v.as_int64()),
3280 Some(2020)
3281 );
3282 assert_eq!(
3283 edge.get_property("weight").and_then(|v| v.as_float64()),
3284 Some(1.0)
3285 );
3286 }
3287
3288 #[test]
3289 fn test_delete_node_edges() {
3290 let store = LpgStore::new();
3291
3292 let a = store.create_node(&["Person"]);
3293 let b = store.create_node(&["Person"]);
3294 let c = store.create_node(&["Person"]);
3295
3296 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3300
3301 store.delete_node_edges(a);
3303
3304 assert_eq!(store.edge_count(), 0);
3305 }
3306
3307 #[test]
3308 fn test_neighbors_both_directions() {
3309 let store = LpgStore::new();
3310
3311 let a = store.create_node(&["Person"]);
3312 let b = store.create_node(&["Person"]);
3313 let c = store.create_node(&["Person"]);
3314
3315 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3320 assert_eq!(neighbors.len(), 2);
3321 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3324
3325 #[test]
3326 fn test_edges_from() {
3327 let store = LpgStore::new();
3328
3329 let a = store.create_node(&["Person"]);
3330 let b = store.create_node(&["Person"]);
3331 let c = store.create_node(&["Person"]);
3332
3333 let e1 = store.create_edge(a, b, "KNOWS");
3334 let e2 = store.create_edge(a, c, "KNOWS");
3335
3336 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3337 assert_eq!(edges.len(), 2);
3338 assert!(edges.iter().any(|(_, e)| *e == e1));
3339 assert!(edges.iter().any(|(_, e)| *e == e2));
3340
3341 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3343 assert_eq!(incoming.len(), 1);
3344 assert_eq!(incoming[0].1, e1);
3345 }
3346
3347 #[test]
3348 fn test_edges_to() {
3349 let store = LpgStore::new();
3350
3351 let a = store.create_node(&["Person"]);
3352 let b = store.create_node(&["Person"]);
3353 let c = store.create_node(&["Person"]);
3354
3355 let e1 = store.create_edge(a, b, "KNOWS");
3356 let e2 = store.create_edge(c, b, "KNOWS");
3357
3358 let to_b = store.edges_to(b);
3360 assert_eq!(to_b.len(), 2);
3361 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3362 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3363 }
3364
3365 #[test]
3366 fn test_out_degree_in_degree() {
3367 let store = LpgStore::new();
3368
3369 let a = store.create_node(&["Person"]);
3370 let b = store.create_node(&["Person"]);
3371 let c = store.create_node(&["Person"]);
3372
3373 store.create_edge(a, b, "KNOWS");
3374 store.create_edge(a, c, "KNOWS");
3375 store.create_edge(c, b, "KNOWS");
3376
3377 assert_eq!(store.out_degree(a), 2);
3378 assert_eq!(store.out_degree(b), 0);
3379 assert_eq!(store.out_degree(c), 1);
3380
3381 assert_eq!(store.in_degree(a), 0);
3382 assert_eq!(store.in_degree(b), 2);
3383 assert_eq!(store.in_degree(c), 1);
3384 }
3385
3386 #[test]
3387 fn test_edge_type() {
3388 let store = LpgStore::new();
3389
3390 let a = store.create_node(&["Person"]);
3391 let b = store.create_node(&["Person"]);
3392 let edge_id = store.create_edge(a, b, "KNOWS");
3393
3394 let edge_type = store.edge_type(edge_id);
3395 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3396
3397 let fake_id = EdgeId::new(999);
3399 assert!(store.edge_type(fake_id).is_none());
3400 }
3401
3402 #[test]
3403 fn test_count_methods() {
3404 let store = LpgStore::new();
3405
3406 assert_eq!(store.label_count(), 0);
3407 assert_eq!(store.edge_type_count(), 0);
3408 assert_eq!(store.property_key_count(), 0);
3409
3410 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3411 let b = store.create_node(&["Company"]);
3412 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3413
3414 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3418
3419 #[test]
3420 fn test_all_nodes_and_edges() {
3421 let store = LpgStore::new();
3422
3423 let a = store.create_node(&["Person"]);
3424 let b = store.create_node(&["Person"]);
3425 store.create_edge(a, b, "KNOWS");
3426
3427 let nodes: Vec<_> = store.all_nodes().collect();
3428 assert_eq!(nodes.len(), 2);
3429
3430 let edges: Vec<_> = store.all_edges().collect();
3431 assert_eq!(edges.len(), 1);
3432 }
3433
3434 #[test]
3435 fn test_all_labels_and_edge_types() {
3436 let store = LpgStore::new();
3437
3438 store.create_node(&["Person"]);
3439 store.create_node(&["Company"]);
3440 let a = store.create_node(&["Animal"]);
3441 let b = store.create_node(&["Animal"]);
3442 store.create_edge(a, b, "EATS");
3443
3444 let labels = store.all_labels();
3445 assert_eq!(labels.len(), 3);
3446 assert!(labels.contains(&"Person".to_string()));
3447 assert!(labels.contains(&"Company".to_string()));
3448 assert!(labels.contains(&"Animal".to_string()));
3449
3450 let edge_types = store.all_edge_types();
3451 assert_eq!(edge_types.len(), 1);
3452 assert!(edge_types.contains(&"EATS".to_string()));
3453 }
3454
3455 #[test]
3456 fn test_all_property_keys() {
3457 let store = LpgStore::new();
3458
3459 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3460 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3461 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3462
3463 let keys = store.all_property_keys();
3464 assert!(keys.contains(&"name".to_string()));
3465 assert!(keys.contains(&"age".to_string()));
3466 assert!(keys.contains(&"since".to_string()));
3467 }
3468
3469 #[test]
3470 fn test_nodes_with_label() {
3471 let store = LpgStore::new();
3472
3473 store.create_node(&["Person"]);
3474 store.create_node(&["Person"]);
3475 store.create_node(&["Company"]);
3476
3477 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3478 assert_eq!(persons.len(), 2);
3479
3480 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3481 assert_eq!(companies.len(), 1);
3482
3483 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3484 assert_eq!(none.len(), 0);
3485 }
3486
3487 #[test]
3488 fn test_edges_with_type() {
3489 let store = LpgStore::new();
3490
3491 let a = store.create_node(&["Person"]);
3492 let b = store.create_node(&["Person"]);
3493 let c = store.create_node(&["Company"]);
3494
3495 store.create_edge(a, b, "KNOWS");
3496 store.create_edge(a, c, "WORKS_AT");
3497
3498 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3499 assert_eq!(knows.len(), 1);
3500
3501 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3502 assert_eq!(works_at.len(), 1);
3503
3504 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3505 assert_eq!(none.len(), 0);
3506 }
3507
3508 #[test]
3509 fn test_nodes_by_label_nonexistent() {
3510 let store = LpgStore::new();
3511 store.create_node(&["Person"]);
3512
3513 let empty = store.nodes_by_label("NonExistent");
3514 assert!(empty.is_empty());
3515 }
3516
3517 #[test]
3518 fn test_statistics() {
3519 let store = LpgStore::new();
3520
3521 let a = store.create_node(&["Person"]);
3522 let b = store.create_node(&["Person"]);
3523 let c = store.create_node(&["Company"]);
3524
3525 store.create_edge(a, b, "KNOWS");
3526 store.create_edge(a, c, "WORKS_AT");
3527
3528 store.compute_statistics();
3529 let stats = store.statistics();
3530
3531 assert_eq!(stats.total_nodes, 3);
3532 assert_eq!(stats.total_edges, 2);
3533
3534 let person_card = store.estimate_label_cardinality("Person");
3536 assert!(person_card > 0.0);
3537
3538 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3539 assert!(avg_degree >= 0.0);
3540 }
3541
3542 #[test]
3543 fn test_zone_maps() {
3544 let store = LpgStore::new();
3545
3546 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3547 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3548
3549 let might_match =
3551 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3552 assert!(might_match);
3554
3555 let zone = store.node_property_zone_map(&"age".into());
3556 assert!(zone.is_some());
3557
3558 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3560 assert!(no_zone.is_none());
3561
3562 let a = store.create_node(&["A"]);
3564 let b = store.create_node(&["B"]);
3565 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3566
3567 let edge_zone = store.edge_property_zone_map(&"weight".into());
3568 assert!(edge_zone.is_some());
3569 }
3570
3571 #[test]
3572 fn test_rebuild_zone_maps() {
3573 let store = LpgStore::new();
3574 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3575
3576 store.rebuild_zone_maps();
3578 }
3579
3580 #[test]
3581 fn test_create_node_with_id() {
3582 let store = LpgStore::new();
3583
3584 let specific_id = NodeId::new(100);
3585 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3586
3587 let node = store.get_node(specific_id).unwrap();
3588 assert!(node.has_label("Person"));
3589 assert!(node.has_label("Employee"));
3590
3591 let next = store.create_node(&["Other"]);
3593 assert!(next.as_u64() > 100);
3594 }
3595
3596 #[test]
3597 fn test_create_edge_with_id() {
3598 let store = LpgStore::new();
3599
3600 let a = store.create_node(&["A"]);
3601 let b = store.create_node(&["B"]);
3602
3603 let specific_id = EdgeId::new(500);
3604 store.create_edge_with_id(specific_id, a, b, "REL");
3605
3606 let edge = store.get_edge(specific_id).unwrap();
3607 assert_eq!(edge.src, a);
3608 assert_eq!(edge.dst, b);
3609 assert_eq!(edge.edge_type.as_str(), "REL");
3610
3611 let next = store.create_edge(a, b, "OTHER");
3613 assert!(next.as_u64() > 500);
3614 }
3615
3616 #[test]
3617 fn test_set_epoch() {
3618 let store = LpgStore::new();
3619
3620 assert_eq!(store.current_epoch().as_u64(), 0);
3621
3622 store.set_epoch(EpochId::new(42));
3623 assert_eq!(store.current_epoch().as_u64(), 42);
3624 }
3625
3626 #[test]
3627 fn test_get_node_nonexistent() {
3628 let store = LpgStore::new();
3629 let fake_id = NodeId::new(999);
3630 assert!(store.get_node(fake_id).is_none());
3631 }
3632
3633 #[test]
3634 fn test_get_edge_nonexistent() {
3635 let store = LpgStore::new();
3636 let fake_id = EdgeId::new(999);
3637 assert!(store.get_edge(fake_id).is_none());
3638 }
3639
3640 #[test]
3641 fn test_multiple_labels() {
3642 let store = LpgStore::new();
3643
3644 let id = store.create_node(&["Person", "Employee", "Manager"]);
3645 let node = store.get_node(id).unwrap();
3646
3647 assert!(node.has_label("Person"));
3648 assert!(node.has_label("Employee"));
3649 assert!(node.has_label("Manager"));
3650 assert!(!node.has_label("Other"));
3651 }
3652
3653 #[test]
3654 fn test_default_impl() {
3655 let store: LpgStore = Default::default();
3656 assert_eq!(store.node_count(), 0);
3657 assert_eq!(store.edge_count(), 0);
3658 }
3659
3660 #[test]
3661 fn test_edges_from_both_directions() {
3662 let store = LpgStore::new();
3663
3664 let a = store.create_node(&["A"]);
3665 let b = store.create_node(&["B"]);
3666 let c = store.create_node(&["C"]);
3667
3668 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3673 assert_eq!(edges.len(), 2);
3674 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3677
3678 #[test]
3679 fn test_no_backward_adj_in_degree() {
3680 let config = LpgStoreConfig {
3681 backward_edges: false,
3682 initial_node_capacity: 10,
3683 initial_edge_capacity: 10,
3684 };
3685 let store = LpgStore::with_config(config);
3686
3687 let a = store.create_node(&["A"]);
3688 let b = store.create_node(&["B"]);
3689 store.create_edge(a, b, "R");
3690
3691 let degree = store.in_degree(b);
3693 assert_eq!(degree, 1);
3694 }
3695
3696 #[test]
3697 fn test_no_backward_adj_edges_to() {
3698 let config = LpgStoreConfig {
3699 backward_edges: false,
3700 initial_node_capacity: 10,
3701 initial_edge_capacity: 10,
3702 };
3703 let store = LpgStore::with_config(config);
3704
3705 let a = store.create_node(&["A"]);
3706 let b = store.create_node(&["B"]);
3707 let e = store.create_edge(a, b, "R");
3708
3709 let edges = store.edges_to(b);
3711 assert_eq!(edges.len(), 1);
3712 assert_eq!(edges[0].1, e);
3713 }
3714
3715 #[test]
3716 fn test_node_versioned_creation() {
3717 let store = LpgStore::new();
3718
3719 let epoch = store.new_epoch();
3720 let tx_id = TxId::new(1);
3721
3722 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3723 assert!(store.get_node(id).is_some());
3724 }
3725
3726 #[test]
3727 fn test_edge_versioned_creation() {
3728 let store = LpgStore::new();
3729
3730 let a = store.create_node(&["A"]);
3731 let b = store.create_node(&["B"]);
3732
3733 let epoch = store.new_epoch();
3734 let tx_id = TxId::new(1);
3735
3736 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3737 assert!(store.get_edge(edge_id).is_some());
3738 }
3739
3740 #[test]
3741 fn test_node_with_props_versioned() {
3742 let store = LpgStore::new();
3743
3744 let epoch = store.new_epoch();
3745 let tx_id = TxId::new(1);
3746
3747 let id = store.create_node_with_props_versioned(
3748 &["Person"],
3749 [("name", Value::from("Alice"))],
3750 epoch,
3751 tx_id,
3752 );
3753
3754 let node = store.get_node(id).unwrap();
3755 assert_eq!(
3756 node.get_property("name").and_then(|v| v.as_str()),
3757 Some("Alice")
3758 );
3759 }
3760
3761 #[test]
3762 fn test_discard_uncommitted_versions() {
3763 let store = LpgStore::new();
3764
3765 let epoch = store.new_epoch();
3766 let tx_id = TxId::new(42);
3767
3768 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3770 assert!(store.get_node(node_id).is_some());
3771
3772 store.discard_uncommitted_versions(tx_id);
3774
3775 assert!(store.get_node(node_id).is_none());
3777 }
3778
3779 #[test]
3782 fn test_property_index_create_and_lookup() {
3783 let store = LpgStore::new();
3784
3785 let alice = store.create_node(&["Person"]);
3787 let bob = store.create_node(&["Person"]);
3788 let charlie = store.create_node(&["Person"]);
3789
3790 store.set_node_property(alice, "city", Value::from("NYC"));
3791 store.set_node_property(bob, "city", Value::from("NYC"));
3792 store.set_node_property(charlie, "city", Value::from("LA"));
3793
3794 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3796 assert_eq!(nyc_people.len(), 2);
3797
3798 store.create_property_index("city");
3800 assert!(store.has_property_index("city"));
3801
3802 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3804 assert_eq!(nyc_people.len(), 2);
3805 assert!(nyc_people.contains(&alice));
3806 assert!(nyc_people.contains(&bob));
3807
3808 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3809 assert_eq!(la_people.len(), 1);
3810 assert!(la_people.contains(&charlie));
3811 }
3812
3813 #[test]
3814 fn test_property_index_maintained_on_update() {
3815 let store = LpgStore::new();
3816
3817 store.create_property_index("status");
3819
3820 let node = store.create_node(&["Task"]);
3821 store.set_node_property(node, "status", Value::from("pending"));
3822
3823 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3825 assert_eq!(pending.len(), 1);
3826 assert!(pending.contains(&node));
3827
3828 store.set_node_property(node, "status", Value::from("done"));
3830
3831 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3833 assert!(pending.is_empty());
3834
3835 let done = store.find_nodes_by_property("status", &Value::from("done"));
3837 assert_eq!(done.len(), 1);
3838 assert!(done.contains(&node));
3839 }
3840
3841 #[test]
3842 fn test_property_index_maintained_on_remove() {
3843 let store = LpgStore::new();
3844
3845 store.create_property_index("tag");
3846
3847 let node = store.create_node(&["Item"]);
3848 store.set_node_property(node, "tag", Value::from("important"));
3849
3850 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3852 assert_eq!(found.len(), 1);
3853
3854 store.remove_node_property(node, "tag");
3856
3857 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3859 assert!(found.is_empty());
3860 }
3861
3862 #[test]
3863 fn test_property_index_drop() {
3864 let store = LpgStore::new();
3865
3866 store.create_property_index("key");
3867 assert!(store.has_property_index("key"));
3868
3869 assert!(store.drop_property_index("key"));
3870 assert!(!store.has_property_index("key"));
3871
3872 assert!(!store.drop_property_index("key"));
3874 }
3875
3876 #[test]
3877 fn test_property_index_multiple_values() {
3878 let store = LpgStore::new();
3879
3880 store.create_property_index("age");
3881
3882 let n1 = store.create_node(&["Person"]);
3884 let n2 = store.create_node(&["Person"]);
3885 let n3 = store.create_node(&["Person"]);
3886 let n4 = store.create_node(&["Person"]);
3887
3888 store.set_node_property(n1, "age", Value::from(25i64));
3889 store.set_node_property(n2, "age", Value::from(25i64));
3890 store.set_node_property(n3, "age", Value::from(30i64));
3891 store.set_node_property(n4, "age", Value::from(25i64));
3892
3893 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3894 assert_eq!(age_25.len(), 3);
3895
3896 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3897 assert_eq!(age_30.len(), 1);
3898
3899 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3900 assert!(age_40.is_empty());
3901 }
3902
3903 #[test]
3904 fn test_property_index_builds_from_existing_data() {
3905 let store = LpgStore::new();
3906
3907 let n1 = store.create_node(&["Person"]);
3909 let n2 = store.create_node(&["Person"]);
3910 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3911 store.set_node_property(n2, "email", Value::from("bob@example.com"));
3912
3913 store.create_property_index("email");
3915
3916 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3918 assert_eq!(alice.len(), 1);
3919 assert!(alice.contains(&n1));
3920
3921 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3922 assert_eq!(bob.len(), 1);
3923 assert!(bob.contains(&n2));
3924 }
3925
3926 #[test]
3927 fn test_get_node_property_batch() {
3928 let store = LpgStore::new();
3929
3930 let n1 = store.create_node(&["Person"]);
3931 let n2 = store.create_node(&["Person"]);
3932 let n3 = store.create_node(&["Person"]);
3933
3934 store.set_node_property(n1, "age", Value::from(25i64));
3935 store.set_node_property(n2, "age", Value::from(30i64));
3936 let age_key = PropertyKey::new("age");
3939 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3940
3941 assert_eq!(values.len(), 3);
3942 assert_eq!(values[0], Some(Value::from(25i64)));
3943 assert_eq!(values[1], Some(Value::from(30i64)));
3944 assert_eq!(values[2], None);
3945 }
3946
3947 #[test]
3948 fn test_get_node_property_batch_empty() {
3949 let store = LpgStore::new();
3950 let key = PropertyKey::new("any");
3951
3952 let values = store.get_node_property_batch(&[], &key);
3953 assert!(values.is_empty());
3954 }
3955
3956 #[test]
3957 fn test_get_nodes_properties_batch() {
3958 let store = LpgStore::new();
3959
3960 let n1 = store.create_node(&["Person"]);
3961 let n2 = store.create_node(&["Person"]);
3962 let n3 = store.create_node(&["Person"]);
3963
3964 store.set_node_property(n1, "name", Value::from("Alice"));
3965 store.set_node_property(n1, "age", Value::from(25i64));
3966 store.set_node_property(n2, "name", Value::from("Bob"));
3967 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3970
3971 assert_eq!(all_props.len(), 3);
3972 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
3977 all_props[0].get(&PropertyKey::new("name")),
3978 Some(&Value::from("Alice"))
3979 );
3980 assert_eq!(
3981 all_props[1].get(&PropertyKey::new("name")),
3982 Some(&Value::from("Bob"))
3983 );
3984 }
3985
3986 #[test]
3987 fn test_get_nodes_properties_batch_empty() {
3988 let store = LpgStore::new();
3989
3990 let all_props = store.get_nodes_properties_batch(&[]);
3991 assert!(all_props.is_empty());
3992 }
3993
3994 #[test]
3995 fn test_get_nodes_properties_selective_batch() {
3996 let store = LpgStore::new();
3997
3998 let n1 = store.create_node(&["Person"]);
3999 let n2 = store.create_node(&["Person"]);
4000
4001 store.set_node_property(n1, "name", Value::from("Alice"));
4003 store.set_node_property(n1, "age", Value::from(25i64));
4004 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4005 store.set_node_property(n2, "name", Value::from("Bob"));
4006 store.set_node_property(n2, "age", Value::from(30i64));
4007 store.set_node_property(n2, "city", Value::from("NYC"));
4008
4009 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4011 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4012
4013 assert_eq!(props.len(), 2);
4014
4015 assert_eq!(props[0].len(), 2);
4017 assert_eq!(
4018 props[0].get(&PropertyKey::new("name")),
4019 Some(&Value::from("Alice"))
4020 );
4021 assert_eq!(
4022 props[0].get(&PropertyKey::new("age")),
4023 Some(&Value::from(25i64))
4024 );
4025 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4026
4027 assert_eq!(props[1].len(), 2);
4029 assert_eq!(
4030 props[1].get(&PropertyKey::new("name")),
4031 Some(&Value::from("Bob"))
4032 );
4033 assert_eq!(
4034 props[1].get(&PropertyKey::new("age")),
4035 Some(&Value::from(30i64))
4036 );
4037 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4038 }
4039
4040 #[test]
4041 fn test_get_nodes_properties_selective_batch_empty_keys() {
4042 let store = LpgStore::new();
4043
4044 let n1 = store.create_node(&["Person"]);
4045 store.set_node_property(n1, "name", Value::from("Alice"));
4046
4047 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4049
4050 assert_eq!(props.len(), 1);
4051 assert!(props[0].is_empty()); }
4053
4054 #[test]
4055 fn test_get_nodes_properties_selective_batch_missing_keys() {
4056 let store = LpgStore::new();
4057
4058 let n1 = store.create_node(&["Person"]);
4059 store.set_node_property(n1, "name", Value::from("Alice"));
4060
4061 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4063 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4064
4065 assert_eq!(props.len(), 1);
4066 assert_eq!(props[0].len(), 1); assert_eq!(
4068 props[0].get(&PropertyKey::new("name")),
4069 Some(&Value::from("Alice"))
4070 );
4071 }
4072
4073 #[test]
4076 fn test_find_nodes_in_range_inclusive() {
4077 let store = LpgStore::new();
4078
4079 let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4080 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4081 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4082 let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4083
4084 let result = store.find_nodes_in_range(
4086 "age",
4087 Some(&Value::from(20i64)),
4088 Some(&Value::from(40i64)),
4089 true,
4090 true,
4091 );
4092 assert_eq!(result.len(), 3);
4093 assert!(result.contains(&n1));
4094 assert!(result.contains(&n2));
4095 assert!(result.contains(&n3));
4096 }
4097
4098 #[test]
4099 fn test_find_nodes_in_range_exclusive() {
4100 let store = LpgStore::new();
4101
4102 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4103 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4104 store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4105
4106 let result = store.find_nodes_in_range(
4108 "age",
4109 Some(&Value::from(20i64)),
4110 Some(&Value::from(40i64)),
4111 false,
4112 false,
4113 );
4114 assert_eq!(result.len(), 1);
4115 assert!(result.contains(&n2));
4116 }
4117
4118 #[test]
4119 fn test_find_nodes_in_range_open_ended() {
4120 let store = LpgStore::new();
4121
4122 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4123 store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4124 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4125 let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4126
4127 let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4129 assert_eq!(result.len(), 2);
4130 assert!(result.contains(&n3));
4131 assert!(result.contains(&n4));
4132
4133 let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4135 assert_eq!(result.len(), 1);
4136 }
4137
4138 #[test]
4139 fn test_find_nodes_in_range_empty_result() {
4140 let store = LpgStore::new();
4141
4142 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4143
4144 let result = store.find_nodes_in_range(
4146 "age",
4147 Some(&Value::from(100i64)),
4148 Some(&Value::from(200i64)),
4149 true,
4150 true,
4151 );
4152 assert!(result.is_empty());
4153 }
4154
4155 #[test]
4156 fn test_find_nodes_in_range_nonexistent_property() {
4157 let store = LpgStore::new();
4158
4159 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4160
4161 let result = store.find_nodes_in_range(
4162 "weight",
4163 Some(&Value::from(50i64)),
4164 Some(&Value::from(100i64)),
4165 true,
4166 true,
4167 );
4168 assert!(result.is_empty());
4169 }
4170
4171 #[test]
4174 fn test_find_nodes_by_properties_multiple_conditions() {
4175 let store = LpgStore::new();
4176
4177 let alice = store.create_node_with_props(
4178 &["Person"],
4179 [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4180 );
4181 store.create_node_with_props(
4182 &["Person"],
4183 [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4184 );
4185 store.create_node_with_props(
4186 &["Person"],
4187 [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4188 );
4189
4190 let result = store.find_nodes_by_properties(&[
4192 ("name", Value::from("Alice")),
4193 ("city", Value::from("NYC")),
4194 ]);
4195 assert_eq!(result.len(), 1);
4196 assert!(result.contains(&alice));
4197 }
4198
4199 #[test]
4200 fn test_find_nodes_by_properties_empty_conditions() {
4201 let store = LpgStore::new();
4202
4203 store.create_node(&["Person"]);
4204 store.create_node(&["Person"]);
4205
4206 let result = store.find_nodes_by_properties(&[]);
4208 assert_eq!(result.len(), 2);
4209 }
4210
4211 #[test]
4212 fn test_find_nodes_by_properties_no_match() {
4213 let store = LpgStore::new();
4214
4215 store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4216
4217 let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4218 assert!(result.is_empty());
4219 }
4220
4221 #[test]
4222 fn test_find_nodes_by_properties_with_index() {
4223 let store = LpgStore::new();
4224
4225 store.create_property_index("name");
4227
4228 let alice = store.create_node_with_props(
4229 &["Person"],
4230 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4231 );
4232 store.create_node_with_props(
4233 &["Person"],
4234 [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4235 );
4236
4237 let result = store.find_nodes_by_properties(&[
4239 ("name", Value::from("Alice")),
4240 ("age", Value::from(30i64)),
4241 ]);
4242 assert_eq!(result.len(), 1);
4243 assert!(result.contains(&alice));
4244 }
4245
4246 #[test]
4249 fn test_estimate_label_cardinality() {
4250 let store = LpgStore::new();
4251
4252 store.create_node(&["Person"]);
4253 store.create_node(&["Person"]);
4254 store.create_node(&["Animal"]);
4255
4256 store.ensure_statistics_fresh();
4257
4258 let person_est = store.estimate_label_cardinality("Person");
4259 let animal_est = store.estimate_label_cardinality("Animal");
4260 let unknown_est = store.estimate_label_cardinality("Unknown");
4261
4262 assert!(
4263 person_est >= 1.0,
4264 "Person should have cardinality >= 1, got {person_est}"
4265 );
4266 assert!(
4267 animal_est >= 1.0,
4268 "Animal should have cardinality >= 1, got {animal_est}"
4269 );
4270 assert!(unknown_est >= 0.0);
4272 }
4273
4274 #[test]
4275 fn test_estimate_avg_degree() {
4276 let store = LpgStore::new();
4277
4278 let a = store.create_node(&["Person"]);
4279 let b = store.create_node(&["Person"]);
4280 let c = store.create_node(&["Person"]);
4281
4282 store.create_edge(a, b, "KNOWS");
4283 store.create_edge(a, c, "KNOWS");
4284 store.create_edge(b, c, "KNOWS");
4285
4286 store.ensure_statistics_fresh();
4287
4288 let outgoing = store.estimate_avg_degree("KNOWS", true);
4289 let incoming = store.estimate_avg_degree("KNOWS", false);
4290
4291 assert!(
4292 outgoing > 0.0,
4293 "Outgoing degree should be > 0, got {outgoing}"
4294 );
4295 assert!(
4296 incoming > 0.0,
4297 "Incoming degree should be > 0, got {incoming}"
4298 );
4299 }
4300
4301 #[test]
4304 fn test_delete_node_does_not_cascade() {
4305 let store = LpgStore::new();
4306
4307 let a = store.create_node(&["A"]);
4308 let b = store.create_node(&["B"]);
4309 let e = store.create_edge(a, b, "KNOWS");
4310
4311 assert!(store.delete_node(a));
4312 assert!(store.get_node(a).is_none());
4313
4314 assert!(
4316 store.get_edge(e).is_some(),
4317 "Edge should survive non-detach node delete"
4318 );
4319 }
4320
4321 #[test]
4322 fn test_delete_already_deleted_node() {
4323 let store = LpgStore::new();
4324 let a = store.create_node(&["A"]);
4325
4326 assert!(store.delete_node(a));
4327 assert!(!store.delete_node(a));
4329 }
4330
4331 #[test]
4332 fn test_delete_nonexistent_node() {
4333 let store = LpgStore::new();
4334 assert!(!store.delete_node(NodeId::new(999)));
4335 }
4336}