1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35 match (a, b) {
36 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40 _ => None,
41 }
42}
43
44fn value_in_range(
46 value: &Value,
47 min: Option<&Value>,
48 max: Option<&Value>,
49 min_inclusive: bool,
50 max_inclusive: bool,
51) -> bool {
52 if let Some(min_val) = min {
54 match compare_values_for_range(value, min_val) {
55 Some(CmpOrdering::Less) => return false,
56 Some(CmpOrdering::Equal) if !min_inclusive => return false,
57 None => return false, _ => {}
59 }
60 }
61
62 if let Some(max_val) = max {
64 match compare_values_for_range(value, max_val) {
65 Some(CmpOrdering::Greater) => return false,
66 Some(CmpOrdering::Equal) if !max_inclusive => return false,
67 None => return false,
68 _ => {}
69 }
70 }
71
72 true
73}
74
75#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
/// Construction-time options for an `LpgStore`.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, the store also keeps a backward adjacency structure.
    pub backward_edges: bool,
    /// Initial capacity hint for nodes.
    pub initial_node_capacity: usize,
    /// Initial capacity hint for edges.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, 1024 nodes and 4096 edges of initial capacity.
    fn default() -> Self {
        const DEFAULT_NODE_CAPACITY: usize = 1024;
        const DEFAULT_EDGE_CAPACITY: usize = 4096;

        LpgStoreConfig {
            backward_edges: true,
            initial_node_capacity: DEFAULT_NODE_CAPACITY,
            initial_edge_capacity: DEFAULT_EDGE_CAPACITY,
        }
    }
}
108
/// In-memory labeled property graph store with epoch-versioned records.
///
/// Record storage depends on the `tiered-storage` feature: without it, node
/// and edge records live in per-entity version chains; with it, records are
/// allocated in per-epoch arenas and addressed through version indexes.
pub struct LpgStore {
    /// Configuration the store was built with. Only `backward_edges` is read
    /// in the constructor, hence the `dead_code` allowance.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node id -> version chain of node records (non-tiered build).
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge id -> version chain of edge records (non-tiered build).
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Provides the per-epoch arenas that hold hot records (tiered build).
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node id -> index of that node's version references (tiered build).
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge id -> index of that edge's version references (tiered build).
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Store consulted when a version reference is cold (tiered build).
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property values keyed by node id.
    node_properties: PropertyStorage<NodeId>,

    /// Property values keyed by edge id.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name -> numeric label id.
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label names, indexed by numeric label id.
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name -> numeric edge type id.
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type names; presumably indexed by edge type id like `id_to_label`
    /// (the lookup is not visible in this part of the file).
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Adjacency keyed by source node (`add_edge(src, dst, id)`).
    forward_adj: ChunkedAdjacency,

    /// Adjacency keyed by destination node (`add_edge(dst, src, id)`);
    /// `None` when `LpgStoreConfig::backward_edges` is `false`.
    backward_adj: Option<ChunkedAdjacency>,

    /// Indexed by label id; the keys of each map are the nodes carrying
    /// that label.
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node id -> set of label ids attached to the node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Property key -> (property value -> nodes holding that value), present
    /// only for properties that have an index.
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Vector indexes keyed by the string `"{label}:{property}"`.
    #[cfg(feature = "vector-index")]
    vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,

    /// Next node id to hand out; incremented with `fetch_add`.
    next_node_id: AtomicU64,

    /// Next edge id to hand out; incremented with `fetch_add`.
    next_edge_id: AtomicU64,

    /// Current epoch counter.
    current_epoch: AtomicU64,

    /// Graph statistics.
    statistics: RwLock<Statistics>,

    /// Set to `true` by `create_node`, `delete_node` and `create_edge`;
    /// starts out `true`.
    needs_stats_recompute: AtomicBool,
}
285
286impl LpgStore {
    /// Creates an empty store using `LpgStoreConfig::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::with_config(LpgStoreConfig::default())
    }
292
293 #[must_use]
295 pub fn with_config(config: LpgStoreConfig) -> Self {
296 let backward_adj = if config.backward_edges {
297 Some(ChunkedAdjacency::new())
298 } else {
299 None
300 };
301
302 Self {
303 #[cfg(not(feature = "tiered-storage"))]
304 nodes: RwLock::new(FxHashMap::default()),
305 #[cfg(not(feature = "tiered-storage"))]
306 edges: RwLock::new(FxHashMap::default()),
307 #[cfg(feature = "tiered-storage")]
308 arena_allocator: Arc::new(ArenaAllocator::new()),
309 #[cfg(feature = "tiered-storage")]
310 node_versions: RwLock::new(FxHashMap::default()),
311 #[cfg(feature = "tiered-storage")]
312 edge_versions: RwLock::new(FxHashMap::default()),
313 #[cfg(feature = "tiered-storage")]
314 epoch_store: Arc::new(EpochStore::new()),
315 node_properties: PropertyStorage::new(),
316 edge_properties: PropertyStorage::new(),
317 label_to_id: RwLock::new(FxHashMap::default()),
318 id_to_label: RwLock::new(Vec::new()),
319 edge_type_to_id: RwLock::new(FxHashMap::default()),
320 id_to_edge_type: RwLock::new(Vec::new()),
321 forward_adj: ChunkedAdjacency::new(),
322 backward_adj,
323 label_index: RwLock::new(Vec::new()),
324 node_labels: RwLock::new(FxHashMap::default()),
325 property_indexes: RwLock::new(FxHashMap::default()),
326 #[cfg(feature = "vector-index")]
327 vector_indexes: RwLock::new(FxHashMap::default()),
328 next_node_id: AtomicU64::new(0),
329 next_edge_id: AtomicU64::new(0),
330 current_epoch: AtomicU64::new(0),
331 statistics: RwLock::new(Statistics::new()),
332 needs_stats_recompute: AtomicBool::new(true),
333 config,
334 }
335 }
336
    /// Returns the store's current epoch (an `Acquire` load of the counter).
    #[must_use]
    pub fn current_epoch(&self) -> EpochId {
        EpochId::new(self.current_epoch.load(Ordering::Acquire))
    }
342
343 pub fn new_epoch(&self) -> EpochId {
345 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346 EpochId::new(id)
347 }
348
349 pub fn create_node(&self, labels: &[&str]) -> NodeId {
355 self.needs_stats_recompute.store(true, Ordering::Relaxed);
356 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357 }
358
359 #[cfg(not(feature = "tiered-storage"))]
361 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364 let mut record = NodeRecord::new(id, epoch);
365 record.set_label_count(labels.len() as u16);
366
367 let mut node_label_set = FxHashSet::default();
369 for label in labels {
370 let label_id = self.get_or_create_label_id(*label);
371 node_label_set.insert(label_id);
372
373 let mut index = self.label_index.write();
375 while index.len() <= label_id as usize {
376 index.push(FxHashMap::default());
377 }
378 index[label_id as usize].insert(id, ());
379 }
380
381 self.node_labels.write().insert(id, node_label_set);
383
384 let chain = VersionChain::with_initial(record, epoch, tx_id);
386 self.nodes.write().insert(id, chain);
387 id
388 }
389
390 #[cfg(feature = "tiered-storage")]
393 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396 let mut record = NodeRecord::new(id, epoch);
397 record.set_label_count(labels.len() as u16);
398
399 let mut node_label_set = FxHashSet::default();
401 for label in labels {
402 let label_id = self.get_or_create_label_id(*label);
403 node_label_set.insert(label_id);
404
405 let mut index = self.label_index.write();
407 while index.len() <= label_id as usize {
408 index.push(FxHashMap::default());
409 }
410 index[label_id as usize].insert(id, ());
411 }
412
413 self.node_labels.write().insert(id, node_label_set);
415
416 let arena = self.arena_allocator.arena_or_create(epoch);
418 let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423 let mut versions = self.node_versions.write();
425 if let Some(index) = versions.get_mut(&id) {
426 index.add_hot(hot_ref);
427 } else {
428 versions.insert(id, VersionIndex::with_initial(hot_ref));
429 }
430
431 id
432 }
433
434 pub fn create_node_with_props(
436 &self,
437 labels: &[&str],
438 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439 ) -> NodeId {
440 self.create_node_with_props_versioned(
441 labels,
442 properties,
443 self.current_epoch(),
444 TxId::SYSTEM,
445 )
446 }
447
448 #[cfg(not(feature = "tiered-storage"))]
450 pub fn create_node_with_props_versioned(
451 &self,
452 labels: &[&str],
453 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454 epoch: EpochId,
455 tx_id: TxId,
456 ) -> NodeId {
457 let id = self.create_node_versioned(labels, epoch, tx_id);
458
459 for (key, value) in properties {
460 let prop_key: PropertyKey = key.into();
461 let prop_value: Value = value.into();
462 self.update_property_index_on_set(id, &prop_key, &prop_value);
464 self.node_properties.set(id, prop_key, prop_value);
465 }
466
467 let count = self.node_properties.get_all(id).len() as u16;
469 if let Some(chain) = self.nodes.write().get_mut(&id)
470 && let Some(record) = chain.latest_mut()
471 {
472 record.props_count = count;
473 }
474
475 id
476 }
477
478 #[cfg(feature = "tiered-storage")]
481 pub fn create_node_with_props_versioned(
482 &self,
483 labels: &[&str],
484 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485 epoch: EpochId,
486 tx_id: TxId,
487 ) -> NodeId {
488 let id = self.create_node_versioned(labels, epoch, tx_id);
489
490 for (key, value) in properties {
491 let prop_key: PropertyKey = key.into();
492 let prop_value: Value = value.into();
493 self.update_property_index_on_set(id, &prop_key, &prop_value);
495 self.node_properties.set(id, prop_key, prop_value);
496 }
497
498 id
502 }
503
    /// Looks up a node as of the store's current epoch.
    ///
    /// Delegates to `get_node_at_epoch`.
    #[must_use]
    pub fn get_node(&self, id: NodeId) -> Option<Node> {
        self.get_node_at_epoch(id, self.current_epoch())
    }
509
510 #[must_use]
512 #[cfg(not(feature = "tiered-storage"))]
513 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514 let nodes = self.nodes.read();
515 let chain = nodes.get(&id)?;
516 let record = chain.visible_at(epoch)?;
517
518 if record.is_deleted() {
519 return None;
520 }
521
522 let mut node = Node::new(id);
523
524 let id_to_label = self.id_to_label.read();
526 let node_labels = self.node_labels.read();
527 if let Some(label_ids) = node_labels.get(&id) {
528 for &label_id in label_ids {
529 if let Some(label) = id_to_label.get(label_id as usize) {
530 node.labels.push(label.clone());
531 }
532 }
533 }
534
535 node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538 Some(node)
539 }
540
541 #[must_use]
544 #[cfg(feature = "tiered-storage")]
545 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546 let versions = self.node_versions.read();
547 let index = versions.get(&id)?;
548 let version_ref = index.visible_at(epoch)?;
549
550 let record = self.read_node_record(&version_ref)?;
552
553 if record.is_deleted() {
554 return None;
555 }
556
557 let mut node = Node::new(id);
558
559 let id_to_label = self.id_to_label.read();
561 let node_labels = self.node_labels.read();
562 if let Some(label_ids) = node_labels.get(&id) {
563 for &label_id in label_ids {
564 if let Some(label) = id_to_label.get(label_id as usize) {
565 node.labels.push(label.clone());
566 }
567 }
568 }
569
570 node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573 Some(node)
574 }
575
576 #[must_use]
578 #[cfg(not(feature = "tiered-storage"))]
579 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580 let nodes = self.nodes.read();
581 let chain = nodes.get(&id)?;
582 let record = chain.visible_to(epoch, tx_id)?;
583
584 if record.is_deleted() {
585 return None;
586 }
587
588 let mut node = Node::new(id);
589
590 let id_to_label = self.id_to_label.read();
592 let node_labels = self.node_labels.read();
593 if let Some(label_ids) = node_labels.get(&id) {
594 for &label_id in label_ids {
595 if let Some(label) = id_to_label.get(label_id as usize) {
596 node.labels.push(label.clone());
597 }
598 }
599 }
600
601 node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604 Some(node)
605 }
606
607 #[must_use]
610 #[cfg(feature = "tiered-storage")]
611 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612 let versions = self.node_versions.read();
613 let index = versions.get(&id)?;
614 let version_ref = index.visible_to(epoch, tx_id)?;
615
616 let record = self.read_node_record(&version_ref)?;
618
619 if record.is_deleted() {
620 return None;
621 }
622
623 let mut node = Node::new(id);
624
625 let id_to_label = self.id_to_label.read();
627 let node_labels = self.node_labels.read();
628 if let Some(label_ids) = node_labels.get(&id) {
629 for &label_id in label_ids {
630 if let Some(label) = id_to_label.get(label_id as usize) {
631 node.labels.push(label.clone());
632 }
633 }
634 }
635
636 node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639 Some(node)
640 }
641
642 #[cfg(feature = "tiered-storage")]
644 #[allow(unsafe_code)]
645 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646 match version_ref {
647 VersionRef::Hot(hot_ref) => {
648 let arena = self.arena_allocator.arena(hot_ref.epoch);
649 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651 Some(*record)
652 }
653 VersionRef::Cold(cold_ref) => {
654 self.epoch_store
656 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657 }
658 }
659 }
660
661 pub fn delete_node(&self, id: NodeId) -> bool {
663 self.needs_stats_recompute.store(true, Ordering::Relaxed);
664 self.delete_node_at_epoch(id, self.current_epoch())
665 }
666
667 #[cfg(not(feature = "tiered-storage"))]
669 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670 let mut nodes = self.nodes.write();
671 if let Some(chain) = nodes.get_mut(&id) {
672 if let Some(record) = chain.visible_at(epoch) {
674 if record.is_deleted() {
675 return false;
676 }
677 } else {
678 return false;
680 }
681
682 chain.mark_deleted(epoch);
684
685 let mut index = self.label_index.write();
687 let mut node_labels = self.node_labels.write();
688 if let Some(label_ids) = node_labels.remove(&id) {
689 for label_id in label_ids {
690 if let Some(set) = index.get_mut(label_id as usize) {
691 set.remove(&id);
692 }
693 }
694 }
695
696 drop(nodes); drop(index);
699 drop(node_labels);
700 self.node_properties.remove_all(id);
701
702 true
705 } else {
706 false
707 }
708 }
709
710 #[cfg(feature = "tiered-storage")]
713 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714 let mut versions = self.node_versions.write();
715 if let Some(index) = versions.get_mut(&id) {
716 if let Some(version_ref) = index.visible_at(epoch) {
718 if let Some(record) = self.read_node_record(&version_ref) {
719 if record.is_deleted() {
720 return false;
721 }
722 } else {
723 return false;
724 }
725 } else {
726 return false;
727 }
728
729 index.mark_deleted(epoch);
731
732 let mut label_index = self.label_index.write();
734 let mut node_labels = self.node_labels.write();
735 if let Some(label_ids) = node_labels.remove(&id) {
736 for label_id in label_ids {
737 if let Some(set) = label_index.get_mut(label_id as usize) {
738 set.remove(&id);
739 }
740 }
741 }
742
743 drop(versions);
745 drop(label_index);
746 drop(node_labels);
747 self.node_properties.remove_all(id);
748
749 true
750 } else {
751 false
752 }
753 }
754
755 #[cfg(not(feature = "tiered-storage"))]
760 pub fn delete_node_edges(&self, node_id: NodeId) {
761 let outgoing: Vec<EdgeId> = self
763 .forward_adj
764 .edges_from(node_id)
765 .into_iter()
766 .map(|(_, edge_id)| edge_id)
767 .collect();
768
769 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771 backward
772 .edges_from(node_id)
773 .into_iter()
774 .map(|(_, edge_id)| edge_id)
775 .collect()
776 } else {
777 let epoch = self.current_epoch();
779 self.edges
780 .read()
781 .iter()
782 .filter_map(|(id, chain)| {
783 chain.visible_at(epoch).and_then(|r| {
784 if !r.is_deleted() && r.dst == node_id {
785 Some(*id)
786 } else {
787 None
788 }
789 })
790 })
791 .collect()
792 };
793
794 for edge_id in outgoing.into_iter().chain(incoming) {
796 self.delete_edge(edge_id);
797 }
798 }
799
800 #[cfg(feature = "tiered-storage")]
803 pub fn delete_node_edges(&self, node_id: NodeId) {
804 let outgoing: Vec<EdgeId> = self
806 .forward_adj
807 .edges_from(node_id)
808 .into_iter()
809 .map(|(_, edge_id)| edge_id)
810 .collect();
811
812 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814 backward
815 .edges_from(node_id)
816 .into_iter()
817 .map(|(_, edge_id)| edge_id)
818 .collect()
819 } else {
820 let epoch = self.current_epoch();
822 let versions = self.edge_versions.read();
823 versions
824 .iter()
825 .filter_map(|(id, index)| {
826 index.visible_at(epoch).and_then(|vref| {
827 self.read_edge_record(&vref).and_then(|r| {
828 if !r.is_deleted() && r.dst == node_id {
829 Some(*id)
830 } else {
831 None
832 }
833 })
834 })
835 })
836 .collect()
837 };
838
839 for edge_id in outgoing.into_iter().chain(incoming) {
841 self.delete_edge(edge_id);
842 }
843 }
844
845 #[cfg(not(feature = "tiered-storage"))]
847 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848 let prop_key: PropertyKey = key.into();
849
850 self.update_property_index_on_set(id, &prop_key, &value);
852
853 self.node_properties.set(id, prop_key, value);
854
855 let count = self.node_properties.get_all(id).len() as u16;
857 if let Some(chain) = self.nodes.write().get_mut(&id)
858 && let Some(record) = chain.latest_mut()
859 {
860 record.props_count = count;
861 }
862 }
863
    /// Sets property `key` on node `id` (tiered-storage build).
    ///
    /// Any property index for `key` is updated before the value is written.
    #[cfg(feature = "tiered-storage")]
    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
        let prop_key: PropertyKey = key.into();

        // The index update reads the previous value, so it runs before the write.
        self.update_property_index_on_set(id, &prop_key, &value);

        self.node_properties.set(id, prop_key, value);
    }
878
    /// Sets property `key` on edge `id`. No index is maintained for edge
    /// properties in this method.
    pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
        self.edge_properties.set(id, key.into(), value);
    }
883
884 #[cfg(not(feature = "tiered-storage"))]
888 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889 let prop_key: PropertyKey = key.into();
890
891 self.update_property_index_on_remove(id, &prop_key);
893
894 let result = self.node_properties.remove(id, &prop_key);
895
896 let count = self.node_properties.get_all(id).len() as u16;
898 if let Some(chain) = self.nodes.write().get_mut(&id)
899 && let Some(record) = chain.latest_mut()
900 {
901 record.props_count = count;
902 }
903
904 result
905 }
906
    /// Removes property `key` from node `id`, returning the removed value if
    /// there was one (tiered-storage build).
    ///
    /// Any property index for `key` is updated before the value is removed.
    #[cfg(feature = "tiered-storage")]
    pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
        let prop_key: PropertyKey = key.into();

        // The index update reads the current value, so it runs before the removal.
        self.update_property_index_on_remove(id, &prop_key);

        self.node_properties.remove(id, &prop_key)
    }
919
    /// Removes property `key` from edge `id`, returning whatever the edge
    /// property storage reports as removed.
    pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
        self.edge_properties.remove(id, &key.into())
    }
926
    /// Reads a single property of node `id` from node property storage.
    #[must_use]
    pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
        self.node_properties.get(id, key)
    }
945
    /// Reads a single property of edge `id` from edge property storage.
    #[must_use]
    pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
        self.edge_properties.get(id, key)
    }
953
    /// Reads property `key` for a batch of nodes.
    ///
    /// Delegates to the property storage's `get_batch`; presumably the result
    /// has one entry per id, in input order (verify against `PropertyStorage`).
    #[must_use]
    pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
        self.node_properties.get_batch(ids, key)
    }
980
    /// Reads all properties for a batch of nodes.
    ///
    /// Delegates to the property storage's `get_all_batch`; presumably one map
    /// per id, in input order (verify against `PropertyStorage`).
    #[must_use]
    pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
        self.node_properties.get_all_batch(ids)
    }
989
    /// Reads only the properties named in `keys` for a batch of nodes.
    ///
    /// Delegates to the property storage's `get_selective_batch`.
    #[must_use]
    pub fn get_nodes_properties_selective_batch(
        &self,
        ids: &[NodeId],
        keys: &[PropertyKey],
    ) -> Vec<FxHashMap<PropertyKey, Value>> {
        self.node_properties.get_selective_batch(ids, keys)
    }
1025
    /// Reads only the properties named in `keys` for a batch of edges.
    ///
    /// Delegates to the edge property storage's `get_selective_batch`.
    #[must_use]
    pub fn get_edges_properties_selective_batch(
        &self,
        ids: &[EdgeId],
        keys: &[PropertyKey],
    ) -> Vec<FxHashMap<PropertyKey, Value>> {
        self.edge_properties.get_selective_batch(ids, keys)
    }
1037
1038 #[must_use]
1074 pub fn find_nodes_in_range(
1075 &self,
1076 property: &str,
1077 min: Option<&Value>,
1078 max: Option<&Value>,
1079 min_inclusive: bool,
1080 max_inclusive: bool,
1081 ) -> Vec<NodeId> {
1082 let key = PropertyKey::new(property);
1083
1084 if !self
1086 .node_properties
1087 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088 {
1089 return Vec::new();
1090 }
1091
1092 self.node_ids()
1094 .into_iter()
1095 .filter(|&node_id| {
1096 self.node_properties
1097 .get(node_id, &key)
1098 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099 })
1100 .collect()
1101 }
1102
1103 #[must_use]
1128 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129 if conditions.is_empty() {
1130 return self.node_ids();
1131 }
1132
1133 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136 let indexes = self.property_indexes.read();
1137
1138 for (i, (prop, value)) in conditions.iter().enumerate() {
1139 let key = PropertyKey::new(*prop);
1140 let hv = HashableValue::new(value.clone());
1141
1142 if let Some(index) = indexes.get(&key) {
1143 let matches: Vec<NodeId> = index
1144 .get(&hv)
1145 .map(|nodes| nodes.iter().copied().collect())
1146 .unwrap_or_default();
1147
1148 if matches.is_empty() {
1150 return Vec::new();
1151 }
1152
1153 if best_start
1155 .as_ref()
1156 .is_none_or(|(_, best)| matches.len() < best.len())
1157 {
1158 best_start = Some((i, matches));
1159 }
1160 }
1161 }
1162 drop(indexes);
1163
1164 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166 let (prop, value) = &conditions[0];
1168 (0, self.find_nodes_by_property(prop, value))
1169 });
1170
1171 for (i, (prop, value)) in conditions.iter().enumerate() {
1173 if i == start_idx {
1174 continue;
1175 }
1176
1177 let key = PropertyKey::new(*prop);
1178 candidates.retain(|&node_id| {
1179 self.node_properties
1180 .get(node_id, &key)
1181 .is_some_and(|v| v == *value)
1182 });
1183
1184 if candidates.is_empty() {
1186 return Vec::new();
1187 }
1188 }
1189
1190 candidates
1191 }
1192
1193 pub fn create_property_index(&self, property: &str) {
1221 let key = PropertyKey::new(property);
1222
1223 let mut indexes = self.property_indexes.write();
1224 if indexes.contains_key(&key) {
1225 return; }
1227
1228 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231 for node_id in self.node_ids() {
1233 if let Some(value) = self.node_properties.get(node_id, &key) {
1234 let hv = HashableValue::new(value);
1235 index.entry(hv).or_default().insert(node_id);
1236 }
1237 }
1238
1239 indexes.insert(key, index);
1240 }
1241
    /// Removes the index for `property`. Returns `true` if an index existed.
    pub fn drop_property_index(&self, property: &str) -> bool {
        let key = PropertyKey::new(property);
        self.property_indexes.write().remove(&key).is_some()
    }
1249
    /// Reports whether an index currently exists for `property`.
    #[must_use]
    pub fn has_property_index(&self, property: &str) -> bool {
        let key = PropertyKey::new(property);
        self.property_indexes.read().contains_key(&key)
    }
1256
    /// Registers a vector index under the key `"{label}:{property}"`,
    /// replacing any index previously stored under that key.
    #[cfg(feature = "vector-index")]
    pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
        let key = format!("{label}:{property}");
        self.vector_indexes.write().insert(key, index);
    }
1263
    /// Returns a shared handle to the vector index stored under
    /// `"{label}:{property}"`, if any.
    #[cfg(feature = "vector-index")]
    #[must_use]
    pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
        let key = format!("{label}:{property}");
        self.vector_indexes.read().get(&key).cloned()
    }
1271
1272 #[must_use]
1295 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1296 let key = PropertyKey::new(property);
1297 let hv = HashableValue::new(value.clone());
1298
1299 let indexes = self.property_indexes.read();
1301 if let Some(index) = indexes.get(&key) {
1302 if let Some(nodes) = index.get(&hv) {
1303 return nodes.iter().copied().collect();
1304 }
1305 return Vec::new();
1306 }
1307 drop(indexes);
1308
1309 self.node_ids()
1311 .into_iter()
1312 .filter(|&node_id| {
1313 self.node_properties
1314 .get(node_id, &key)
1315 .is_some_and(|v| v == *value)
1316 })
1317 .collect()
1318 }
1319
1320 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1322 let indexes = self.property_indexes.read();
1323 if let Some(index) = indexes.get(key) {
1324 if let Some(old_value) = self.node_properties.get(node_id, key) {
1326 let old_hv = HashableValue::new(old_value);
1327 if let Some(mut nodes) = index.get_mut(&old_hv) {
1328 nodes.remove(&node_id);
1329 if nodes.is_empty() {
1330 drop(nodes);
1331 index.remove(&old_hv);
1332 }
1333 }
1334 }
1335
1336 let new_hv = HashableValue::new(new_value.clone());
1338 index
1339 .entry(new_hv)
1340 .or_insert_with(FxHashSet::default)
1341 .insert(node_id);
1342 }
1343 }
1344
1345 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1347 let indexes = self.property_indexes.read();
1348 if let Some(index) = indexes.get(key) {
1349 if let Some(old_value) = self.node_properties.get(node_id, key) {
1351 let old_hv = HashableValue::new(old_value);
1352 if let Some(mut nodes) = index.get_mut(&old_hv) {
1353 nodes.remove(&node_id);
1354 if nodes.is_empty() {
1355 drop(nodes);
1356 index.remove(&old_hv);
1357 }
1358 }
1359 }
1360 }
1361 }
1362
1363 #[cfg(not(feature = "tiered-storage"))]
1368 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1369 let epoch = self.current_epoch();
1370
1371 let nodes = self.nodes.read();
1373 if let Some(chain) = nodes.get(&node_id) {
1374 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1375 return false;
1376 }
1377 } else {
1378 return false;
1379 }
1380 drop(nodes);
1381
1382 let label_id = self.get_or_create_label_id(label);
1384
1385 let mut node_labels = self.node_labels.write();
1387 let label_set = node_labels.entry(node_id).or_default();
1388
1389 if label_set.contains(&label_id) {
1390 return false; }
1392
1393 label_set.insert(label_id);
1394 drop(node_labels);
1395
1396 let mut index = self.label_index.write();
1398 if (label_id as usize) >= index.len() {
1399 index.resize(label_id as usize + 1, FxHashMap::default());
1400 }
1401 index[label_id as usize].insert(node_id, ());
1402
1403 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1405 && let Some(record) = chain.latest_mut()
1406 {
1407 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1408 record.set_label_count(count as u16);
1409 }
1410
1411 true
1412 }
1413
1414 #[cfg(feature = "tiered-storage")]
1417 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1418 let epoch = self.current_epoch();
1419
1420 let versions = self.node_versions.read();
1422 if let Some(index) = versions.get(&node_id) {
1423 if let Some(vref) = index.visible_at(epoch) {
1424 if let Some(record) = self.read_node_record(&vref) {
1425 if record.is_deleted() {
1426 return false;
1427 }
1428 } else {
1429 return false;
1430 }
1431 } else {
1432 return false;
1433 }
1434 } else {
1435 return false;
1436 }
1437 drop(versions);
1438
1439 let label_id = self.get_or_create_label_id(label);
1441
1442 let mut node_labels = self.node_labels.write();
1444 let label_set = node_labels.entry(node_id).or_default();
1445
1446 if label_set.contains(&label_id) {
1447 return false; }
1449
1450 label_set.insert(label_id);
1451 drop(node_labels);
1452
1453 let mut index = self.label_index.write();
1455 if (label_id as usize) >= index.len() {
1456 index.resize(label_id as usize + 1, FxHashMap::default());
1457 }
1458 index[label_id as usize].insert(node_id, ());
1459
1460 true
1464 }
1465
1466 #[cfg(not(feature = "tiered-storage"))]
1471 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1472 let epoch = self.current_epoch();
1473
1474 let nodes = self.nodes.read();
1476 if let Some(chain) = nodes.get(&node_id) {
1477 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1478 return false;
1479 }
1480 } else {
1481 return false;
1482 }
1483 drop(nodes);
1484
1485 let label_id = {
1487 let label_ids = self.label_to_id.read();
1488 match label_ids.get(label) {
1489 Some(&id) => id,
1490 None => return false, }
1492 };
1493
1494 let mut node_labels = self.node_labels.write();
1496 if let Some(label_set) = node_labels.get_mut(&node_id) {
1497 if !label_set.remove(&label_id) {
1498 return false; }
1500 } else {
1501 return false;
1502 }
1503 drop(node_labels);
1504
1505 let mut index = self.label_index.write();
1507 if (label_id as usize) < index.len() {
1508 index[label_id as usize].remove(&node_id);
1509 }
1510
1511 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1513 && let Some(record) = chain.latest_mut()
1514 {
1515 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1516 record.set_label_count(count as u16);
1517 }
1518
1519 true
1520 }
1521
1522 #[cfg(feature = "tiered-storage")]
1525 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1526 let epoch = self.current_epoch();
1527
1528 let versions = self.node_versions.read();
1530 if let Some(index) = versions.get(&node_id) {
1531 if let Some(vref) = index.visible_at(epoch) {
1532 if let Some(record) = self.read_node_record(&vref) {
1533 if record.is_deleted() {
1534 return false;
1535 }
1536 } else {
1537 return false;
1538 }
1539 } else {
1540 return false;
1541 }
1542 } else {
1543 return false;
1544 }
1545 drop(versions);
1546
1547 let label_id = {
1549 let label_ids = self.label_to_id.read();
1550 match label_ids.get(label) {
1551 Some(&id) => id,
1552 None => return false, }
1554 };
1555
1556 let mut node_labels = self.node_labels.write();
1558 if let Some(label_set) = node_labels.get_mut(&node_id) {
1559 if !label_set.remove(&label_id) {
1560 return false; }
1562 } else {
1563 return false;
1564 }
1565 drop(node_labels);
1566
1567 let mut index = self.label_index.write();
1569 if (label_id as usize) < index.len() {
1570 index[label_id as usize].remove(&node_id);
1571 }
1572
1573 true
1576 }
1577
1578 #[must_use]
1580 #[cfg(not(feature = "tiered-storage"))]
1581 pub fn node_count(&self) -> usize {
1582 let epoch = self.current_epoch();
1583 self.nodes
1584 .read()
1585 .values()
1586 .filter_map(|chain| chain.visible_at(epoch))
1587 .filter(|r| !r.is_deleted())
1588 .count()
1589 }
1590
1591 #[must_use]
1594 #[cfg(feature = "tiered-storage")]
1595 pub fn node_count(&self) -> usize {
1596 let epoch = self.current_epoch();
1597 let versions = self.node_versions.read();
1598 versions
1599 .iter()
1600 .filter(|(_, index)| {
1601 index.visible_at(epoch).map_or(false, |vref| {
1602 self.read_node_record(&vref)
1603 .map_or(false, |r| !r.is_deleted())
1604 })
1605 })
1606 .count()
1607 }
1608
1609 #[must_use]
1615 #[cfg(not(feature = "tiered-storage"))]
1616 pub fn node_ids(&self) -> Vec<NodeId> {
1617 let epoch = self.current_epoch();
1618 let mut ids: Vec<NodeId> = self
1619 .nodes
1620 .read()
1621 .iter()
1622 .filter_map(|(id, chain)| {
1623 chain
1624 .visible_at(epoch)
1625 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1626 })
1627 .collect();
1628 ids.sort_unstable();
1629 ids
1630 }
1631
1632 #[must_use]
1635 #[cfg(feature = "tiered-storage")]
1636 pub fn node_ids(&self) -> Vec<NodeId> {
1637 let epoch = self.current_epoch();
1638 let versions = self.node_versions.read();
1639 let mut ids: Vec<NodeId> = versions
1640 .iter()
1641 .filter_map(|(id, index)| {
1642 index.visible_at(epoch).and_then(|vref| {
1643 self.read_node_record(&vref)
1644 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1645 })
1646 })
1647 .collect();
1648 ids.sort_unstable();
1649 ids
1650 }
1651
1652 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1656 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1657 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1658 }
1659
1660 #[cfg(not(feature = "tiered-storage"))]
1662 pub fn create_edge_versioned(
1663 &self,
1664 src: NodeId,
1665 dst: NodeId,
1666 edge_type: &str,
1667 epoch: EpochId,
1668 tx_id: TxId,
1669 ) -> EdgeId {
1670 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1671 let type_id = self.get_or_create_edge_type_id(edge_type);
1672
1673 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1674 let chain = VersionChain::with_initial(record, epoch, tx_id);
1675 self.edges.write().insert(id, chain);
1676
1677 self.forward_adj.add_edge(src, dst, id);
1679 if let Some(ref backward) = self.backward_adj {
1680 backward.add_edge(dst, src, id);
1681 }
1682
1683 id
1684 }
1685
1686 #[cfg(feature = "tiered-storage")]
1689 pub fn create_edge_versioned(
1690 &self,
1691 src: NodeId,
1692 dst: NodeId,
1693 edge_type: &str,
1694 epoch: EpochId,
1695 tx_id: TxId,
1696 ) -> EdgeId {
1697 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1698 let type_id = self.get_or_create_edge_type_id(edge_type);
1699
1700 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1701
1702 let arena = self.arena_allocator.arena_or_create(epoch);
1704 let (offset, _stored) = arena.alloc_value_with_offset(record);
1705
1706 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1708
1709 let mut versions = self.edge_versions.write();
1711 if let Some(index) = versions.get_mut(&id) {
1712 index.add_hot(hot_ref);
1713 } else {
1714 versions.insert(id, VersionIndex::with_initial(hot_ref));
1715 }
1716
1717 self.forward_adj.add_edge(src, dst, id);
1719 if let Some(ref backward) = self.backward_adj {
1720 backward.add_edge(dst, src, id);
1721 }
1722
1723 id
1724 }
1725
1726 pub fn create_edge_with_props(
1728 &self,
1729 src: NodeId,
1730 dst: NodeId,
1731 edge_type: &str,
1732 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1733 ) -> EdgeId {
1734 let id = self.create_edge(src, dst, edge_type);
1735
1736 for (key, value) in properties {
1737 self.edge_properties.set(id, key.into(), value.into());
1738 }
1739
1740 id
1741 }
1742
1743 #[must_use]
1745 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1746 self.get_edge_at_epoch(id, self.current_epoch())
1747 }
1748
1749 #[must_use]
1751 #[cfg(not(feature = "tiered-storage"))]
1752 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1753 let edges = self.edges.read();
1754 let chain = edges.get(&id)?;
1755 let record = chain.visible_at(epoch)?;
1756
1757 if record.is_deleted() {
1758 return None;
1759 }
1760
1761 let edge_type = {
1762 let id_to_type = self.id_to_edge_type.read();
1763 id_to_type.get(record.type_id as usize)?.clone()
1764 };
1765
1766 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1767
1768 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1770
1771 Some(edge)
1772 }
1773
1774 #[must_use]
1777 #[cfg(feature = "tiered-storage")]
1778 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1779 let versions = self.edge_versions.read();
1780 let index = versions.get(&id)?;
1781 let version_ref = index.visible_at(epoch)?;
1782
1783 let record = self.read_edge_record(&version_ref)?;
1784
1785 if record.is_deleted() {
1786 return None;
1787 }
1788
1789 let edge_type = {
1790 let id_to_type = self.id_to_edge_type.read();
1791 id_to_type.get(record.type_id as usize)?.clone()
1792 };
1793
1794 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1795
1796 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1798
1799 Some(edge)
1800 }
1801
1802 #[must_use]
1804 #[cfg(not(feature = "tiered-storage"))]
1805 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1806 let edges = self.edges.read();
1807 let chain = edges.get(&id)?;
1808 let record = chain.visible_to(epoch, tx_id)?;
1809
1810 if record.is_deleted() {
1811 return None;
1812 }
1813
1814 let edge_type = {
1815 let id_to_type = self.id_to_edge_type.read();
1816 id_to_type.get(record.type_id as usize)?.clone()
1817 };
1818
1819 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1820
1821 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1823
1824 Some(edge)
1825 }
1826
1827 #[must_use]
1830 #[cfg(feature = "tiered-storage")]
1831 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1832 let versions = self.edge_versions.read();
1833 let index = versions.get(&id)?;
1834 let version_ref = index.visible_to(epoch, tx_id)?;
1835
1836 let record = self.read_edge_record(&version_ref)?;
1837
1838 if record.is_deleted() {
1839 return None;
1840 }
1841
1842 let edge_type = {
1843 let id_to_type = self.id_to_edge_type.read();
1844 id_to_type.get(record.type_id as usize)?.clone()
1845 };
1846
1847 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1848
1849 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1851
1852 Some(edge)
1853 }
1854
    /// Resolves a version reference to a copy of the edge record it points at.
    ///
    /// Hot references are read from the arena of the reference's epoch; cold
    /// references are delegated to the epoch store, whose result is returned
    /// as-is (so a cold lookup may yield `None`).
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
        match version_ref {
            VersionRef::Hot(hot_ref) => {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY (NOTE(review): confirm): presumably sound because
                // `arena_offset` was returned by `alloc_value_with_offset` for an
                // `EdgeRecord` in this same epoch's arena and the arena is still alive.
                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                // Copy the record out so the result does not borrow the arena.
                Some(*record)
            }
            VersionRef::Cold(cold_ref) => {
                self.epoch_store
                    .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
            }
        }
    }
1873
1874 pub fn delete_edge(&self, id: EdgeId) -> bool {
1876 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1877 self.delete_edge_at_epoch(id, self.current_epoch())
1878 }
1879
1880 #[cfg(not(feature = "tiered-storage"))]
1882 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1883 let mut edges = self.edges.write();
1884 if let Some(chain) = edges.get_mut(&id) {
1885 let (src, dst) = {
1887 match chain.visible_at(epoch) {
1888 Some(record) => {
1889 if record.is_deleted() {
1890 return false;
1891 }
1892 (record.src, record.dst)
1893 }
1894 None => return false, }
1896 };
1897
1898 chain.mark_deleted(epoch);
1900
1901 drop(edges); self.forward_adj.mark_deleted(src, id);
1905 if let Some(ref backward) = self.backward_adj {
1906 backward.mark_deleted(dst, id);
1907 }
1908
1909 self.edge_properties.remove_all(id);
1911
1912 true
1913 } else {
1914 false
1915 }
1916 }
1917
1918 #[cfg(feature = "tiered-storage")]
1921 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1922 let mut versions = self.edge_versions.write();
1923 if let Some(index) = versions.get_mut(&id) {
1924 let (src, dst) = {
1926 match index.visible_at(epoch) {
1927 Some(version_ref) => {
1928 if let Some(record) = self.read_edge_record(&version_ref) {
1929 if record.is_deleted() {
1930 return false;
1931 }
1932 (record.src, record.dst)
1933 } else {
1934 return false;
1935 }
1936 }
1937 None => return false,
1938 }
1939 };
1940
1941 index.mark_deleted(epoch);
1943
1944 drop(versions); self.forward_adj.mark_deleted(src, id);
1948 if let Some(ref backward) = self.backward_adj {
1949 backward.mark_deleted(dst, id);
1950 }
1951
1952 self.edge_properties.remove_all(id);
1954
1955 true
1956 } else {
1957 false
1958 }
1959 }
1960
1961 #[must_use]
1963 #[cfg(not(feature = "tiered-storage"))]
1964 pub fn edge_count(&self) -> usize {
1965 let epoch = self.current_epoch();
1966 self.edges
1967 .read()
1968 .values()
1969 .filter_map(|chain| chain.visible_at(epoch))
1970 .filter(|r| !r.is_deleted())
1971 .count()
1972 }
1973
1974 #[must_use]
1977 #[cfg(feature = "tiered-storage")]
1978 pub fn edge_count(&self) -> usize {
1979 let epoch = self.current_epoch();
1980 let versions = self.edge_versions.read();
1981 versions
1982 .iter()
1983 .filter(|(_, index)| {
1984 index.visible_at(epoch).map_or(false, |vref| {
1985 self.read_edge_record(&vref)
1986 .map_or(false, |r| !r.is_deleted())
1987 })
1988 })
1989 .count()
1990 }
1991
1992 #[cfg(not(feature = "tiered-storage"))]
1997 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1998 {
2000 let mut nodes = self.nodes.write();
2001 for chain in nodes.values_mut() {
2002 chain.remove_versions_by(tx_id);
2003 }
2004 nodes.retain(|_, chain| !chain.is_empty());
2006 }
2007
2008 {
2010 let mut edges = self.edges.write();
2011 for chain in edges.values_mut() {
2012 chain.remove_versions_by(tx_id);
2013 }
2014 edges.retain(|_, chain| !chain.is_empty());
2016 }
2017 }
2018
2019 #[cfg(feature = "tiered-storage")]
2022 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2023 {
2025 let mut versions = self.node_versions.write();
2026 for index in versions.values_mut() {
2027 index.remove_versions_by(tx_id);
2028 }
2029 versions.retain(|_, index| !index.is_empty());
2031 }
2032
2033 {
2035 let mut versions = self.edge_versions.write();
2036 for index in versions.values_mut() {
2037 index.remove_versions_by(tx_id);
2038 }
2039 versions.retain(|_, index| !index.is_empty());
2041 }
2042 }
2043
    /// Moves the hot (arena-resident) versions of `epoch` into the epoch store.
    ///
    /// Steps, in order:
    /// 1. Copy every node and edge record that has a hot reference for `epoch`
    ///    out of its arena, remembering the hot reference it came from.
    /// 2. Hand the copied records to `EpochStore::freeze_epoch`, which returns
    ///    one entry (entity id, offset, length) per stored record.
    /// 3. For each remembered hot reference, build a `ColdVersionRef` from the
    ///    matching entry and pass it to the version index's `freeze_epoch`.
    ///
    /// Returns the number of records copied (nodes + edges); `0` means there
    /// was nothing to freeze and the epoch store was not called.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
        // Records to freeze, paired with the raw entity id.
        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
        // The hot refs the records came from; their MVCC metadata is reused below.
        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

        {
            // Read lock only: nothing is modified while collecting.
            let versions = self.node_versions.read();
            for (node_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY (NOTE(review): confirm): presumably the offset was produced
                    // by `alloc_value_with_offset` for a `NodeRecord` in this arena.
                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    node_records.push((node_id.as_u64(), *record));
                    node_hot_refs.push((*node_id, *hot_ref));
                }
            }
        }

        // Same collection pass for edges.
        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.edge_versions.read();
            for (edge_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY (NOTE(review): confirm): presumably the offset was produced
                    // by `alloc_value_with_offset` for an `EdgeRecord` in this arena.
                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    edge_records.push((edge_id.as_u64(), *record));
                    edge_hot_refs.push((*edge_id, *hot_ref));
                }
            }
        }

        let total_frozen = node_records.len() + edge_records.len();

        // Nothing hot for this epoch: skip the epoch store entirely.
        if total_frozen == 0 {
            return 0;
        }

        let (node_entries, edge_entries) =
            self.epoch_store
                .freeze_epoch(epoch, node_records, edge_records);

        // Index the returned entries by entity id for the lookup below.
        // NOTE(review): the maps are keyed by entity id alone, so if one entity can
        // have several hot refs in the same epoch only one entry survives — confirm
        // whether `hot_refs_for_epoch` can yield more than one ref per entity.
        let node_entry_map: FxHashMap<u64, _> = node_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();
        let edge_entry_map: FxHashMap<u64, _> = edge_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();

        {
            // Write lock: the version indexes are updated with cold references.
            let mut versions = self.node_versions.write();
            for (node_id, hot_ref) in &node_hot_refs {
                // Entities that vanished from the map, or have no entry, are skipped.
                if let Some(index) = versions.get_mut(node_id)
                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
                {
                    // Carry the creator and deletion epoch over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        {
            let mut versions = self.edge_versions.write();
            for (edge_id, hot_ref) in &edge_hot_refs {
                if let Some(index) = versions.get_mut(edge_id)
                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
                {
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        total_frozen
    }
2159
2160 #[cfg(feature = "tiered-storage")]
2162 #[must_use]
2163 pub fn epoch_store(&self) -> &EpochStore {
2164 &self.epoch_store
2165 }
2166
2167 #[must_use]
2169 pub fn label_count(&self) -> usize {
2170 self.id_to_label.read().len()
2171 }
2172
2173 #[must_use]
2177 pub fn property_key_count(&self) -> usize {
2178 let node_keys = self.node_properties.column_count();
2179 let edge_keys = self.edge_properties.column_count();
2180 node_keys + edge_keys
2184 }
2185
2186 #[must_use]
2188 pub fn edge_type_count(&self) -> usize {
2189 self.id_to_edge_type.read().len()
2190 }
2191
2192 pub fn neighbors(
2199 &self,
2200 node: NodeId,
2201 direction: Direction,
2202 ) -> impl Iterator<Item = NodeId> + '_ {
2203 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2204 Direction::Outgoing | Direction::Both => {
2205 Box::new(self.forward_adj.neighbors(node).into_iter())
2206 }
2207 Direction::Incoming => Box::new(std::iter::empty()),
2208 };
2209
2210 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2211 Direction::Incoming | Direction::Both => {
2212 if let Some(ref adj) = self.backward_adj {
2213 Box::new(adj.neighbors(node).into_iter())
2214 } else {
2215 Box::new(std::iter::empty())
2216 }
2217 }
2218 Direction::Outgoing => Box::new(std::iter::empty()),
2219 };
2220
2221 forward.chain(backward)
2222 }
2223
2224 pub fn edges_from(
2228 &self,
2229 node: NodeId,
2230 direction: Direction,
2231 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2232 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2233 Direction::Outgoing | Direction::Both => {
2234 Box::new(self.forward_adj.edges_from(node).into_iter())
2235 }
2236 Direction::Incoming => Box::new(std::iter::empty()),
2237 };
2238
2239 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2240 Direction::Incoming | Direction::Both => {
2241 if let Some(ref adj) = self.backward_adj {
2242 Box::new(adj.edges_from(node).into_iter())
2243 } else {
2244 Box::new(std::iter::empty())
2245 }
2246 }
2247 Direction::Outgoing => Box::new(std::iter::empty()),
2248 };
2249
2250 forward.chain(backward)
2251 }
2252
2253 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2266 if let Some(ref backward) = self.backward_adj {
2267 backward.edges_from(node)
2268 } else {
2269 self.all_edges()
2271 .filter_map(|edge| {
2272 if edge.dst == node {
2273 Some((edge.src, edge.id))
2274 } else {
2275 None
2276 }
2277 })
2278 .collect()
2279 }
2280 }
2281
2282 #[must_use]
2286 pub fn out_degree(&self, node: NodeId) -> usize {
2287 self.forward_adj.out_degree(node)
2288 }
2289
2290 #[must_use]
2295 pub fn in_degree(&self, node: NodeId) -> usize {
2296 if let Some(ref backward) = self.backward_adj {
2297 backward.in_degree(node)
2298 } else {
2299 self.all_edges().filter(|edge| edge.dst == node).count()
2301 }
2302 }
2303
2304 #[must_use]
2306 #[cfg(not(feature = "tiered-storage"))]
2307 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2308 let edges = self.edges.read();
2309 let chain = edges.get(&id)?;
2310 let epoch = self.current_epoch();
2311 let record = chain.visible_at(epoch)?;
2312 let id_to_type = self.id_to_edge_type.read();
2313 id_to_type.get(record.type_id as usize).cloned()
2314 }
2315
2316 #[must_use]
2319 #[cfg(feature = "tiered-storage")]
2320 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2321 let versions = self.edge_versions.read();
2322 let index = versions.get(&id)?;
2323 let epoch = self.current_epoch();
2324 let vref = index.visible_at(epoch)?;
2325 let record = self.read_edge_record(&vref)?;
2326 let id_to_type = self.id_to_edge_type.read();
2327 id_to_type.get(record.type_id as usize).cloned()
2328 }
2329
2330 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2336 let label_to_id = self.label_to_id.read();
2337 if let Some(&label_id) = label_to_id.get(label) {
2338 let index = self.label_index.read();
2339 if let Some(set) = index.get(label_id as usize) {
2340 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2341 ids.sort_unstable();
2342 return ids;
2343 }
2344 }
2345 Vec::new()
2346 }
2347
2348 #[cfg(not(feature = "tiered-storage"))]
2355 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2356 let epoch = self.current_epoch();
2357 let node_ids: Vec<NodeId> = self
2358 .nodes
2359 .read()
2360 .iter()
2361 .filter_map(|(id, chain)| {
2362 chain
2363 .visible_at(epoch)
2364 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2365 })
2366 .collect();
2367
2368 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2369 }
2370
2371 #[cfg(feature = "tiered-storage")]
2374 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2375 let node_ids = self.node_ids();
2376 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2377 }
2378
2379 #[cfg(not(feature = "tiered-storage"))]
2384 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2385 let epoch = self.current_epoch();
2386 let edge_ids: Vec<EdgeId> = self
2387 .edges
2388 .read()
2389 .iter()
2390 .filter_map(|(id, chain)| {
2391 chain
2392 .visible_at(epoch)
2393 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2394 })
2395 .collect();
2396
2397 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2398 }
2399
2400 #[cfg(feature = "tiered-storage")]
2403 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2404 let epoch = self.current_epoch();
2405 let versions = self.edge_versions.read();
2406 let edge_ids: Vec<EdgeId> = versions
2407 .iter()
2408 .filter_map(|(id, index)| {
2409 index.visible_at(epoch).and_then(|vref| {
2410 self.read_edge_record(&vref)
2411 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2412 })
2413 })
2414 .collect();
2415
2416 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2417 }
2418
2419 pub fn all_labels(&self) -> Vec<String> {
2421 self.id_to_label
2422 .read()
2423 .iter()
2424 .map(|s| s.to_string())
2425 .collect()
2426 }
2427
2428 pub fn all_edge_types(&self) -> Vec<String> {
2430 self.id_to_edge_type
2431 .read()
2432 .iter()
2433 .map(|s| s.to_string())
2434 .collect()
2435 }
2436
2437 pub fn all_property_keys(&self) -> Vec<String> {
2439 let mut keys = std::collections::HashSet::new();
2440 for key in self.node_properties.keys() {
2441 keys.insert(key.to_string());
2442 }
2443 for key in self.edge_properties.keys() {
2444 keys.insert(key.to_string());
2445 }
2446 keys.into_iter().collect()
2447 }
2448
2449 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2451 let node_ids = self.nodes_by_label(label);
2452 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2453 }
2454
2455 #[cfg(not(feature = "tiered-storage"))]
2457 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2458 let epoch = self.current_epoch();
2459 let type_to_id = self.edge_type_to_id.read();
2460
2461 if let Some(&type_id) = type_to_id.get(edge_type) {
2462 let edge_ids: Vec<EdgeId> = self
2463 .edges
2464 .read()
2465 .iter()
2466 .filter_map(|(id, chain)| {
2467 chain.visible_at(epoch).and_then(|r| {
2468 if !r.is_deleted() && r.type_id == type_id {
2469 Some(*id)
2470 } else {
2471 None
2472 }
2473 })
2474 })
2475 .collect();
2476
2477 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2479 as Box<dyn Iterator<Item = Edge> + 'a>
2480 } else {
2481 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2483 }
2484 }
2485
2486 #[cfg(feature = "tiered-storage")]
2489 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2490 let epoch = self.current_epoch();
2491 let type_to_id = self.edge_type_to_id.read();
2492
2493 if let Some(&type_id) = type_to_id.get(edge_type) {
2494 let versions = self.edge_versions.read();
2495 let edge_ids: Vec<EdgeId> = versions
2496 .iter()
2497 .filter_map(|(id, index)| {
2498 index.visible_at(epoch).and_then(|vref| {
2499 self.read_edge_record(&vref).and_then(|r| {
2500 if !r.is_deleted() && r.type_id == type_id {
2501 Some(*id)
2502 } else {
2503 None
2504 }
2505 })
2506 })
2507 })
2508 .collect();
2509
2510 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2511 as Box<dyn Iterator<Item = Edge> + 'a>
2512 } else {
2513 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2514 }
2515 }
2516
2517 #[must_use]
2524 pub fn node_property_might_match(
2525 &self,
2526 property: &PropertyKey,
2527 op: CompareOp,
2528 value: &Value,
2529 ) -> bool {
2530 self.node_properties.might_match(property, op, value)
2531 }
2532
2533 #[must_use]
2535 pub fn edge_property_might_match(
2536 &self,
2537 property: &PropertyKey,
2538 op: CompareOp,
2539 value: &Value,
2540 ) -> bool {
2541 self.edge_properties.might_match(property, op, value)
2542 }
2543
2544 #[must_use]
2546 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2547 self.node_properties.zone_map(property)
2548 }
2549
2550 #[must_use]
2552 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2553 self.edge_properties.zone_map(property)
2554 }
2555
2556 pub fn rebuild_zone_maps(&self) {
2558 self.node_properties.rebuild_zone_maps();
2559 self.edge_properties.rebuild_zone_maps();
2560 }
2561
2562 #[must_use]
2566 pub fn statistics(&self) -> Statistics {
2567 self.statistics.read().clone()
2568 }
2569
2570 pub fn ensure_statistics_fresh(&self) {
2575 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2576 self.compute_statistics();
2577 }
2578 }
2579
2580 #[cfg(not(feature = "tiered-storage"))]
2585 pub fn compute_statistics(&self) {
2586 let mut stats = Statistics::new();
2587
2588 stats.total_nodes = self.node_count() as u64;
2590 stats.total_edges = self.edge_count() as u64;
2591
2592 let id_to_label = self.id_to_label.read();
2594 let label_index = self.label_index.read();
2595
2596 for (label_id, label_name) in id_to_label.iter().enumerate() {
2597 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2598
2599 if node_count > 0 {
2600 let avg_out_degree = if stats.total_nodes > 0 {
2602 stats.total_edges as f64 / stats.total_nodes as f64
2603 } else {
2604 0.0
2605 };
2606
2607 let label_stats =
2608 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2609
2610 stats.update_label(label_name.as_ref(), label_stats);
2611 }
2612 }
2613
2614 let id_to_edge_type = self.id_to_edge_type.read();
2616 let edges = self.edges.read();
2617 let epoch = self.current_epoch();
2618
2619 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2620 for chain in edges.values() {
2621 if let Some(record) = chain.visible_at(epoch)
2622 && !record.is_deleted()
2623 {
2624 *edge_type_counts.entry(record.type_id).or_default() += 1;
2625 }
2626 }
2627
2628 for (type_id, count) in edge_type_counts {
2629 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2630 let avg_degree = if stats.total_nodes > 0 {
2631 count as f64 / stats.total_nodes as f64
2632 } else {
2633 0.0
2634 };
2635
2636 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2637 stats.update_edge_type(type_name.as_ref(), edge_stats);
2638 }
2639 }
2640
2641 *self.statistics.write() = stats;
2642 }
2643
2644 #[cfg(feature = "tiered-storage")]
2647 pub fn compute_statistics(&self) {
2648 let mut stats = Statistics::new();
2649
2650 stats.total_nodes = self.node_count() as u64;
2652 stats.total_edges = self.edge_count() as u64;
2653
2654 let id_to_label = self.id_to_label.read();
2656 let label_index = self.label_index.read();
2657
2658 for (label_id, label_name) in id_to_label.iter().enumerate() {
2659 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2660
2661 if node_count > 0 {
2662 let avg_out_degree = if stats.total_nodes > 0 {
2663 stats.total_edges as f64 / stats.total_nodes as f64
2664 } else {
2665 0.0
2666 };
2667
2668 let label_stats =
2669 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2670
2671 stats.update_label(label_name.as_ref(), label_stats);
2672 }
2673 }
2674
2675 let id_to_edge_type = self.id_to_edge_type.read();
2677 let versions = self.edge_versions.read();
2678 let epoch = self.current_epoch();
2679
2680 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2681 for index in versions.values() {
2682 if let Some(vref) = index.visible_at(epoch)
2683 && let Some(record) = self.read_edge_record(&vref)
2684 && !record.is_deleted()
2685 {
2686 *edge_type_counts.entry(record.type_id).or_default() += 1;
2687 }
2688 }
2689
2690 for (type_id, count) in edge_type_counts {
2691 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2692 let avg_degree = if stats.total_nodes > 0 {
2693 count as f64 / stats.total_nodes as f64
2694 } else {
2695 0.0
2696 };
2697
2698 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2699 stats.update_edge_type(type_name.as_ref(), edge_stats);
2700 }
2701 }
2702
2703 *self.statistics.write() = stats;
2704 }
2705
2706 #[must_use]
2708 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2709 self.statistics.read().estimate_label_cardinality(label)
2710 }
2711
2712 #[must_use]
2714 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2715 self.statistics
2716 .read()
2717 .estimate_avg_degree(edge_type, outgoing)
2718 }
2719
2720 fn get_or_create_label_id(&self, label: &str) -> u32 {
2723 {
2724 let label_to_id = self.label_to_id.read();
2725 if let Some(&id) = label_to_id.get(label) {
2726 return id;
2727 }
2728 }
2729
2730 let mut label_to_id = self.label_to_id.write();
2731 let mut id_to_label = self.id_to_label.write();
2732
2733 if let Some(&id) = label_to_id.get(label) {
2735 return id;
2736 }
2737
2738 let id = id_to_label.len() as u32;
2739
2740 let label: ArcStr = label.into();
2741 label_to_id.insert(label.clone(), id);
2742 id_to_label.push(label);
2743
2744 id
2745 }
2746
2747 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2748 {
2749 let type_to_id = self.edge_type_to_id.read();
2750 if let Some(&id) = type_to_id.get(edge_type) {
2751 return id;
2752 }
2753 }
2754
2755 let mut type_to_id = self.edge_type_to_id.write();
2756 let mut id_to_type = self.id_to_edge_type.write();
2757
2758 if let Some(&id) = type_to_id.get(edge_type) {
2760 return id;
2761 }
2762
2763 let id = id_to_type.len() as u32;
2764 let edge_type: ArcStr = edge_type.into();
2765 type_to_id.insert(edge_type.clone(), id);
2766 id_to_type.push(edge_type);
2767
2768 id
2769 }
2770
2771 #[cfg(not(feature = "tiered-storage"))]
2778 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2779 let epoch = self.current_epoch();
2780 let mut record = NodeRecord::new(id, epoch);
2781 record.set_label_count(labels.len() as u16);
2782
2783 let mut node_label_set = FxHashSet::default();
2785 for label in labels {
2786 let label_id = self.get_or_create_label_id(*label);
2787 node_label_set.insert(label_id);
2788
2789 let mut index = self.label_index.write();
2791 while index.len() <= label_id as usize {
2792 index.push(FxHashMap::default());
2793 }
2794 index[label_id as usize].insert(id, ());
2795 }
2796
2797 self.node_labels.write().insert(id, node_label_set);
2799
2800 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2802 self.nodes.write().insert(id, chain);
2803
2804 let id_val = id.as_u64();
2806 let _ = self
2807 .next_node_id
2808 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2809 if id_val >= current {
2810 Some(id_val + 1)
2811 } else {
2812 None
2813 }
2814 });
2815 }
2816
2817 #[cfg(feature = "tiered-storage")]
2820 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2821 let epoch = self.current_epoch();
2822 let mut record = NodeRecord::new(id, epoch);
2823 record.set_label_count(labels.len() as u16);
2824
2825 let mut node_label_set = FxHashSet::default();
2827 for label in labels {
2828 let label_id = self.get_or_create_label_id(*label);
2829 node_label_set.insert(label_id);
2830
2831 let mut index = self.label_index.write();
2833 while index.len() <= label_id as usize {
2834 index.push(FxHashMap::default());
2835 }
2836 index[label_id as usize].insert(id, ());
2837 }
2838
2839 self.node_labels.write().insert(id, node_label_set);
2841
2842 let arena = self.arena_allocator.arena_or_create(epoch);
2844 let (offset, _stored) = arena.alloc_value_with_offset(record);
2845
2846 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2848 let mut versions = self.node_versions.write();
2849 versions.insert(id, VersionIndex::with_initial(hot_ref));
2850
2851 let id_val = id.as_u64();
2853 let _ = self
2854 .next_node_id
2855 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2856 if id_val >= current {
2857 Some(id_val + 1)
2858 } else {
2859 None
2860 }
2861 });
2862 }
2863
2864 #[cfg(not(feature = "tiered-storage"))]
2868 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2869 let epoch = self.current_epoch();
2870 let type_id = self.get_or_create_edge_type_id(edge_type);
2871
2872 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2873 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2874 self.edges.write().insert(id, chain);
2875
2876 self.forward_adj.add_edge(src, dst, id);
2878 if let Some(ref backward) = self.backward_adj {
2879 backward.add_edge(dst, src, id);
2880 }
2881
2882 let id_val = id.as_u64();
2884 let _ = self
2885 .next_edge_id
2886 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2887 if id_val >= current {
2888 Some(id_val + 1)
2889 } else {
2890 None
2891 }
2892 });
2893 }
2894
2895 #[cfg(feature = "tiered-storage")]
2898 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2899 let epoch = self.current_epoch();
2900 let type_id = self.get_or_create_edge_type_id(edge_type);
2901
2902 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2903
2904 let arena = self.arena_allocator.arena_or_create(epoch);
2906 let (offset, _stored) = arena.alloc_value_with_offset(record);
2907
2908 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2910 let mut versions = self.edge_versions.write();
2911 versions.insert(id, VersionIndex::with_initial(hot_ref));
2912
2913 self.forward_adj.add_edge(src, dst, id);
2915 if let Some(ref backward) = self.backward_adj {
2916 backward.add_edge(dst, src, id);
2917 }
2918
2919 let id_val = id.as_u64();
2921 let _ = self
2922 .next_edge_id
2923 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2924 if id_val >= current {
2925 Some(id_val + 1)
2926 } else {
2927 None
2928 }
2929 });
2930 }
2931
2932 pub fn set_epoch(&self, epoch: EpochId) {
2934 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2935 }
2936}
2937
2938impl Default for LpgStore {
2939 fn default() -> Self {
2940 Self::new()
2941 }
2942}
2943
2944#[cfg(test)]
2945mod tests {
2946 use super::*;
2947
2948 #[test]
2949 fn test_create_node() {
2950 let store = LpgStore::new();
2951
2952 let id = store.create_node(&["Person"]);
2953 assert!(id.is_valid());
2954
2955 let node = store.get_node(id).unwrap();
2956 assert!(node.has_label("Person"));
2957 assert!(!node.has_label("Animal"));
2958 }
2959
2960 #[test]
2961 fn test_create_node_with_props() {
2962 let store = LpgStore::new();
2963
2964 let id = store.create_node_with_props(
2965 &["Person"],
2966 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2967 );
2968
2969 let node = store.get_node(id).unwrap();
2970 assert_eq!(
2971 node.get_property("name").and_then(|v| v.as_str()),
2972 Some("Alice")
2973 );
2974 assert_eq!(
2975 node.get_property("age").and_then(|v| v.as_int64()),
2976 Some(30)
2977 );
2978 }
2979
2980 #[test]
2981 fn test_delete_node() {
2982 let store = LpgStore::new();
2983
2984 let id = store.create_node(&["Person"]);
2985 assert_eq!(store.node_count(), 1);
2986
2987 assert!(store.delete_node(id));
2988 assert_eq!(store.node_count(), 0);
2989 assert!(store.get_node(id).is_none());
2990
2991 assert!(!store.delete_node(id));
2993 }
2994
2995 #[test]
2996 fn test_create_edge() {
2997 let store = LpgStore::new();
2998
2999 let alice = store.create_node(&["Person"]);
3000 let bob = store.create_node(&["Person"]);
3001
3002 let edge_id = store.create_edge(alice, bob, "KNOWS");
3003 assert!(edge_id.is_valid());
3004
3005 let edge = store.get_edge(edge_id).unwrap();
3006 assert_eq!(edge.src, alice);
3007 assert_eq!(edge.dst, bob);
3008 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3009 }
3010
3011 #[test]
3012 fn test_neighbors() {
3013 let store = LpgStore::new();
3014
3015 let a = store.create_node(&["Person"]);
3016 let b = store.create_node(&["Person"]);
3017 let c = store.create_node(&["Person"]);
3018
3019 store.create_edge(a, b, "KNOWS");
3020 store.create_edge(a, c, "KNOWS");
3021
3022 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3023 assert_eq!(outgoing.len(), 2);
3024 assert!(outgoing.contains(&b));
3025 assert!(outgoing.contains(&c));
3026
3027 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3028 assert_eq!(incoming.len(), 1);
3029 assert!(incoming.contains(&a));
3030 }
3031
3032 #[test]
3033 fn test_nodes_by_label() {
3034 let store = LpgStore::new();
3035
3036 let p1 = store.create_node(&["Person"]);
3037 let p2 = store.create_node(&["Person"]);
3038 let _a = store.create_node(&["Animal"]);
3039
3040 let persons = store.nodes_by_label("Person");
3041 assert_eq!(persons.len(), 2);
3042 assert!(persons.contains(&p1));
3043 assert!(persons.contains(&p2));
3044
3045 let animals = store.nodes_by_label("Animal");
3046 assert_eq!(animals.len(), 1);
3047 }
3048
3049 #[test]
3050 fn test_delete_edge() {
3051 let store = LpgStore::new();
3052
3053 let a = store.create_node(&["Person"]);
3054 let b = store.create_node(&["Person"]);
3055 let edge_id = store.create_edge(a, b, "KNOWS");
3056
3057 assert_eq!(store.edge_count(), 1);
3058
3059 assert!(store.delete_edge(edge_id));
3060 assert_eq!(store.edge_count(), 0);
3061 assert!(store.get_edge(edge_id).is_none());
3062 }
3063
3064 #[test]
3067 fn test_lpg_store_config() {
3068 let config = LpgStoreConfig {
3070 backward_edges: false,
3071 initial_node_capacity: 100,
3072 initial_edge_capacity: 200,
3073 };
3074 let store = LpgStore::with_config(config);
3075
3076 let a = store.create_node(&["Person"]);
3078 let b = store.create_node(&["Person"]);
3079 store.create_edge(a, b, "KNOWS");
3080
3081 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3083 assert_eq!(outgoing.len(), 1);
3084
3085 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3087 assert_eq!(incoming.len(), 0);
3088 }
3089
3090 #[test]
3091 fn test_epoch_management() {
3092 let store = LpgStore::new();
3093
3094 let epoch0 = store.current_epoch();
3095 assert_eq!(epoch0.as_u64(), 0);
3096
3097 let epoch1 = store.new_epoch();
3098 assert_eq!(epoch1.as_u64(), 1);
3099
3100 let current = store.current_epoch();
3101 assert_eq!(current.as_u64(), 1);
3102 }
3103
3104 #[test]
3105 fn test_node_properties() {
3106 let store = LpgStore::new();
3107 let id = store.create_node(&["Person"]);
3108
3109 store.set_node_property(id, "name", Value::from("Alice"));
3111 let name = store.get_node_property(id, &"name".into());
3112 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3113
3114 store.set_node_property(id, "name", Value::from("Bob"));
3116 let name = store.get_node_property(id, &"name".into());
3117 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3118
3119 let old = store.remove_node_property(id, "name");
3121 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3122
3123 let name = store.get_node_property(id, &"name".into());
3125 assert!(name.is_none());
3126
3127 let none = store.remove_node_property(id, "nonexistent");
3129 assert!(none.is_none());
3130 }
3131
3132 #[test]
3133 fn test_edge_properties() {
3134 let store = LpgStore::new();
3135 let a = store.create_node(&["Person"]);
3136 let b = store.create_node(&["Person"]);
3137 let edge_id = store.create_edge(a, b, "KNOWS");
3138
3139 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3141 let since = store.get_edge_property(edge_id, &"since".into());
3142 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3143
3144 let old = store.remove_edge_property(edge_id, "since");
3146 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3147
3148 let since = store.get_edge_property(edge_id, &"since".into());
3149 assert!(since.is_none());
3150 }
3151
3152 #[test]
3153 fn test_add_remove_label() {
3154 let store = LpgStore::new();
3155 let id = store.create_node(&["Person"]);
3156
3157 assert!(store.add_label(id, "Employee"));
3159
3160 let node = store.get_node(id).unwrap();
3161 assert!(node.has_label("Person"));
3162 assert!(node.has_label("Employee"));
3163
3164 assert!(!store.add_label(id, "Employee"));
3166
3167 assert!(store.remove_label(id, "Employee"));
3169
3170 let node = store.get_node(id).unwrap();
3171 assert!(node.has_label("Person"));
3172 assert!(!node.has_label("Employee"));
3173
3174 assert!(!store.remove_label(id, "Employee"));
3176 assert!(!store.remove_label(id, "NonExistent"));
3177 }
3178
3179 #[test]
3180 fn test_add_label_to_nonexistent_node() {
3181 let store = LpgStore::new();
3182 let fake_id = NodeId::new(999);
3183 assert!(!store.add_label(fake_id, "Label"));
3184 }
3185
3186 #[test]
3187 fn test_remove_label_from_nonexistent_node() {
3188 let store = LpgStore::new();
3189 let fake_id = NodeId::new(999);
3190 assert!(!store.remove_label(fake_id, "Label"));
3191 }
3192
3193 #[test]
3194 fn test_node_ids() {
3195 let store = LpgStore::new();
3196
3197 let n1 = store.create_node(&["Person"]);
3198 let n2 = store.create_node(&["Person"]);
3199 let n3 = store.create_node(&["Person"]);
3200
3201 let ids = store.node_ids();
3202 assert_eq!(ids.len(), 3);
3203 assert!(ids.contains(&n1));
3204 assert!(ids.contains(&n2));
3205 assert!(ids.contains(&n3));
3206
3207 store.delete_node(n2);
3209 let ids = store.node_ids();
3210 assert_eq!(ids.len(), 2);
3211 assert!(!ids.contains(&n2));
3212 }
3213
3214 #[test]
3215 fn test_delete_node_nonexistent() {
3216 let store = LpgStore::new();
3217 let fake_id = NodeId::new(999);
3218 assert!(!store.delete_node(fake_id));
3219 }
3220
3221 #[test]
3222 fn test_delete_edge_nonexistent() {
3223 let store = LpgStore::new();
3224 let fake_id = EdgeId::new(999);
3225 assert!(!store.delete_edge(fake_id));
3226 }
3227
3228 #[test]
3229 fn test_delete_edge_double() {
3230 let store = LpgStore::new();
3231 let a = store.create_node(&["Person"]);
3232 let b = store.create_node(&["Person"]);
3233 let edge_id = store.create_edge(a, b, "KNOWS");
3234
3235 assert!(store.delete_edge(edge_id));
3236 assert!(!store.delete_edge(edge_id)); }
3238
3239 #[test]
3240 fn test_create_edge_with_props() {
3241 let store = LpgStore::new();
3242 let a = store.create_node(&["Person"]);
3243 let b = store.create_node(&["Person"]);
3244
3245 let edge_id = store.create_edge_with_props(
3246 a,
3247 b,
3248 "KNOWS",
3249 [
3250 ("since", Value::from(2020i64)),
3251 ("weight", Value::from(1.0)),
3252 ],
3253 );
3254
3255 let edge = store.get_edge(edge_id).unwrap();
3256 assert_eq!(
3257 edge.get_property("since").and_then(|v| v.as_int64()),
3258 Some(2020)
3259 );
3260 assert_eq!(
3261 edge.get_property("weight").and_then(|v| v.as_float64()),
3262 Some(1.0)
3263 );
3264 }
3265
3266 #[test]
3267 fn test_delete_node_edges() {
3268 let store = LpgStore::new();
3269
3270 let a = store.create_node(&["Person"]);
3271 let b = store.create_node(&["Person"]);
3272 let c = store.create_node(&["Person"]);
3273
3274 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3278
3279 store.delete_node_edges(a);
3281
3282 assert_eq!(store.edge_count(), 0);
3283 }
3284
3285 #[test]
3286 fn test_neighbors_both_directions() {
3287 let store = LpgStore::new();
3288
3289 let a = store.create_node(&["Person"]);
3290 let b = store.create_node(&["Person"]);
3291 let c = store.create_node(&["Person"]);
3292
3293 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3298 assert_eq!(neighbors.len(), 2);
3299 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3302
3303 #[test]
3304 fn test_edges_from() {
3305 let store = LpgStore::new();
3306
3307 let a = store.create_node(&["Person"]);
3308 let b = store.create_node(&["Person"]);
3309 let c = store.create_node(&["Person"]);
3310
3311 let e1 = store.create_edge(a, b, "KNOWS");
3312 let e2 = store.create_edge(a, c, "KNOWS");
3313
3314 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3315 assert_eq!(edges.len(), 2);
3316 assert!(edges.iter().any(|(_, e)| *e == e1));
3317 assert!(edges.iter().any(|(_, e)| *e == e2));
3318
3319 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3321 assert_eq!(incoming.len(), 1);
3322 assert_eq!(incoming[0].1, e1);
3323 }
3324
3325 #[test]
3326 fn test_edges_to() {
3327 let store = LpgStore::new();
3328
3329 let a = store.create_node(&["Person"]);
3330 let b = store.create_node(&["Person"]);
3331 let c = store.create_node(&["Person"]);
3332
3333 let e1 = store.create_edge(a, b, "KNOWS");
3334 let e2 = store.create_edge(c, b, "KNOWS");
3335
3336 let to_b = store.edges_to(b);
3338 assert_eq!(to_b.len(), 2);
3339 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3340 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3341 }
3342
3343 #[test]
3344 fn test_out_degree_in_degree() {
3345 let store = LpgStore::new();
3346
3347 let a = store.create_node(&["Person"]);
3348 let b = store.create_node(&["Person"]);
3349 let c = store.create_node(&["Person"]);
3350
3351 store.create_edge(a, b, "KNOWS");
3352 store.create_edge(a, c, "KNOWS");
3353 store.create_edge(c, b, "KNOWS");
3354
3355 assert_eq!(store.out_degree(a), 2);
3356 assert_eq!(store.out_degree(b), 0);
3357 assert_eq!(store.out_degree(c), 1);
3358
3359 assert_eq!(store.in_degree(a), 0);
3360 assert_eq!(store.in_degree(b), 2);
3361 assert_eq!(store.in_degree(c), 1);
3362 }
3363
3364 #[test]
3365 fn test_edge_type() {
3366 let store = LpgStore::new();
3367
3368 let a = store.create_node(&["Person"]);
3369 let b = store.create_node(&["Person"]);
3370 let edge_id = store.create_edge(a, b, "KNOWS");
3371
3372 let edge_type = store.edge_type(edge_id);
3373 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3374
3375 let fake_id = EdgeId::new(999);
3377 assert!(store.edge_type(fake_id).is_none());
3378 }
3379
3380 #[test]
3381 fn test_count_methods() {
3382 let store = LpgStore::new();
3383
3384 assert_eq!(store.label_count(), 0);
3385 assert_eq!(store.edge_type_count(), 0);
3386 assert_eq!(store.property_key_count(), 0);
3387
3388 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3389 let b = store.create_node(&["Company"]);
3390 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3391
3392 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3396
3397 #[test]
3398 fn test_all_nodes_and_edges() {
3399 let store = LpgStore::new();
3400
3401 let a = store.create_node(&["Person"]);
3402 let b = store.create_node(&["Person"]);
3403 store.create_edge(a, b, "KNOWS");
3404
3405 let nodes: Vec<_> = store.all_nodes().collect();
3406 assert_eq!(nodes.len(), 2);
3407
3408 let edges: Vec<_> = store.all_edges().collect();
3409 assert_eq!(edges.len(), 1);
3410 }
3411
3412 #[test]
3413 fn test_all_labels_and_edge_types() {
3414 let store = LpgStore::new();
3415
3416 store.create_node(&["Person"]);
3417 store.create_node(&["Company"]);
3418 let a = store.create_node(&["Animal"]);
3419 let b = store.create_node(&["Animal"]);
3420 store.create_edge(a, b, "EATS");
3421
3422 let labels = store.all_labels();
3423 assert_eq!(labels.len(), 3);
3424 assert!(labels.contains(&"Person".to_string()));
3425 assert!(labels.contains(&"Company".to_string()));
3426 assert!(labels.contains(&"Animal".to_string()));
3427
3428 let edge_types = store.all_edge_types();
3429 assert_eq!(edge_types.len(), 1);
3430 assert!(edge_types.contains(&"EATS".to_string()));
3431 }
3432
3433 #[test]
3434 fn test_all_property_keys() {
3435 let store = LpgStore::new();
3436
3437 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3438 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3439 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3440
3441 let keys = store.all_property_keys();
3442 assert!(keys.contains(&"name".to_string()));
3443 assert!(keys.contains(&"age".to_string()));
3444 assert!(keys.contains(&"since".to_string()));
3445 }
3446
3447 #[test]
3448 fn test_nodes_with_label() {
3449 let store = LpgStore::new();
3450
3451 store.create_node(&["Person"]);
3452 store.create_node(&["Person"]);
3453 store.create_node(&["Company"]);
3454
3455 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3456 assert_eq!(persons.len(), 2);
3457
3458 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3459 assert_eq!(companies.len(), 1);
3460
3461 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3462 assert_eq!(none.len(), 0);
3463 }
3464
3465 #[test]
3466 fn test_edges_with_type() {
3467 let store = LpgStore::new();
3468
3469 let a = store.create_node(&["Person"]);
3470 let b = store.create_node(&["Person"]);
3471 let c = store.create_node(&["Company"]);
3472
3473 store.create_edge(a, b, "KNOWS");
3474 store.create_edge(a, c, "WORKS_AT");
3475
3476 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3477 assert_eq!(knows.len(), 1);
3478
3479 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3480 assert_eq!(works_at.len(), 1);
3481
3482 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3483 assert_eq!(none.len(), 0);
3484 }
3485
3486 #[test]
3487 fn test_nodes_by_label_nonexistent() {
3488 let store = LpgStore::new();
3489 store.create_node(&["Person"]);
3490
3491 let empty = store.nodes_by_label("NonExistent");
3492 assert!(empty.is_empty());
3493 }
3494
3495 #[test]
3496 fn test_statistics() {
3497 let store = LpgStore::new();
3498
3499 let a = store.create_node(&["Person"]);
3500 let b = store.create_node(&["Person"]);
3501 let c = store.create_node(&["Company"]);
3502
3503 store.create_edge(a, b, "KNOWS");
3504 store.create_edge(a, c, "WORKS_AT");
3505
3506 store.compute_statistics();
3507 let stats = store.statistics();
3508
3509 assert_eq!(stats.total_nodes, 3);
3510 assert_eq!(stats.total_edges, 2);
3511
3512 let person_card = store.estimate_label_cardinality("Person");
3514 assert!(person_card > 0.0);
3515
3516 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3517 assert!(avg_degree >= 0.0);
3518 }
3519
3520 #[test]
3521 fn test_zone_maps() {
3522 let store = LpgStore::new();
3523
3524 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3525 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3526
3527 let might_match =
3529 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3530 assert!(might_match);
3532
3533 let zone = store.node_property_zone_map(&"age".into());
3534 assert!(zone.is_some());
3535
3536 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3538 assert!(no_zone.is_none());
3539
3540 let a = store.create_node(&["A"]);
3542 let b = store.create_node(&["B"]);
3543 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3544
3545 let edge_zone = store.edge_property_zone_map(&"weight".into());
3546 assert!(edge_zone.is_some());
3547 }
3548
3549 #[test]
3550 fn test_rebuild_zone_maps() {
3551 let store = LpgStore::new();
3552 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3553
3554 store.rebuild_zone_maps();
3556 }
3557
3558 #[test]
3559 fn test_create_node_with_id() {
3560 let store = LpgStore::new();
3561
3562 let specific_id = NodeId::new(100);
3563 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3564
3565 let node = store.get_node(specific_id).unwrap();
3566 assert!(node.has_label("Person"));
3567 assert!(node.has_label("Employee"));
3568
3569 let next = store.create_node(&["Other"]);
3571 assert!(next.as_u64() > 100);
3572 }
3573
3574 #[test]
3575 fn test_create_edge_with_id() {
3576 let store = LpgStore::new();
3577
3578 let a = store.create_node(&["A"]);
3579 let b = store.create_node(&["B"]);
3580
3581 let specific_id = EdgeId::new(500);
3582 store.create_edge_with_id(specific_id, a, b, "REL");
3583
3584 let edge = store.get_edge(specific_id).unwrap();
3585 assert_eq!(edge.src, a);
3586 assert_eq!(edge.dst, b);
3587 assert_eq!(edge.edge_type.as_str(), "REL");
3588
3589 let next = store.create_edge(a, b, "OTHER");
3591 assert!(next.as_u64() > 500);
3592 }
3593
3594 #[test]
3595 fn test_set_epoch() {
3596 let store = LpgStore::new();
3597
3598 assert_eq!(store.current_epoch().as_u64(), 0);
3599
3600 store.set_epoch(EpochId::new(42));
3601 assert_eq!(store.current_epoch().as_u64(), 42);
3602 }
3603
3604 #[test]
3605 fn test_get_node_nonexistent() {
3606 let store = LpgStore::new();
3607 let fake_id = NodeId::new(999);
3608 assert!(store.get_node(fake_id).is_none());
3609 }
3610
3611 #[test]
3612 fn test_get_edge_nonexistent() {
3613 let store = LpgStore::new();
3614 let fake_id = EdgeId::new(999);
3615 assert!(store.get_edge(fake_id).is_none());
3616 }
3617
3618 #[test]
3619 fn test_multiple_labels() {
3620 let store = LpgStore::new();
3621
3622 let id = store.create_node(&["Person", "Employee", "Manager"]);
3623 let node = store.get_node(id).unwrap();
3624
3625 assert!(node.has_label("Person"));
3626 assert!(node.has_label("Employee"));
3627 assert!(node.has_label("Manager"));
3628 assert!(!node.has_label("Other"));
3629 }
3630
3631 #[test]
3632 fn test_default_impl() {
3633 let store: LpgStore = Default::default();
3634 assert_eq!(store.node_count(), 0);
3635 assert_eq!(store.edge_count(), 0);
3636 }
3637
3638 #[test]
3639 fn test_edges_from_both_directions() {
3640 let store = LpgStore::new();
3641
3642 let a = store.create_node(&["A"]);
3643 let b = store.create_node(&["B"]);
3644 let c = store.create_node(&["C"]);
3645
3646 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3651 assert_eq!(edges.len(), 2);
3652 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3655
3656 #[test]
3657 fn test_no_backward_adj_in_degree() {
3658 let config = LpgStoreConfig {
3659 backward_edges: false,
3660 initial_node_capacity: 10,
3661 initial_edge_capacity: 10,
3662 };
3663 let store = LpgStore::with_config(config);
3664
3665 let a = store.create_node(&["A"]);
3666 let b = store.create_node(&["B"]);
3667 store.create_edge(a, b, "R");
3668
3669 let degree = store.in_degree(b);
3671 assert_eq!(degree, 1);
3672 }
3673
3674 #[test]
3675 fn test_no_backward_adj_edges_to() {
3676 let config = LpgStoreConfig {
3677 backward_edges: false,
3678 initial_node_capacity: 10,
3679 initial_edge_capacity: 10,
3680 };
3681 let store = LpgStore::with_config(config);
3682
3683 let a = store.create_node(&["A"]);
3684 let b = store.create_node(&["B"]);
3685 let e = store.create_edge(a, b, "R");
3686
3687 let edges = store.edges_to(b);
3689 assert_eq!(edges.len(), 1);
3690 assert_eq!(edges[0].1, e);
3691 }
3692
3693 #[test]
3694 fn test_node_versioned_creation() {
3695 let store = LpgStore::new();
3696
3697 let epoch = store.new_epoch();
3698 let tx_id = TxId::new(1);
3699
3700 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3701 assert!(store.get_node(id).is_some());
3702 }
3703
3704 #[test]
3705 fn test_edge_versioned_creation() {
3706 let store = LpgStore::new();
3707
3708 let a = store.create_node(&["A"]);
3709 let b = store.create_node(&["B"]);
3710
3711 let epoch = store.new_epoch();
3712 let tx_id = TxId::new(1);
3713
3714 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3715 assert!(store.get_edge(edge_id).is_some());
3716 }
3717
3718 #[test]
3719 fn test_node_with_props_versioned() {
3720 let store = LpgStore::new();
3721
3722 let epoch = store.new_epoch();
3723 let tx_id = TxId::new(1);
3724
3725 let id = store.create_node_with_props_versioned(
3726 &["Person"],
3727 [("name", Value::from("Alice"))],
3728 epoch,
3729 tx_id,
3730 );
3731
3732 let node = store.get_node(id).unwrap();
3733 assert_eq!(
3734 node.get_property("name").and_then(|v| v.as_str()),
3735 Some("Alice")
3736 );
3737 }
3738
3739 #[test]
3740 fn test_discard_uncommitted_versions() {
3741 let store = LpgStore::new();
3742
3743 let epoch = store.new_epoch();
3744 let tx_id = TxId::new(42);
3745
3746 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3748 assert!(store.get_node(node_id).is_some());
3749
3750 store.discard_uncommitted_versions(tx_id);
3752
3753 assert!(store.get_node(node_id).is_none());
3755 }
3756
3757 #[test]
3760 fn test_property_index_create_and_lookup() {
3761 let store = LpgStore::new();
3762
3763 let alice = store.create_node(&["Person"]);
3765 let bob = store.create_node(&["Person"]);
3766 let charlie = store.create_node(&["Person"]);
3767
3768 store.set_node_property(alice, "city", Value::from("NYC"));
3769 store.set_node_property(bob, "city", Value::from("NYC"));
3770 store.set_node_property(charlie, "city", Value::from("LA"));
3771
3772 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3774 assert_eq!(nyc_people.len(), 2);
3775
3776 store.create_property_index("city");
3778 assert!(store.has_property_index("city"));
3779
3780 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3782 assert_eq!(nyc_people.len(), 2);
3783 assert!(nyc_people.contains(&alice));
3784 assert!(nyc_people.contains(&bob));
3785
3786 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3787 assert_eq!(la_people.len(), 1);
3788 assert!(la_people.contains(&charlie));
3789 }
3790
3791 #[test]
3792 fn test_property_index_maintained_on_update() {
3793 let store = LpgStore::new();
3794
3795 store.create_property_index("status");
3797
3798 let node = store.create_node(&["Task"]);
3799 store.set_node_property(node, "status", Value::from("pending"));
3800
3801 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3803 assert_eq!(pending.len(), 1);
3804 assert!(pending.contains(&node));
3805
3806 store.set_node_property(node, "status", Value::from("done"));
3808
3809 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3811 assert!(pending.is_empty());
3812
3813 let done = store.find_nodes_by_property("status", &Value::from("done"));
3815 assert_eq!(done.len(), 1);
3816 assert!(done.contains(&node));
3817 }
3818
3819 #[test]
3820 fn test_property_index_maintained_on_remove() {
3821 let store = LpgStore::new();
3822
3823 store.create_property_index("tag");
3824
3825 let node = store.create_node(&["Item"]);
3826 store.set_node_property(node, "tag", Value::from("important"));
3827
3828 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3830 assert_eq!(found.len(), 1);
3831
3832 store.remove_node_property(node, "tag");
3834
3835 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3837 assert!(found.is_empty());
3838 }
3839
3840 #[test]
3841 fn test_property_index_drop() {
3842 let store = LpgStore::new();
3843
3844 store.create_property_index("key");
3845 assert!(store.has_property_index("key"));
3846
3847 assert!(store.drop_property_index("key"));
3848 assert!(!store.has_property_index("key"));
3849
3850 assert!(!store.drop_property_index("key"));
3852 }
3853
3854 #[test]
3855 fn test_property_index_multiple_values() {
3856 let store = LpgStore::new();
3857
3858 store.create_property_index("age");
3859
3860 let n1 = store.create_node(&["Person"]);
3862 let n2 = store.create_node(&["Person"]);
3863 let n3 = store.create_node(&["Person"]);
3864 let n4 = store.create_node(&["Person"]);
3865
3866 store.set_node_property(n1, "age", Value::from(25i64));
3867 store.set_node_property(n2, "age", Value::from(25i64));
3868 store.set_node_property(n3, "age", Value::from(30i64));
3869 store.set_node_property(n4, "age", Value::from(25i64));
3870
3871 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3872 assert_eq!(age_25.len(), 3);
3873
3874 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3875 assert_eq!(age_30.len(), 1);
3876
3877 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3878 assert!(age_40.is_empty());
3879 }
3880
3881 #[test]
3882 fn test_property_index_builds_from_existing_data() {
3883 let store = LpgStore::new();
3884
3885 let n1 = store.create_node(&["Person"]);
3887 let n2 = store.create_node(&["Person"]);
3888 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3889 store.set_node_property(n2, "email", Value::from("bob@example.com"));
3890
3891 store.create_property_index("email");
3893
3894 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3896 assert_eq!(alice.len(), 1);
3897 assert!(alice.contains(&n1));
3898
3899 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3900 assert_eq!(bob.len(), 1);
3901 assert!(bob.contains(&n2));
3902 }
3903
3904 #[test]
3905 fn test_get_node_property_batch() {
3906 let store = LpgStore::new();
3907
3908 let n1 = store.create_node(&["Person"]);
3909 let n2 = store.create_node(&["Person"]);
3910 let n3 = store.create_node(&["Person"]);
3911
3912 store.set_node_property(n1, "age", Value::from(25i64));
3913 store.set_node_property(n2, "age", Value::from(30i64));
3914 let age_key = PropertyKey::new("age");
3917 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3918
3919 assert_eq!(values.len(), 3);
3920 assert_eq!(values[0], Some(Value::from(25i64)));
3921 assert_eq!(values[1], Some(Value::from(30i64)));
3922 assert_eq!(values[2], None);
3923 }
3924
3925 #[test]
3926 fn test_get_node_property_batch_empty() {
3927 let store = LpgStore::new();
3928 let key = PropertyKey::new("any");
3929
3930 let values = store.get_node_property_batch(&[], &key);
3931 assert!(values.is_empty());
3932 }
3933
3934 #[test]
3935 fn test_get_nodes_properties_batch() {
3936 let store = LpgStore::new();
3937
3938 let n1 = store.create_node(&["Person"]);
3939 let n2 = store.create_node(&["Person"]);
3940 let n3 = store.create_node(&["Person"]);
3941
3942 store.set_node_property(n1, "name", Value::from("Alice"));
3943 store.set_node_property(n1, "age", Value::from(25i64));
3944 store.set_node_property(n2, "name", Value::from("Bob"));
3945 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3948
3949 assert_eq!(all_props.len(), 3);
3950 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
3955 all_props[0].get(&PropertyKey::new("name")),
3956 Some(&Value::from("Alice"))
3957 );
3958 assert_eq!(
3959 all_props[1].get(&PropertyKey::new("name")),
3960 Some(&Value::from("Bob"))
3961 );
3962 }
3963
3964 #[test]
3965 fn test_get_nodes_properties_batch_empty() {
3966 let store = LpgStore::new();
3967
3968 let all_props = store.get_nodes_properties_batch(&[]);
3969 assert!(all_props.is_empty());
3970 }
3971
3972 #[test]
3973 fn test_get_nodes_properties_selective_batch() {
3974 let store = LpgStore::new();
3975
3976 let n1 = store.create_node(&["Person"]);
3977 let n2 = store.create_node(&["Person"]);
3978
3979 store.set_node_property(n1, "name", Value::from("Alice"));
3981 store.set_node_property(n1, "age", Value::from(25i64));
3982 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3983 store.set_node_property(n2, "name", Value::from("Bob"));
3984 store.set_node_property(n2, "age", Value::from(30i64));
3985 store.set_node_property(n2, "city", Value::from("NYC"));
3986
3987 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
3989 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
3990
3991 assert_eq!(props.len(), 2);
3992
3993 assert_eq!(props[0].len(), 2);
3995 assert_eq!(
3996 props[0].get(&PropertyKey::new("name")),
3997 Some(&Value::from("Alice"))
3998 );
3999 assert_eq!(
4000 props[0].get(&PropertyKey::new("age")),
4001 Some(&Value::from(25i64))
4002 );
4003 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4004
4005 assert_eq!(props[1].len(), 2);
4007 assert_eq!(
4008 props[1].get(&PropertyKey::new("name")),
4009 Some(&Value::from("Bob"))
4010 );
4011 assert_eq!(
4012 props[1].get(&PropertyKey::new("age")),
4013 Some(&Value::from(30i64))
4014 );
4015 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4016 }
4017
4018 #[test]
4019 fn test_get_nodes_properties_selective_batch_empty_keys() {
4020 let store = LpgStore::new();
4021
4022 let n1 = store.create_node(&["Person"]);
4023 store.set_node_property(n1, "name", Value::from("Alice"));
4024
4025 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4027
4028 assert_eq!(props.len(), 1);
4029 assert!(props[0].is_empty()); }
4031
4032 #[test]
4033 fn test_get_nodes_properties_selective_batch_missing_keys() {
4034 let store = LpgStore::new();
4035
4036 let n1 = store.create_node(&["Person"]);
4037 store.set_node_property(n1, "name", Value::from("Alice"));
4038
4039 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4041 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4042
4043 assert_eq!(props.len(), 1);
4044 assert_eq!(props[0].len(), 1); assert_eq!(
4046 props[0].get(&PropertyKey::new("name")),
4047 Some(&Value::from("Alice"))
4048 );
4049 }
4050
4051 #[test]
4054 fn test_find_nodes_in_range_inclusive() {
4055 let store = LpgStore::new();
4056
4057 let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4058 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4059 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4060 let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4061
4062 let result = store.find_nodes_in_range(
4064 "age",
4065 Some(&Value::from(20i64)),
4066 Some(&Value::from(40i64)),
4067 true,
4068 true,
4069 );
4070 assert_eq!(result.len(), 3);
4071 assert!(result.contains(&n1));
4072 assert!(result.contains(&n2));
4073 assert!(result.contains(&n3));
4074 }
4075
4076 #[test]
4077 fn test_find_nodes_in_range_exclusive() {
4078 let store = LpgStore::new();
4079
4080 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4081 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4082 store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4083
4084 let result = store.find_nodes_in_range(
4086 "age",
4087 Some(&Value::from(20i64)),
4088 Some(&Value::from(40i64)),
4089 false,
4090 false,
4091 );
4092 assert_eq!(result.len(), 1);
4093 assert!(result.contains(&n2));
4094 }
4095
/// Range queries with only one bound supplied: a lower bound alone and an
/// upper bound alone must each select exactly the expected nodes.
#[test]
fn test_find_nodes_in_range_open_ended() {
    let store = LpgStore::new();

    let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
    store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
    let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
    let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);

    // Lower bound only (age >= 35): the nodes aged 40 and 50 qualify.
    let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
    assert_eq!(result.len(), 2);
    assert!(result.contains(&n3));
    assert!(result.contains(&n4));

    // Upper bound only (age <= 25): just the node aged 20 qualifies.
    // Check identity as well as count, so a result holding the wrong node
    // cannot slip through.
    let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
    assert_eq!(result.len(), 1);
    assert!(
        result.contains(&n1),
        "upper-bound-only query should return the node with age 20"
    );
}
4115
/// A range that lies entirely above every stored "age" value matches nothing.
#[test]
fn test_find_nodes_in_range_empty_result() {
    let graph = LpgStore::new();

    graph.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);

    // The only stored age is 20, well below the queried range 100..=200.
    let (lower, upper) = (Value::from(100i64), Value::from(200i64));
    let found = graph.find_nodes_in_range("age", Some(&lower), Some(&upper), true, true);

    assert!(found.is_empty());
}
4132
/// A range query on a property that no node carries returns an empty result.
#[test]
fn test_find_nodes_in_range_nonexistent_property() {
    let graph = LpgStore::new();

    // The only node has an "age" property; "weight" is never set.
    graph.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);

    let (lower, upper) = (Value::from(50i64), Value::from(100i64));
    let found = graph.find_nodes_in_range("weight", Some(&lower), Some(&upper), true, true);

    assert!(found.is_empty());
}
4148
/// When several property conditions are supplied, only the node satisfying
/// all of them is returned.
#[test]
fn test_find_nodes_by_properties_multiple_conditions() {
    let graph = LpgStore::new();

    // Matches both conditions.
    let alice_nyc = graph.create_node_with_props(
        &["Person"],
        [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
    );
    // Matches the city only.
    graph.create_node_with_props(
        &["Person"],
        [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
    );
    // Matches the name only.
    graph.create_node_with_props(
        &["Person"],
        [("name", Value::from("Alice")), ("city", Value::from("LA"))],
    );

    let conditions = [
        ("name", Value::from("Alice")),
        ("city", Value::from("NYC")),
    ];
    let found = graph.find_nodes_by_properties(&conditions);

    assert_eq!(found.len(), 1);
    assert!(found.contains(&alice_nyc));
}
4176
/// An empty condition list is expected to match every node in the store.
#[test]
fn test_find_nodes_by_properties_empty_conditions() {
    let graph = LpgStore::new();

    // Two nodes, neither with any properties.
    graph.create_node(&["Person"]);
    graph.create_node(&["Person"]);

    let found = graph.find_nodes_by_properties(&[]);
    let match_count = found.len();

    assert_eq!(match_count, 2);
}
4188
/// A condition whose value no node holds produces an empty result.
#[test]
fn test_find_nodes_by_properties_no_match() {
    let graph = LpgStore::new();

    graph.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);

    // "Nobody" is not the name of any stored node.
    let conditions = [("name", Value::from("Nobody"))];
    let found = graph.find_nodes_by_properties(&conditions);

    assert!(found.is_empty());
}
4198
/// Multi-condition lookup when a property index on "name" was created before
/// the nodes were inserted: only the node matching both conditions is found.
#[test]
fn test_find_nodes_by_properties_with_index() {
    let graph = LpgStore::new();

    // The index is created up front, before any node is inserted.
    graph.create_property_index("name");

    let alice = graph.create_node_with_props(
        &["Person"],
        [("name", Value::from("Alice")), ("age", Value::from(30i64))],
    );
    // Same age as Alice but a different name, so it must not match.
    graph.create_node_with_props(
        &["Person"],
        [("name", Value::from("Bob")), ("age", Value::from(30i64))],
    );

    let conditions = [
        ("name", Value::from("Alice")),
        ("age", Value::from(30i64)),
    ];
    let found = graph.find_nodes_by_properties(&conditions);

    assert_eq!(found.len(), 1);
    assert!(found.contains(&alice));
}
4223
/// Label cardinality estimates are at least 1 for labels that have nodes and
/// non-negative for a label that was never used.
#[test]
fn test_estimate_label_cardinality() {
    let graph = LpgStore::new();

    // Two "Person" nodes and one "Animal" node.
    graph.create_node(&["Person"]);
    graph.create_node(&["Person"]);
    graph.create_node(&["Animal"]);

    // Called before reading estimates; the name suggests it brings the
    // statistics up to date (implementation not visible in this test).
    graph.ensure_statistics_fresh();

    let person_est = graph.estimate_label_cardinality("Person");
    assert!(
        person_est >= 1.0,
        "Person should have cardinality >= 1, got {person_est}"
    );

    let animal_est = graph.estimate_label_cardinality("Animal");
    assert!(
        animal_est >= 1.0,
        "Animal should have cardinality >= 1, got {animal_est}"
    );

    // Only a lower bound is asserted for a label with no nodes.
    let unknown_est = graph.estimate_label_cardinality("Unknown");
    assert!(unknown_est >= 0.0);
}
4251
/// Average-degree estimates for an edge type that has edges are strictly
/// positive for both values of the boolean direction argument.
#[test]
fn test_estimate_avg_degree() {
    let graph = LpgStore::new();

    let first = graph.create_node(&["Person"]);
    let second = graph.create_node(&["Person"]);
    let third = graph.create_node(&["Person"]);

    // Three "KNOWS" edges: first->second, first->third, second->third.
    graph.create_edge(first, second, "KNOWS");
    graph.create_edge(first, third, "KNOWS");
    graph.create_edge(second, third, "KNOWS");

    graph.ensure_statistics_fresh();

    // `true` is read here as "outgoing" and `false` as "incoming",
    // following the names the results are bound to.
    let outgoing = graph.estimate_avg_degree("KNOWS", true);
    let incoming = graph.estimate_avg_degree("KNOWS", false);

    assert!(
        outgoing > 0.0,
        "Outgoing degree should be > 0, got {outgoing}"
    );
    assert!(
        incoming > 0.0,
        "Incoming degree should be > 0, got {incoming}"
    );
}
4278
/// Deleting a node with plain `delete_node` removes the node itself but
/// leaves an edge that started at it retrievable.
#[test]
fn test_delete_node_does_not_cascade() {
    let graph = LpgStore::new();

    let source = graph.create_node(&["A"]);
    let target = graph.create_node(&["B"]);
    let edge = graph.create_edge(source, target, "KNOWS");

    let deleted = graph.delete_node(source);
    assert!(deleted);

    let lookup = graph.get_node(source);
    assert!(lookup.is_none());

    // The edge out of the deleted node must still be there.
    assert!(
        graph.get_edge(edge).is_some(),
        "Edge should survive non-detach node delete"
    );
}
4298
/// Deleting the same node twice: the first call reports success, the second
/// reports that nothing was deleted.
#[test]
fn test_delete_already_deleted_node() {
    let graph = LpgStore::new();
    let node = graph.create_node(&["A"]);

    let first_attempt = graph.delete_node(node);
    assert!(first_attempt);

    let second_attempt = graph.delete_node(node);
    assert!(!second_attempt);
}
4308
/// Deleting an id that was never created in the store returns `false`.
#[test]
fn test_delete_nonexistent_node() {
    let graph = LpgStore::new();

    // No node is ever created, so id 999 cannot exist in this store.
    let missing = NodeId::new(999);
    let deleted = graph.delete_node(missing);

    assert!(!deleted);
}
4314}