1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35 match (a, b) {
36 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40 _ => None,
41 }
42}
43
44fn value_in_range(
46 value: &Value,
47 min: Option<&Value>,
48 max: Option<&Value>,
49 min_inclusive: bool,
50 max_inclusive: bool,
51) -> bool {
52 if let Some(min_val) = min {
54 match compare_values_for_range(value, min_val) {
55 Some(CmpOrdering::Less) => return false,
56 Some(CmpOrdering::Equal) if !min_inclusive => return false,
57 None => return false, _ => {}
59 }
60 }
61
62 if let Some(max_val) = max {
64 match compare_values_for_range(value, max_val) {
65 Some(CmpOrdering::Greater) => return false,
66 Some(CmpOrdering::Equal) if !max_inclusive => return false,
67 None => return false,
68 _ => {}
69 }
70 }
71
72 true
73}
74
75#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
/// Settings supplied when constructing an `LpgStore`.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, the store also keeps a reverse adjacency structure.
    pub backward_edges: bool,
    /// Requested initial capacity for node storage.
    pub initial_node_capacity: usize,
    /// Requested initial capacity for edge storage.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, 1024 nodes and 4096 edges of initial capacity.
    fn default() -> Self {
        let backward_edges = true;
        let initial_node_capacity = 1024;
        let initial_edge_capacity = 4096;
        Self {
            backward_edges,
            initial_node_capacity,
            initial_edge_capacity,
        }
    }
}
108
109pub struct LpgStore {
168 #[allow(dead_code)]
170 config: LpgStoreConfig,
171
172 #[cfg(not(feature = "tiered-storage"))]
176 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178 #[cfg(not(feature = "tiered-storage"))]
182 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184 #[cfg(feature = "tiered-storage")]
198 arena_allocator: Arc<ArenaAllocator>,
199
200 #[cfg(feature = "tiered-storage")]
204 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206 #[cfg(feature = "tiered-storage")]
210 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212 #[cfg(feature = "tiered-storage")]
215 epoch_store: Arc<EpochStore>,
216
217 node_properties: PropertyStorage<NodeId>,
219
220 edge_properties: PropertyStorage<EdgeId>,
222
223 label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227 id_to_label: RwLock<Vec<ArcStr>>,
230
231 edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235 id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239 forward_adj: ChunkedAdjacency,
241
242 backward_adj: Option<ChunkedAdjacency>,
245
246 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255 property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262 #[cfg(feature = "vector-index")]
267 vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269 next_node_id: AtomicU64,
271
272 next_edge_id: AtomicU64,
274
275 current_epoch: AtomicU64,
277
278 statistics: RwLock<Statistics>,
281
282 needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287 #[must_use]
289 pub fn new() -> Self {
290 Self::with_config(LpgStoreConfig::default())
291 }
292
293 #[must_use]
295 pub fn with_config(config: LpgStoreConfig) -> Self {
296 let backward_adj = if config.backward_edges {
297 Some(ChunkedAdjacency::new())
298 } else {
299 None
300 };
301
302 Self {
303 #[cfg(not(feature = "tiered-storage"))]
304 nodes: RwLock::new(FxHashMap::default()),
305 #[cfg(not(feature = "tiered-storage"))]
306 edges: RwLock::new(FxHashMap::default()),
307 #[cfg(feature = "tiered-storage")]
308 arena_allocator: Arc::new(ArenaAllocator::new()),
309 #[cfg(feature = "tiered-storage")]
310 node_versions: RwLock::new(FxHashMap::default()),
311 #[cfg(feature = "tiered-storage")]
312 edge_versions: RwLock::new(FxHashMap::default()),
313 #[cfg(feature = "tiered-storage")]
314 epoch_store: Arc::new(EpochStore::new()),
315 node_properties: PropertyStorage::new(),
316 edge_properties: PropertyStorage::new(),
317 label_to_id: RwLock::new(FxHashMap::default()),
318 id_to_label: RwLock::new(Vec::new()),
319 edge_type_to_id: RwLock::new(FxHashMap::default()),
320 id_to_edge_type: RwLock::new(Vec::new()),
321 forward_adj: ChunkedAdjacency::new(),
322 backward_adj,
323 label_index: RwLock::new(Vec::new()),
324 node_labels: RwLock::new(FxHashMap::default()),
325 property_indexes: RwLock::new(FxHashMap::default()),
326 #[cfg(feature = "vector-index")]
327 vector_indexes: RwLock::new(FxHashMap::default()),
328 next_node_id: AtomicU64::new(0),
329 next_edge_id: AtomicU64::new(0),
330 current_epoch: AtomicU64::new(0),
331 statistics: RwLock::new(Statistics::new()),
332 needs_stats_recompute: AtomicBool::new(true),
333 config,
334 }
335 }
336
337 #[must_use]
339 pub fn current_epoch(&self) -> EpochId {
340 EpochId::new(self.current_epoch.load(Ordering::Acquire))
341 }
342
343 pub fn new_epoch(&self) -> EpochId {
345 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346 EpochId::new(id)
347 }
348
349 pub fn create_node(&self, labels: &[&str]) -> NodeId {
355 self.needs_stats_recompute.store(true, Ordering::Relaxed);
356 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357 }
358
359 #[cfg(not(feature = "tiered-storage"))]
361 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364 let mut record = NodeRecord::new(id, epoch);
365 record.set_label_count(labels.len() as u16);
366
367 let mut node_label_set = FxHashSet::default();
369 for label in labels {
370 let label_id = self.get_or_create_label_id(*label);
371 node_label_set.insert(label_id);
372
373 let mut index = self.label_index.write();
375 while index.len() <= label_id as usize {
376 index.push(FxHashMap::default());
377 }
378 index[label_id as usize].insert(id, ());
379 }
380
381 self.node_labels.write().insert(id, node_label_set);
383
384 let chain = VersionChain::with_initial(record, epoch, tx_id);
386 self.nodes.write().insert(id, chain);
387 id
388 }
389
390 #[cfg(feature = "tiered-storage")]
393 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396 let mut record = NodeRecord::new(id, epoch);
397 record.set_label_count(labels.len() as u16);
398
399 let mut node_label_set = FxHashSet::default();
401 for label in labels {
402 let label_id = self.get_or_create_label_id(*label);
403 node_label_set.insert(label_id);
404
405 let mut index = self.label_index.write();
407 while index.len() <= label_id as usize {
408 index.push(FxHashMap::default());
409 }
410 index[label_id as usize].insert(id, ());
411 }
412
413 self.node_labels.write().insert(id, node_label_set);
415
416 let arena = self.arena_allocator.arena_or_create(epoch);
418 let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423 let mut versions = self.node_versions.write();
425 if let Some(index) = versions.get_mut(&id) {
426 index.add_hot(hot_ref);
427 } else {
428 versions.insert(id, VersionIndex::with_initial(hot_ref));
429 }
430
431 id
432 }
433
434 pub fn create_node_with_props(
436 &self,
437 labels: &[&str],
438 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439 ) -> NodeId {
440 self.create_node_with_props_versioned(
441 labels,
442 properties,
443 self.current_epoch(),
444 TxId::SYSTEM,
445 )
446 }
447
448 #[cfg(not(feature = "tiered-storage"))]
450 pub fn create_node_with_props_versioned(
451 &self,
452 labels: &[&str],
453 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454 epoch: EpochId,
455 tx_id: TxId,
456 ) -> NodeId {
457 let id = self.create_node_versioned(labels, epoch, tx_id);
458
459 for (key, value) in properties {
460 let prop_key: PropertyKey = key.into();
461 let prop_value: Value = value.into();
462 self.update_property_index_on_set(id, &prop_key, &prop_value);
464 self.node_properties.set(id, prop_key, prop_value);
465 }
466
467 let count = self.node_properties.get_all(id).len() as u16;
469 if let Some(chain) = self.nodes.write().get_mut(&id) {
470 if let Some(record) = chain.latest_mut() {
471 record.props_count = count;
472 }
473 }
474
475 id
476 }
477
478 #[cfg(feature = "tiered-storage")]
481 pub fn create_node_with_props_versioned(
482 &self,
483 labels: &[&str],
484 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485 epoch: EpochId,
486 tx_id: TxId,
487 ) -> NodeId {
488 let id = self.create_node_versioned(labels, epoch, tx_id);
489
490 for (key, value) in properties {
491 let prop_key: PropertyKey = key.into();
492 let prop_value: Value = value.into();
493 self.update_property_index_on_set(id, &prop_key, &prop_value);
495 self.node_properties.set(id, prop_key, prop_value);
496 }
497
498 id
502 }
503
504 #[must_use]
506 pub fn get_node(&self, id: NodeId) -> Option<Node> {
507 self.get_node_at_epoch(id, self.current_epoch())
508 }
509
510 #[must_use]
512 #[cfg(not(feature = "tiered-storage"))]
513 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514 let nodes = self.nodes.read();
515 let chain = nodes.get(&id)?;
516 let record = chain.visible_at(epoch)?;
517
518 if record.is_deleted() {
519 return None;
520 }
521
522 let mut node = Node::new(id);
523
524 let id_to_label = self.id_to_label.read();
526 let node_labels = self.node_labels.read();
527 if let Some(label_ids) = node_labels.get(&id) {
528 for &label_id in label_ids {
529 if let Some(label) = id_to_label.get(label_id as usize) {
530 node.labels.push(label.clone());
531 }
532 }
533 }
534
535 node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538 Some(node)
539 }
540
541 #[must_use]
544 #[cfg(feature = "tiered-storage")]
545 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546 let versions = self.node_versions.read();
547 let index = versions.get(&id)?;
548 let version_ref = index.visible_at(epoch)?;
549
550 let record = self.read_node_record(&version_ref)?;
552
553 if record.is_deleted() {
554 return None;
555 }
556
557 let mut node = Node::new(id);
558
559 let id_to_label = self.id_to_label.read();
561 let node_labels = self.node_labels.read();
562 if let Some(label_ids) = node_labels.get(&id) {
563 for &label_id in label_ids {
564 if let Some(label) = id_to_label.get(label_id as usize) {
565 node.labels.push(label.clone());
566 }
567 }
568 }
569
570 node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573 Some(node)
574 }
575
576 #[must_use]
578 #[cfg(not(feature = "tiered-storage"))]
579 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580 let nodes = self.nodes.read();
581 let chain = nodes.get(&id)?;
582 let record = chain.visible_to(epoch, tx_id)?;
583
584 if record.is_deleted() {
585 return None;
586 }
587
588 let mut node = Node::new(id);
589
590 let id_to_label = self.id_to_label.read();
592 let node_labels = self.node_labels.read();
593 if let Some(label_ids) = node_labels.get(&id) {
594 for &label_id in label_ids {
595 if let Some(label) = id_to_label.get(label_id as usize) {
596 node.labels.push(label.clone());
597 }
598 }
599 }
600
601 node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604 Some(node)
605 }
606
607 #[must_use]
610 #[cfg(feature = "tiered-storage")]
611 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612 let versions = self.node_versions.read();
613 let index = versions.get(&id)?;
614 let version_ref = index.visible_to(epoch, tx_id)?;
615
616 let record = self.read_node_record(&version_ref)?;
618
619 if record.is_deleted() {
620 return None;
621 }
622
623 let mut node = Node::new(id);
624
625 let id_to_label = self.id_to_label.read();
627 let node_labels = self.node_labels.read();
628 if let Some(label_ids) = node_labels.get(&id) {
629 for &label_id in label_ids {
630 if let Some(label) = id_to_label.get(label_id as usize) {
631 node.labels.push(label.clone());
632 }
633 }
634 }
635
636 node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639 Some(node)
640 }
641
642 #[cfg(feature = "tiered-storage")]
644 #[allow(unsafe_code)]
645 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646 match version_ref {
647 VersionRef::Hot(hot_ref) => {
648 let arena = self.arena_allocator.arena(hot_ref.epoch);
649 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651 Some(*record)
652 }
653 VersionRef::Cold(cold_ref) => {
654 self.epoch_store
656 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657 }
658 }
659 }
660
661 pub fn delete_node(&self, id: NodeId) -> bool {
663 self.needs_stats_recompute.store(true, Ordering::Relaxed);
664 self.delete_node_at_epoch(id, self.current_epoch())
665 }
666
667 #[cfg(not(feature = "tiered-storage"))]
669 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670 let mut nodes = self.nodes.write();
671 if let Some(chain) = nodes.get_mut(&id) {
672 if let Some(record) = chain.visible_at(epoch) {
674 if record.is_deleted() {
675 return false;
676 }
677 } else {
678 return false;
680 }
681
682 chain.mark_deleted(epoch);
684
685 let mut index = self.label_index.write();
687 let mut node_labels = self.node_labels.write();
688 if let Some(label_ids) = node_labels.remove(&id) {
689 for label_id in label_ids {
690 if let Some(set) = index.get_mut(label_id as usize) {
691 set.remove(&id);
692 }
693 }
694 }
695
696 drop(nodes); drop(index);
699 drop(node_labels);
700 self.node_properties.remove_all(id);
701
702 true
705 } else {
706 false
707 }
708 }
709
710 #[cfg(feature = "tiered-storage")]
713 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714 let mut versions = self.node_versions.write();
715 if let Some(index) = versions.get_mut(&id) {
716 if let Some(version_ref) = index.visible_at(epoch) {
718 if let Some(record) = self.read_node_record(&version_ref) {
719 if record.is_deleted() {
720 return false;
721 }
722 } else {
723 return false;
724 }
725 } else {
726 return false;
727 }
728
729 index.mark_deleted(epoch);
731
732 let mut label_index = self.label_index.write();
734 let mut node_labels = self.node_labels.write();
735 if let Some(label_ids) = node_labels.remove(&id) {
736 for label_id in label_ids {
737 if let Some(set) = label_index.get_mut(label_id as usize) {
738 set.remove(&id);
739 }
740 }
741 }
742
743 drop(versions);
745 drop(label_index);
746 drop(node_labels);
747 self.node_properties.remove_all(id);
748
749 true
750 } else {
751 false
752 }
753 }
754
755 #[cfg(not(feature = "tiered-storage"))]
760 pub fn delete_node_edges(&self, node_id: NodeId) {
761 let outgoing: Vec<EdgeId> = self
763 .forward_adj
764 .edges_from(node_id)
765 .into_iter()
766 .map(|(_, edge_id)| edge_id)
767 .collect();
768
769 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771 backward
772 .edges_from(node_id)
773 .into_iter()
774 .map(|(_, edge_id)| edge_id)
775 .collect()
776 } else {
777 let epoch = self.current_epoch();
779 self.edges
780 .read()
781 .iter()
782 .filter_map(|(id, chain)| {
783 chain.visible_at(epoch).and_then(|r| {
784 if !r.is_deleted() && r.dst == node_id {
785 Some(*id)
786 } else {
787 None
788 }
789 })
790 })
791 .collect()
792 };
793
794 for edge_id in outgoing.into_iter().chain(incoming) {
796 self.delete_edge(edge_id);
797 }
798 }
799
800 #[cfg(feature = "tiered-storage")]
803 pub fn delete_node_edges(&self, node_id: NodeId) {
804 let outgoing: Vec<EdgeId> = self
806 .forward_adj
807 .edges_from(node_id)
808 .into_iter()
809 .map(|(_, edge_id)| edge_id)
810 .collect();
811
812 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814 backward
815 .edges_from(node_id)
816 .into_iter()
817 .map(|(_, edge_id)| edge_id)
818 .collect()
819 } else {
820 let epoch = self.current_epoch();
822 let versions = self.edge_versions.read();
823 versions
824 .iter()
825 .filter_map(|(id, index)| {
826 index.visible_at(epoch).and_then(|vref| {
827 self.read_edge_record(&vref).and_then(|r| {
828 if !r.is_deleted() && r.dst == node_id {
829 Some(*id)
830 } else {
831 None
832 }
833 })
834 })
835 })
836 .collect()
837 };
838
839 for edge_id in outgoing.into_iter().chain(incoming) {
841 self.delete_edge(edge_id);
842 }
843 }
844
845 #[cfg(not(feature = "tiered-storage"))]
847 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848 let prop_key: PropertyKey = key.into();
849
850 self.update_property_index_on_set(id, &prop_key, &value);
852
853 self.node_properties.set(id, prop_key, value);
854
855 let count = self.node_properties.get_all(id).len() as u16;
857 if let Some(chain) = self.nodes.write().get_mut(&id) {
858 if let Some(record) = chain.latest_mut() {
859 record.props_count = count;
860 }
861 }
862 }
863
864 #[cfg(feature = "tiered-storage")]
867 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868 let prop_key: PropertyKey = key.into();
869
870 self.update_property_index_on_set(id, &prop_key, &value);
872
873 self.node_properties.set(id, prop_key, value);
874 }
878
879 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881 self.edge_properties.set(id, key.into(), value);
882 }
883
884 #[cfg(not(feature = "tiered-storage"))]
888 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889 let prop_key: PropertyKey = key.into();
890
891 self.update_property_index_on_remove(id, &prop_key);
893
894 let result = self.node_properties.remove(id, &prop_key);
895
896 let count = self.node_properties.get_all(id).len() as u16;
898 if let Some(chain) = self.nodes.write().get_mut(&id) {
899 if let Some(record) = chain.latest_mut() {
900 record.props_count = count;
901 }
902 }
903
904 result
905 }
906
907 #[cfg(feature = "tiered-storage")]
910 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911 let prop_key: PropertyKey = key.into();
912
913 self.update_property_index_on_remove(id, &prop_key);
915
916 self.node_properties.remove(id, &prop_key)
917 }
919
920 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924 self.edge_properties.remove(id, &key.into())
925 }
926
927 #[must_use]
942 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943 self.node_properties.get(id, key)
944 }
945
946 #[must_use]
950 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951 self.edge_properties.get(id, key)
952 }
953
954 #[must_use]
977 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978 self.node_properties.get_batch(ids, key)
979 }
980
981 #[must_use]
986 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987 self.node_properties.get_all_batch(ids)
988 }
989
990 #[must_use]
1018 pub fn get_nodes_properties_selective_batch(
1019 &self,
1020 ids: &[NodeId],
1021 keys: &[PropertyKey],
1022 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023 self.node_properties.get_selective_batch(ids, keys)
1024 }
1025
1026 #[must_use]
1030 pub fn get_edges_properties_selective_batch(
1031 &self,
1032 ids: &[EdgeId],
1033 keys: &[PropertyKey],
1034 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035 self.edge_properties.get_selective_batch(ids, keys)
1036 }
1037
1038 #[must_use]
1074 pub fn find_nodes_in_range(
1075 &self,
1076 property: &str,
1077 min: Option<&Value>,
1078 max: Option<&Value>,
1079 min_inclusive: bool,
1080 max_inclusive: bool,
1081 ) -> Vec<NodeId> {
1082 let key = PropertyKey::new(property);
1083
1084 if !self
1086 .node_properties
1087 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088 {
1089 return Vec::new();
1090 }
1091
1092 self.node_ids()
1094 .into_iter()
1095 .filter(|&node_id| {
1096 self.node_properties
1097 .get(node_id, &key)
1098 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099 })
1100 .collect()
1101 }
1102
1103 #[must_use]
1128 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129 if conditions.is_empty() {
1130 return self.node_ids();
1131 }
1132
1133 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136 let indexes = self.property_indexes.read();
1137
1138 for (i, (prop, value)) in conditions.iter().enumerate() {
1139 let key = PropertyKey::new(*prop);
1140 let hv = HashableValue::new(value.clone());
1141
1142 if let Some(index) = indexes.get(&key) {
1143 let matches: Vec<NodeId> = index
1144 .get(&hv)
1145 .map(|nodes| nodes.iter().copied().collect())
1146 .unwrap_or_default();
1147
1148 if matches.is_empty() {
1150 return Vec::new();
1151 }
1152
1153 if best_start
1155 .as_ref()
1156 .is_none_or(|(_, best)| matches.len() < best.len())
1157 {
1158 best_start = Some((i, matches));
1159 }
1160 }
1161 }
1162 drop(indexes);
1163
1164 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166 let (prop, value) = &conditions[0];
1168 (0, self.find_nodes_by_property(prop, value))
1169 });
1170
1171 for (i, (prop, value)) in conditions.iter().enumerate() {
1173 if i == start_idx {
1174 continue;
1175 }
1176
1177 let key = PropertyKey::new(*prop);
1178 candidates.retain(|&node_id| {
1179 self.node_properties
1180 .get(node_id, &key)
1181 .is_some_and(|v| v == *value)
1182 });
1183
1184 if candidates.is_empty() {
1186 return Vec::new();
1187 }
1188 }
1189
1190 candidates
1191 }
1192
1193 pub fn create_property_index(&self, property: &str) {
1221 let key = PropertyKey::new(property);
1222
1223 let mut indexes = self.property_indexes.write();
1224 if indexes.contains_key(&key) {
1225 return; }
1227
1228 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231 for node_id in self.node_ids() {
1233 if let Some(value) = self.node_properties.get(node_id, &key) {
1234 let hv = HashableValue::new(value);
1235 index
1236 .entry(hv)
1237 .or_insert_with(FxHashSet::default)
1238 .insert(node_id);
1239 }
1240 }
1241
1242 indexes.insert(key, index);
1243 }
1244
1245 pub fn drop_property_index(&self, property: &str) -> bool {
1249 let key = PropertyKey::new(property);
1250 self.property_indexes.write().remove(&key).is_some()
1251 }
1252
1253 #[must_use]
1255 pub fn has_property_index(&self, property: &str) -> bool {
1256 let key = PropertyKey::new(property);
1257 self.property_indexes.read().contains_key(&key)
1258 }
1259
1260 #[cfg(feature = "vector-index")]
1262 pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1263 let key = format!("{label}:{property}");
1264 self.vector_indexes.write().insert(key, index);
1265 }
1266
1267 #[cfg(feature = "vector-index")]
1269 #[must_use]
1270 pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1271 let key = format!("{label}:{property}");
1272 self.vector_indexes.read().get(&key).cloned()
1273 }
1274
1275 #[must_use]
1298 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1299 let key = PropertyKey::new(property);
1300 let hv = HashableValue::new(value.clone());
1301
1302 let indexes = self.property_indexes.read();
1304 if let Some(index) = indexes.get(&key) {
1305 if let Some(nodes) = index.get(&hv) {
1306 return nodes.iter().copied().collect();
1307 }
1308 return Vec::new();
1309 }
1310 drop(indexes);
1311
1312 self.node_ids()
1314 .into_iter()
1315 .filter(|&node_id| {
1316 self.node_properties
1317 .get(node_id, &key)
1318 .is_some_and(|v| v == *value)
1319 })
1320 .collect()
1321 }
1322
1323 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1325 let indexes = self.property_indexes.read();
1326 if let Some(index) = indexes.get(key) {
1327 if let Some(old_value) = self.node_properties.get(node_id, key) {
1329 let old_hv = HashableValue::new(old_value);
1330 if let Some(mut nodes) = index.get_mut(&old_hv) {
1331 nodes.remove(&node_id);
1332 if nodes.is_empty() {
1333 drop(nodes);
1334 index.remove(&old_hv);
1335 }
1336 }
1337 }
1338
1339 let new_hv = HashableValue::new(new_value.clone());
1341 index
1342 .entry(new_hv)
1343 .or_insert_with(FxHashSet::default)
1344 .insert(node_id);
1345 }
1346 }
1347
1348 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1350 let indexes = self.property_indexes.read();
1351 if let Some(index) = indexes.get(key) {
1352 if let Some(old_value) = self.node_properties.get(node_id, key) {
1354 let old_hv = HashableValue::new(old_value);
1355 if let Some(mut nodes) = index.get_mut(&old_hv) {
1356 nodes.remove(&node_id);
1357 if nodes.is_empty() {
1358 drop(nodes);
1359 index.remove(&old_hv);
1360 }
1361 }
1362 }
1363 }
1364 }
1365
1366 #[cfg(not(feature = "tiered-storage"))]
1371 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1372 let epoch = self.current_epoch();
1373
1374 let nodes = self.nodes.read();
1376 if let Some(chain) = nodes.get(&node_id) {
1377 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1378 return false;
1379 }
1380 } else {
1381 return false;
1382 }
1383 drop(nodes);
1384
1385 let label_id = self.get_or_create_label_id(label);
1387
1388 let mut node_labels = self.node_labels.write();
1390 let label_set = node_labels
1391 .entry(node_id)
1392 .or_insert_with(FxHashSet::default);
1393
1394 if label_set.contains(&label_id) {
1395 return false; }
1397
1398 label_set.insert(label_id);
1399 drop(node_labels);
1400
1401 let mut index = self.label_index.write();
1403 if (label_id as usize) >= index.len() {
1404 index.resize(label_id as usize + 1, FxHashMap::default());
1405 }
1406 index[label_id as usize].insert(node_id, ());
1407
1408 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1410 if let Some(record) = chain.latest_mut() {
1411 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1412 record.set_label_count(count as u16);
1413 }
1414 }
1415
1416 true
1417 }
1418
1419 #[cfg(feature = "tiered-storage")]
1422 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1423 let epoch = self.current_epoch();
1424
1425 let versions = self.node_versions.read();
1427 if let Some(index) = versions.get(&node_id) {
1428 if let Some(vref) = index.visible_at(epoch) {
1429 if let Some(record) = self.read_node_record(&vref) {
1430 if record.is_deleted() {
1431 return false;
1432 }
1433 } else {
1434 return false;
1435 }
1436 } else {
1437 return false;
1438 }
1439 } else {
1440 return false;
1441 }
1442 drop(versions);
1443
1444 let label_id = self.get_or_create_label_id(label);
1446
1447 let mut node_labels = self.node_labels.write();
1449 let label_set = node_labels
1450 .entry(node_id)
1451 .or_insert_with(FxHashSet::default);
1452
1453 if label_set.contains(&label_id) {
1454 return false; }
1456
1457 label_set.insert(label_id);
1458 drop(node_labels);
1459
1460 let mut index = self.label_index.write();
1462 if (label_id as usize) >= index.len() {
1463 index.resize(label_id as usize + 1, FxHashMap::default());
1464 }
1465 index[label_id as usize].insert(node_id, ());
1466
1467 true
1471 }
1472
1473 #[cfg(not(feature = "tiered-storage"))]
1478 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1479 let epoch = self.current_epoch();
1480
1481 let nodes = self.nodes.read();
1483 if let Some(chain) = nodes.get(&node_id) {
1484 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1485 return false;
1486 }
1487 } else {
1488 return false;
1489 }
1490 drop(nodes);
1491
1492 let label_id = {
1494 let label_ids = self.label_to_id.read();
1495 match label_ids.get(label) {
1496 Some(&id) => id,
1497 None => return false, }
1499 };
1500
1501 let mut node_labels = self.node_labels.write();
1503 if let Some(label_set) = node_labels.get_mut(&node_id) {
1504 if !label_set.remove(&label_id) {
1505 return false; }
1507 } else {
1508 return false;
1509 }
1510 drop(node_labels);
1511
1512 let mut index = self.label_index.write();
1514 if (label_id as usize) < index.len() {
1515 index[label_id as usize].remove(&node_id);
1516 }
1517
1518 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1520 if let Some(record) = chain.latest_mut() {
1521 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1522 record.set_label_count(count as u16);
1523 }
1524 }
1525
1526 true
1527 }
1528
1529 #[cfg(feature = "tiered-storage")]
1532 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1533 let epoch = self.current_epoch();
1534
1535 let versions = self.node_versions.read();
1537 if let Some(index) = versions.get(&node_id) {
1538 if let Some(vref) = index.visible_at(epoch) {
1539 if let Some(record) = self.read_node_record(&vref) {
1540 if record.is_deleted() {
1541 return false;
1542 }
1543 } else {
1544 return false;
1545 }
1546 } else {
1547 return false;
1548 }
1549 } else {
1550 return false;
1551 }
1552 drop(versions);
1553
1554 let label_id = {
1556 let label_ids = self.label_to_id.read();
1557 match label_ids.get(label) {
1558 Some(&id) => id,
1559 None => return false, }
1561 };
1562
1563 let mut node_labels = self.node_labels.write();
1565 if let Some(label_set) = node_labels.get_mut(&node_id) {
1566 if !label_set.remove(&label_id) {
1567 return false; }
1569 } else {
1570 return false;
1571 }
1572 drop(node_labels);
1573
1574 let mut index = self.label_index.write();
1576 if (label_id as usize) < index.len() {
1577 index[label_id as usize].remove(&node_id);
1578 }
1579
1580 true
1583 }
1584
1585 #[must_use]
1587 #[cfg(not(feature = "tiered-storage"))]
1588 pub fn node_count(&self) -> usize {
1589 let epoch = self.current_epoch();
1590 self.nodes
1591 .read()
1592 .values()
1593 .filter_map(|chain| chain.visible_at(epoch))
1594 .filter(|r| !r.is_deleted())
1595 .count()
1596 }
1597
1598 #[must_use]
1601 #[cfg(feature = "tiered-storage")]
1602 pub fn node_count(&self) -> usize {
1603 let epoch = self.current_epoch();
1604 let versions = self.node_versions.read();
1605 versions
1606 .iter()
1607 .filter(|(_, index)| {
1608 index.visible_at(epoch).map_or(false, |vref| {
1609 self.read_node_record(&vref)
1610 .map_or(false, |r| !r.is_deleted())
1611 })
1612 })
1613 .count()
1614 }
1615
1616 #[must_use]
1622 #[cfg(not(feature = "tiered-storage"))]
1623 pub fn node_ids(&self) -> Vec<NodeId> {
1624 let epoch = self.current_epoch();
1625 let mut ids: Vec<NodeId> = self
1626 .nodes
1627 .read()
1628 .iter()
1629 .filter_map(|(id, chain)| {
1630 chain
1631 .visible_at(epoch)
1632 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1633 })
1634 .collect();
1635 ids.sort_unstable();
1636 ids
1637 }
1638
1639 #[must_use]
1642 #[cfg(feature = "tiered-storage")]
1643 pub fn node_ids(&self) -> Vec<NodeId> {
1644 let epoch = self.current_epoch();
1645 let versions = self.node_versions.read();
1646 let mut ids: Vec<NodeId> = versions
1647 .iter()
1648 .filter_map(|(id, index)| {
1649 index.visible_at(epoch).and_then(|vref| {
1650 self.read_node_record(&vref)
1651 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1652 })
1653 })
1654 .collect();
1655 ids.sort_unstable();
1656 ids
1657 }
1658
1659 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1663 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1664 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1665 }
1666
1667 #[cfg(not(feature = "tiered-storage"))]
1669 pub fn create_edge_versioned(
1670 &self,
1671 src: NodeId,
1672 dst: NodeId,
1673 edge_type: &str,
1674 epoch: EpochId,
1675 tx_id: TxId,
1676 ) -> EdgeId {
1677 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1678 let type_id = self.get_or_create_edge_type_id(edge_type);
1679
1680 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1681 let chain = VersionChain::with_initial(record, epoch, tx_id);
1682 self.edges.write().insert(id, chain);
1683
1684 self.forward_adj.add_edge(src, dst, id);
1686 if let Some(ref backward) = self.backward_adj {
1687 backward.add_edge(dst, src, id);
1688 }
1689
1690 id
1691 }
1692
1693 #[cfg(feature = "tiered-storage")]
1696 pub fn create_edge_versioned(
1697 &self,
1698 src: NodeId,
1699 dst: NodeId,
1700 edge_type: &str,
1701 epoch: EpochId,
1702 tx_id: TxId,
1703 ) -> EdgeId {
1704 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1705 let type_id = self.get_or_create_edge_type_id(edge_type);
1706
1707 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1708
1709 let arena = self.arena_allocator.arena_or_create(epoch);
1711 let (offset, _stored) = arena.alloc_value_with_offset(record);
1712
1713 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1715
1716 let mut versions = self.edge_versions.write();
1718 if let Some(index) = versions.get_mut(&id) {
1719 index.add_hot(hot_ref);
1720 } else {
1721 versions.insert(id, VersionIndex::with_initial(hot_ref));
1722 }
1723
1724 self.forward_adj.add_edge(src, dst, id);
1726 if let Some(ref backward) = self.backward_adj {
1727 backward.add_edge(dst, src, id);
1728 }
1729
1730 id
1731 }
1732
1733 pub fn create_edge_with_props(
1735 &self,
1736 src: NodeId,
1737 dst: NodeId,
1738 edge_type: &str,
1739 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1740 ) -> EdgeId {
1741 let id = self.create_edge(src, dst, edge_type);
1742
1743 for (key, value) in properties {
1744 self.edge_properties.set(id, key.into(), value.into());
1745 }
1746
1747 id
1748 }
1749
1750 #[must_use]
1752 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1753 self.get_edge_at_epoch(id, self.current_epoch())
1754 }
1755
1756 #[must_use]
1758 #[cfg(not(feature = "tiered-storage"))]
1759 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1760 let edges = self.edges.read();
1761 let chain = edges.get(&id)?;
1762 let record = chain.visible_at(epoch)?;
1763
1764 if record.is_deleted() {
1765 return None;
1766 }
1767
1768 let edge_type = {
1769 let id_to_type = self.id_to_edge_type.read();
1770 id_to_type.get(record.type_id as usize)?.clone()
1771 };
1772
1773 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1774
1775 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1777
1778 Some(edge)
1779 }
1780
1781 #[must_use]
1784 #[cfg(feature = "tiered-storage")]
1785 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1786 let versions = self.edge_versions.read();
1787 let index = versions.get(&id)?;
1788 let version_ref = index.visible_at(epoch)?;
1789
1790 let record = self.read_edge_record(&version_ref)?;
1791
1792 if record.is_deleted() {
1793 return None;
1794 }
1795
1796 let edge_type = {
1797 let id_to_type = self.id_to_edge_type.read();
1798 id_to_type.get(record.type_id as usize)?.clone()
1799 };
1800
1801 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1802
1803 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1805
1806 Some(edge)
1807 }
1808
1809 #[must_use]
1811 #[cfg(not(feature = "tiered-storage"))]
1812 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1813 let edges = self.edges.read();
1814 let chain = edges.get(&id)?;
1815 let record = chain.visible_to(epoch, tx_id)?;
1816
1817 if record.is_deleted() {
1818 return None;
1819 }
1820
1821 let edge_type = {
1822 let id_to_type = self.id_to_edge_type.read();
1823 id_to_type.get(record.type_id as usize)?.clone()
1824 };
1825
1826 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1827
1828 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1830
1831 Some(edge)
1832 }
1833
1834 #[must_use]
1837 #[cfg(feature = "tiered-storage")]
1838 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1839 let versions = self.edge_versions.read();
1840 let index = versions.get(&id)?;
1841 let version_ref = index.visible_to(epoch, tx_id)?;
1842
1843 let record = self.read_edge_record(&version_ref)?;
1844
1845 if record.is_deleted() {
1846 return None;
1847 }
1848
1849 let edge_type = {
1850 let id_to_type = self.id_to_edge_type.read();
1851 id_to_type.get(record.type_id as usize)?.clone()
1852 };
1853
1854 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1855
1856 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1858
1859 Some(edge)
1860 }
1861
1862 #[cfg(feature = "tiered-storage")]
1864 #[allow(unsafe_code)]
1865 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1866 match version_ref {
1867 VersionRef::Hot(hot_ref) => {
1868 let arena = self.arena_allocator.arena(hot_ref.epoch);
1869 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1871 Some(*record)
1872 }
1873 VersionRef::Cold(cold_ref) => {
1874 self.epoch_store
1876 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1877 }
1878 }
1879 }
1880
1881 pub fn delete_edge(&self, id: EdgeId) -> bool {
1883 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1884 self.delete_edge_at_epoch(id, self.current_epoch())
1885 }
1886
1887 #[cfg(not(feature = "tiered-storage"))]
1889 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1890 let mut edges = self.edges.write();
1891 if let Some(chain) = edges.get_mut(&id) {
1892 let (src, dst) = {
1894 match chain.visible_at(epoch) {
1895 Some(record) => {
1896 if record.is_deleted() {
1897 return false;
1898 }
1899 (record.src, record.dst)
1900 }
1901 None => return false, }
1903 };
1904
1905 chain.mark_deleted(epoch);
1907
1908 drop(edges); self.forward_adj.mark_deleted(src, id);
1912 if let Some(ref backward) = self.backward_adj {
1913 backward.mark_deleted(dst, id);
1914 }
1915
1916 self.edge_properties.remove_all(id);
1918
1919 true
1920 } else {
1921 false
1922 }
1923 }
1924
1925 #[cfg(feature = "tiered-storage")]
1928 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1929 let mut versions = self.edge_versions.write();
1930 if let Some(index) = versions.get_mut(&id) {
1931 let (src, dst) = {
1933 match index.visible_at(epoch) {
1934 Some(version_ref) => {
1935 if let Some(record) = self.read_edge_record(&version_ref) {
1936 if record.is_deleted() {
1937 return false;
1938 }
1939 (record.src, record.dst)
1940 } else {
1941 return false;
1942 }
1943 }
1944 None => return false,
1945 }
1946 };
1947
1948 index.mark_deleted(epoch);
1950
1951 drop(versions); self.forward_adj.mark_deleted(src, id);
1955 if let Some(ref backward) = self.backward_adj {
1956 backward.mark_deleted(dst, id);
1957 }
1958
1959 self.edge_properties.remove_all(id);
1961
1962 true
1963 } else {
1964 false
1965 }
1966 }
1967
1968 #[must_use]
1970 #[cfg(not(feature = "tiered-storage"))]
1971 pub fn edge_count(&self) -> usize {
1972 let epoch = self.current_epoch();
1973 self.edges
1974 .read()
1975 .values()
1976 .filter_map(|chain| chain.visible_at(epoch))
1977 .filter(|r| !r.is_deleted())
1978 .count()
1979 }
1980
1981 #[must_use]
1984 #[cfg(feature = "tiered-storage")]
1985 pub fn edge_count(&self) -> usize {
1986 let epoch = self.current_epoch();
1987 let versions = self.edge_versions.read();
1988 versions
1989 .iter()
1990 .filter(|(_, index)| {
1991 index.visible_at(epoch).map_or(false, |vref| {
1992 self.read_edge_record(&vref)
1993 .map_or(false, |r| !r.is_deleted())
1994 })
1995 })
1996 .count()
1997 }
1998
1999 #[cfg(not(feature = "tiered-storage"))]
2004 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2005 {
2007 let mut nodes = self.nodes.write();
2008 for chain in nodes.values_mut() {
2009 chain.remove_versions_by(tx_id);
2010 }
2011 nodes.retain(|_, chain| !chain.is_empty());
2013 }
2014
2015 {
2017 let mut edges = self.edges.write();
2018 for chain in edges.values_mut() {
2019 chain.remove_versions_by(tx_id);
2020 }
2021 edges.retain(|_, chain| !chain.is_empty());
2023 }
2024 }
2025
2026 #[cfg(feature = "tiered-storage")]
2029 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2030 {
2032 let mut versions = self.node_versions.write();
2033 for index in versions.values_mut() {
2034 index.remove_versions_by(tx_id);
2035 }
2036 versions.retain(|_, index| !index.is_empty());
2038 }
2039
2040 {
2042 let mut versions = self.edge_versions.write();
2043 for index in versions.values_mut() {
2044 index.remove_versions_by(tx_id);
2045 }
2046 versions.retain(|_, index| !index.is_empty());
2048 }
2049 }
2050
    /// Hands every hot node and edge version of `epoch` to the epoch store and
    /// registers the resulting cold references in the version indexes.
    ///
    /// Runs in three phases, each under its own short-lived lock:
    /// 1. under read locks, copy every record (and its hot reference) that
    ///    `hot_refs_for_epoch(epoch)` reports for nodes and then edges;
    /// 2. pass the copied records to `EpochStore::freeze_epoch`, which
    ///    returns per-entity entries carrying `entity_id`, `offset`, `length`;
    /// 3. under write locks, build a `ColdVersionRef` per collected hot
    ///    reference and pass it to the index's `freeze_epoch`.
    ///
    /// Returns the number of records collected in phase 1 (nodes + edges);
    /// returns 0 without calling the epoch store when there are none.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
        // Phase 1a: snapshot hot node records for this epoch.
        let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
        let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.node_versions.read();
            for (node_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY (assumed, TODO confirm): `arena_offset` is expected to be an
                    // offset at which a `NodeRecord` was allocated in this arena.
                    let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    node_records.push((node_id.as_u64(), *record));
                    node_hot_refs.push((*node_id, *hot_ref));
                }
            }
        }

        // Phase 1b: snapshot hot edge records for this epoch.
        let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
        let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

        {
            let versions = self.edge_versions.read();
            for (edge_id, index) in versions.iter() {
                for hot_ref in index.hot_refs_for_epoch(epoch) {
                    let arena = self.arena_allocator.arena(hot_ref.epoch);
                    // SAFETY (assumed, TODO confirm): `arena_offset` is expected to be an
                    // offset at which an `EdgeRecord` was allocated in this arena.
                    let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                    edge_records.push((edge_id.as_u64(), *record));
                    edge_hot_refs.push((*edge_id, *hot_ref));
                }
            }
        }

        let total_frozen = node_records.len() + edge_records.len();

        // Nothing hot in this epoch: skip the epoch store entirely.
        if total_frozen == 0 {
            return 0;
        }

        // Phase 2: hand the copied records to the epoch store.
        let (node_entries, edge_entries) =
            self.epoch_store
                .freeze_epoch(epoch, node_records, edge_records);

        // Index the returned entries by entity ID for the lookups in phase 3.
        // NOTE(review): the maps hold one entry per entity ID, so if an entity had
        // several hot refs in this epoch only one (offset, length) survives. Confirm
        // whether that can happen.
        let node_entry_map: FxHashMap<u64, _> = node_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();
        let edge_entry_map: FxHashMap<u64, _> = edge_entries
            .iter()
            .map(|e| (e.entity_id, (e.offset, e.length)))
            .collect();

        // Phase 3a: register cold references for nodes. Entities that are no longer
        // in the version map, or have no entry from the epoch store, are skipped.
        {
            let mut versions = self.node_versions.write();
            for (node_id, hot_ref) in &node_hot_refs {
                if let Some(index) = versions.get_mut(node_id)
                    && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
                {
                    // Carry the creator and deletion epoch over from the hot reference.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        // Phase 3b: the same for edges.
        {
            let mut versions = self.edge_versions.write();
            for (edge_id, hot_ref) in &edge_hot_refs {
                if let Some(index) = versions.get_mut(edge_id)
                    && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
                {
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }

        total_frozen
    }
2166
2167 #[cfg(feature = "tiered-storage")]
2169 #[must_use]
2170 pub fn epoch_store(&self) -> &EpochStore {
2171 &self.epoch_store
2172 }
2173
2174 #[must_use]
2176 pub fn label_count(&self) -> usize {
2177 self.id_to_label.read().len()
2178 }
2179
2180 #[must_use]
2184 pub fn property_key_count(&self) -> usize {
2185 let node_keys = self.node_properties.column_count();
2186 let edge_keys = self.edge_properties.column_count();
2187 node_keys + edge_keys
2191 }
2192
2193 #[must_use]
2195 pub fn edge_type_count(&self) -> usize {
2196 self.id_to_edge_type.read().len()
2197 }
2198
2199 pub fn neighbors(
2206 &self,
2207 node: NodeId,
2208 direction: Direction,
2209 ) -> impl Iterator<Item = NodeId> + '_ {
2210 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2211 Direction::Outgoing | Direction::Both => {
2212 Box::new(self.forward_adj.neighbors(node).into_iter())
2213 }
2214 Direction::Incoming => Box::new(std::iter::empty()),
2215 };
2216
2217 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2218 Direction::Incoming | Direction::Both => {
2219 if let Some(ref adj) = self.backward_adj {
2220 Box::new(adj.neighbors(node).into_iter())
2221 } else {
2222 Box::new(std::iter::empty())
2223 }
2224 }
2225 Direction::Outgoing => Box::new(std::iter::empty()),
2226 };
2227
2228 forward.chain(backward)
2229 }
2230
2231 pub fn edges_from(
2235 &self,
2236 node: NodeId,
2237 direction: Direction,
2238 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2239 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2240 Direction::Outgoing | Direction::Both => {
2241 Box::new(self.forward_adj.edges_from(node).into_iter())
2242 }
2243 Direction::Incoming => Box::new(std::iter::empty()),
2244 };
2245
2246 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2247 Direction::Incoming | Direction::Both => {
2248 if let Some(ref adj) = self.backward_adj {
2249 Box::new(adj.edges_from(node).into_iter())
2250 } else {
2251 Box::new(std::iter::empty())
2252 }
2253 }
2254 Direction::Outgoing => Box::new(std::iter::empty()),
2255 };
2256
2257 forward.chain(backward)
2258 }
2259
2260 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2273 if let Some(ref backward) = self.backward_adj {
2274 backward.edges_from(node)
2275 } else {
2276 self.all_edges()
2278 .filter_map(|edge| {
2279 if edge.dst == node {
2280 Some((edge.src, edge.id))
2281 } else {
2282 None
2283 }
2284 })
2285 .collect()
2286 }
2287 }
2288
2289 #[must_use]
2293 pub fn out_degree(&self, node: NodeId) -> usize {
2294 self.forward_adj.out_degree(node)
2295 }
2296
2297 #[must_use]
2302 pub fn in_degree(&self, node: NodeId) -> usize {
2303 if let Some(ref backward) = self.backward_adj {
2304 backward.in_degree(node)
2305 } else {
2306 self.all_edges().filter(|edge| edge.dst == node).count()
2308 }
2309 }
2310
2311 #[must_use]
2313 #[cfg(not(feature = "tiered-storage"))]
2314 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2315 let edges = self.edges.read();
2316 let chain = edges.get(&id)?;
2317 let epoch = self.current_epoch();
2318 let record = chain.visible_at(epoch)?;
2319 let id_to_type = self.id_to_edge_type.read();
2320 id_to_type.get(record.type_id as usize).cloned()
2321 }
2322
2323 #[must_use]
2326 #[cfg(feature = "tiered-storage")]
2327 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2328 let versions = self.edge_versions.read();
2329 let index = versions.get(&id)?;
2330 let epoch = self.current_epoch();
2331 let vref = index.visible_at(epoch)?;
2332 let record = self.read_edge_record(&vref)?;
2333 let id_to_type = self.id_to_edge_type.read();
2334 id_to_type.get(record.type_id as usize).cloned()
2335 }
2336
2337 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2343 let label_to_id = self.label_to_id.read();
2344 if let Some(&label_id) = label_to_id.get(label) {
2345 let index = self.label_index.read();
2346 if let Some(set) = index.get(label_id as usize) {
2347 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2348 ids.sort_unstable();
2349 return ids;
2350 }
2351 }
2352 Vec::new()
2353 }
2354
2355 #[cfg(not(feature = "tiered-storage"))]
2362 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2363 let epoch = self.current_epoch();
2364 let node_ids: Vec<NodeId> = self
2365 .nodes
2366 .read()
2367 .iter()
2368 .filter_map(|(id, chain)| {
2369 chain
2370 .visible_at(epoch)
2371 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2372 })
2373 .collect();
2374
2375 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2376 }
2377
2378 #[cfg(feature = "tiered-storage")]
2381 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2382 let node_ids = self.node_ids();
2383 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2384 }
2385
2386 #[cfg(not(feature = "tiered-storage"))]
2391 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2392 let epoch = self.current_epoch();
2393 let edge_ids: Vec<EdgeId> = self
2394 .edges
2395 .read()
2396 .iter()
2397 .filter_map(|(id, chain)| {
2398 chain
2399 .visible_at(epoch)
2400 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2401 })
2402 .collect();
2403
2404 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2405 }
2406
2407 #[cfg(feature = "tiered-storage")]
2410 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2411 let epoch = self.current_epoch();
2412 let versions = self.edge_versions.read();
2413 let edge_ids: Vec<EdgeId> = versions
2414 .iter()
2415 .filter_map(|(id, index)| {
2416 index.visible_at(epoch).and_then(|vref| {
2417 self.read_edge_record(&vref)
2418 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2419 })
2420 })
2421 .collect();
2422
2423 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2424 }
2425
2426 pub fn all_labels(&self) -> Vec<String> {
2428 self.id_to_label
2429 .read()
2430 .iter()
2431 .map(|s| s.to_string())
2432 .collect()
2433 }
2434
2435 pub fn all_edge_types(&self) -> Vec<String> {
2437 self.id_to_edge_type
2438 .read()
2439 .iter()
2440 .map(|s| s.to_string())
2441 .collect()
2442 }
2443
2444 pub fn all_property_keys(&self) -> Vec<String> {
2446 let mut keys = std::collections::HashSet::new();
2447 for key in self.node_properties.keys() {
2448 keys.insert(key.to_string());
2449 }
2450 for key in self.edge_properties.keys() {
2451 keys.insert(key.to_string());
2452 }
2453 keys.into_iter().collect()
2454 }
2455
2456 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2458 let node_ids = self.nodes_by_label(label);
2459 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2460 }
2461
2462 #[cfg(not(feature = "tiered-storage"))]
2464 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2465 let epoch = self.current_epoch();
2466 let type_to_id = self.edge_type_to_id.read();
2467
2468 if let Some(&type_id) = type_to_id.get(edge_type) {
2469 let edge_ids: Vec<EdgeId> = self
2470 .edges
2471 .read()
2472 .iter()
2473 .filter_map(|(id, chain)| {
2474 chain.visible_at(epoch).and_then(|r| {
2475 if !r.is_deleted() && r.type_id == type_id {
2476 Some(*id)
2477 } else {
2478 None
2479 }
2480 })
2481 })
2482 .collect();
2483
2484 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2486 as Box<dyn Iterator<Item = Edge> + 'a>
2487 } else {
2488 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2490 }
2491 }
2492
2493 #[cfg(feature = "tiered-storage")]
2496 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2497 let epoch = self.current_epoch();
2498 let type_to_id = self.edge_type_to_id.read();
2499
2500 if let Some(&type_id) = type_to_id.get(edge_type) {
2501 let versions = self.edge_versions.read();
2502 let edge_ids: Vec<EdgeId> = versions
2503 .iter()
2504 .filter_map(|(id, index)| {
2505 index.visible_at(epoch).and_then(|vref| {
2506 self.read_edge_record(&vref).and_then(|r| {
2507 if !r.is_deleted() && r.type_id == type_id {
2508 Some(*id)
2509 } else {
2510 None
2511 }
2512 })
2513 })
2514 })
2515 .collect();
2516
2517 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2518 as Box<dyn Iterator<Item = Edge> + 'a>
2519 } else {
2520 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2521 }
2522 }
2523
2524 #[must_use]
2531 pub fn node_property_might_match(
2532 &self,
2533 property: &PropertyKey,
2534 op: CompareOp,
2535 value: &Value,
2536 ) -> bool {
2537 self.node_properties.might_match(property, op, value)
2538 }
2539
2540 #[must_use]
2542 pub fn edge_property_might_match(
2543 &self,
2544 property: &PropertyKey,
2545 op: CompareOp,
2546 value: &Value,
2547 ) -> bool {
2548 self.edge_properties.might_match(property, op, value)
2549 }
2550
2551 #[must_use]
2553 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2554 self.node_properties.zone_map(property)
2555 }
2556
2557 #[must_use]
2559 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2560 self.edge_properties.zone_map(property)
2561 }
2562
2563 pub fn rebuild_zone_maps(&self) {
2565 self.node_properties.rebuild_zone_maps();
2566 self.edge_properties.rebuild_zone_maps();
2567 }
2568
2569 #[must_use]
2573 pub fn statistics(&self) -> Statistics {
2574 self.statistics.read().clone()
2575 }
2576
2577 pub fn ensure_statistics_fresh(&self) {
2582 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2583 self.compute_statistics();
2584 }
2585 }
2586
2587 #[cfg(not(feature = "tiered-storage"))]
2592 pub fn compute_statistics(&self) {
2593 let mut stats = Statistics::new();
2594
2595 stats.total_nodes = self.node_count() as u64;
2597 stats.total_edges = self.edge_count() as u64;
2598
2599 let id_to_label = self.id_to_label.read();
2601 let label_index = self.label_index.read();
2602
2603 for (label_id, label_name) in id_to_label.iter().enumerate() {
2604 let node_count = label_index
2605 .get(label_id)
2606 .map(|set| set.len() as u64)
2607 .unwrap_or(0);
2608
2609 if node_count > 0 {
2610 let avg_out_degree = if stats.total_nodes > 0 {
2612 stats.total_edges as f64 / stats.total_nodes as f64
2613 } else {
2614 0.0
2615 };
2616
2617 let label_stats =
2618 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2619
2620 stats.update_label(label_name.as_ref(), label_stats);
2621 }
2622 }
2623
2624 let id_to_edge_type = self.id_to_edge_type.read();
2626 let edges = self.edges.read();
2627 let epoch = self.current_epoch();
2628
2629 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2630 for chain in edges.values() {
2631 if let Some(record) = chain.visible_at(epoch) {
2632 if !record.is_deleted() {
2633 *edge_type_counts.entry(record.type_id).or_default() += 1;
2634 }
2635 }
2636 }
2637
2638 for (type_id, count) in edge_type_counts {
2639 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2640 let avg_degree = if stats.total_nodes > 0 {
2641 count as f64 / stats.total_nodes as f64
2642 } else {
2643 0.0
2644 };
2645
2646 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2647 stats.update_edge_type(type_name.as_ref(), edge_stats);
2648 }
2649 }
2650
2651 *self.statistics.write() = stats;
2652 }
2653
2654 #[cfg(feature = "tiered-storage")]
2657 pub fn compute_statistics(&self) {
2658 let mut stats = Statistics::new();
2659
2660 stats.total_nodes = self.node_count() as u64;
2662 stats.total_edges = self.edge_count() as u64;
2663
2664 let id_to_label = self.id_to_label.read();
2666 let label_index = self.label_index.read();
2667
2668 for (label_id, label_name) in id_to_label.iter().enumerate() {
2669 let node_count = label_index
2670 .get(label_id)
2671 .map(|set| set.len() as u64)
2672 .unwrap_or(0);
2673
2674 if node_count > 0 {
2675 let avg_out_degree = if stats.total_nodes > 0 {
2676 stats.total_edges as f64 / stats.total_nodes as f64
2677 } else {
2678 0.0
2679 };
2680
2681 let label_stats =
2682 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2683
2684 stats.update_label(label_name.as_ref(), label_stats);
2685 }
2686 }
2687
2688 let id_to_edge_type = self.id_to_edge_type.read();
2690 let versions = self.edge_versions.read();
2691 let epoch = self.current_epoch();
2692
2693 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2694 for index in versions.values() {
2695 if let Some(vref) = index.visible_at(epoch)
2696 && let Some(record) = self.read_edge_record(&vref)
2697 && !record.is_deleted()
2698 {
2699 *edge_type_counts.entry(record.type_id).or_default() += 1;
2700 }
2701 }
2702
2703 for (type_id, count) in edge_type_counts {
2704 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2705 let avg_degree = if stats.total_nodes > 0 {
2706 count as f64 / stats.total_nodes as f64
2707 } else {
2708 0.0
2709 };
2710
2711 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2712 stats.update_edge_type(type_name.as_ref(), edge_stats);
2713 }
2714 }
2715
2716 *self.statistics.write() = stats;
2717 }
2718
2719 #[must_use]
2721 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2722 self.statistics.read().estimate_label_cardinality(label)
2723 }
2724
2725 #[must_use]
2727 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2728 self.statistics
2729 .read()
2730 .estimate_avg_degree(edge_type, outgoing)
2731 }
2732
2733 fn get_or_create_label_id(&self, label: &str) -> u32 {
2736 {
2737 let label_to_id = self.label_to_id.read();
2738 if let Some(&id) = label_to_id.get(label) {
2739 return id;
2740 }
2741 }
2742
2743 let mut label_to_id = self.label_to_id.write();
2744 let mut id_to_label = self.id_to_label.write();
2745
2746 if let Some(&id) = label_to_id.get(label) {
2748 return id;
2749 }
2750
2751 let id = id_to_label.len() as u32;
2752
2753 let label: ArcStr = label.into();
2754 label_to_id.insert(label.clone(), id);
2755 id_to_label.push(label);
2756
2757 id
2758 }
2759
2760 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2761 {
2762 let type_to_id = self.edge_type_to_id.read();
2763 if let Some(&id) = type_to_id.get(edge_type) {
2764 return id;
2765 }
2766 }
2767
2768 let mut type_to_id = self.edge_type_to_id.write();
2769 let mut id_to_type = self.id_to_edge_type.write();
2770
2771 if let Some(&id) = type_to_id.get(edge_type) {
2773 return id;
2774 }
2775
2776 let id = id_to_type.len() as u32;
2777 let edge_type: ArcStr = edge_type.into();
2778 type_to_id.insert(edge_type.clone(), id);
2779 id_to_type.push(edge_type);
2780
2781 id
2782 }
2783
2784 #[cfg(not(feature = "tiered-storage"))]
2791 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2792 let epoch = self.current_epoch();
2793 let mut record = NodeRecord::new(id, epoch);
2794 record.set_label_count(labels.len() as u16);
2795
2796 let mut node_label_set = FxHashSet::default();
2798 for label in labels {
2799 let label_id = self.get_or_create_label_id(*label);
2800 node_label_set.insert(label_id);
2801
2802 let mut index = self.label_index.write();
2804 while index.len() <= label_id as usize {
2805 index.push(FxHashMap::default());
2806 }
2807 index[label_id as usize].insert(id, ());
2808 }
2809
2810 self.node_labels.write().insert(id, node_label_set);
2812
2813 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2815 self.nodes.write().insert(id, chain);
2816
2817 let id_val = id.as_u64();
2819 let _ = self
2820 .next_node_id
2821 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2822 if id_val >= current {
2823 Some(id_val + 1)
2824 } else {
2825 None
2826 }
2827 });
2828 }
2829
2830 #[cfg(feature = "tiered-storage")]
2833 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2834 let epoch = self.current_epoch();
2835 let mut record = NodeRecord::new(id, epoch);
2836 record.set_label_count(labels.len() as u16);
2837
2838 let mut node_label_set = FxHashSet::default();
2840 for label in labels {
2841 let label_id = self.get_or_create_label_id(*label);
2842 node_label_set.insert(label_id);
2843
2844 let mut index = self.label_index.write();
2846 while index.len() <= label_id as usize {
2847 index.push(FxHashMap::default());
2848 }
2849 index[label_id as usize].insert(id, ());
2850 }
2851
2852 self.node_labels.write().insert(id, node_label_set);
2854
2855 let arena = self.arena_allocator.arena_or_create(epoch);
2857 let (offset, _stored) = arena.alloc_value_with_offset(record);
2858
2859 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2861 let mut versions = self.node_versions.write();
2862 versions.insert(id, VersionIndex::with_initial(hot_ref));
2863
2864 let id_val = id.as_u64();
2866 let _ = self
2867 .next_node_id
2868 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2869 if id_val >= current {
2870 Some(id_val + 1)
2871 } else {
2872 None
2873 }
2874 });
2875 }
2876
2877 #[cfg(not(feature = "tiered-storage"))]
2881 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2882 let epoch = self.current_epoch();
2883 let type_id = self.get_or_create_edge_type_id(edge_type);
2884
2885 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2886 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2887 self.edges.write().insert(id, chain);
2888
2889 self.forward_adj.add_edge(src, dst, id);
2891 if let Some(ref backward) = self.backward_adj {
2892 backward.add_edge(dst, src, id);
2893 }
2894
2895 let id_val = id.as_u64();
2897 let _ = self
2898 .next_edge_id
2899 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2900 if id_val >= current {
2901 Some(id_val + 1)
2902 } else {
2903 None
2904 }
2905 });
2906 }
2907
2908 #[cfg(feature = "tiered-storage")]
2911 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2912 let epoch = self.current_epoch();
2913 let type_id = self.get_or_create_edge_type_id(edge_type);
2914
2915 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2916
2917 let arena = self.arena_allocator.arena_or_create(epoch);
2919 let (offset, _stored) = arena.alloc_value_with_offset(record);
2920
2921 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2923 let mut versions = self.edge_versions.write();
2924 versions.insert(id, VersionIndex::with_initial(hot_ref));
2925
2926 self.forward_adj.add_edge(src, dst, id);
2928 if let Some(ref backward) = self.backward_adj {
2929 backward.add_edge(dst, src, id);
2930 }
2931
2932 let id_val = id.as_u64();
2934 let _ = self
2935 .next_edge_id
2936 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2937 if id_val >= current {
2938 Some(id_val + 1)
2939 } else {
2940 None
2941 }
2942 });
2943 }
2944
2945 pub fn set_epoch(&self, epoch: EpochId) {
2947 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2948 }
2949}
2950
2951impl Default for LpgStore {
2952 fn default() -> Self {
2953 Self::new()
2954 }
2955}
2956
#[cfg(test)]
mod tests {
    use super::*;

    /// A created node has a valid ID and carries the label it was created with.
    #[test]
    fn test_create_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert!(person.is_valid());

        let fetched = graph.get_node(person).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    /// Properties supplied at creation time can be read back from the node.
    #[test]
    fn test_create_node_with_props() {
        let graph = LpgStore::new();

        let initial_props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let alice = graph.create_node_with_props(&["Person"], initial_props);

        let fetched = graph.get_node(alice).unwrap();
        let name = fetched.get_property("name").and_then(|value| value.as_str());
        assert_eq!(name, Some("Alice"));
        let age = fetched.get_property("age").and_then(|value| value.as_int64());
        assert_eq!(age, Some(30));
    }

    /// Deleting a node removes it from the count and lookups; a second delete reports `false`.
    #[test]
    fn test_delete_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert_eq!(graph.node_count(), 1);

        let first_delete = graph.delete_node(person);
        assert!(first_delete);
        assert_eq!(graph.node_count(), 0);
        assert!(graph.get_node(person).is_none());

        let second_delete = graph.delete_node(person);
        assert!(!second_delete);
    }

    /// A created edge records its endpoints and its type.
    #[test]
    fn test_create_edge() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        let bob = graph.create_node(&["Person"]);

        let knows = graph.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let fetched = graph.get_edge(knows).unwrap();
        assert_eq!(fetched.src, alice);
        assert_eq!(fetched.dst, bob);
        assert_eq!(fetched.edge_type.as_str(), "KNOWS");
    }

    /// Outgoing and incoming neighbor lookups follow edge direction.
    #[test]
    fn test_neighbors() {
        let graph = LpgStore::new();

        let hub = graph.create_node(&["Person"]);
        let left = graph.create_node(&["Person"]);
        let right = graph.create_node(&["Person"]);

        graph.create_edge(hub, left, "KNOWS");
        graph.create_edge(hub, right, "KNOWS");

        let from_hub: Vec<_> = graph.neighbors(hub, Direction::Outgoing).collect();
        assert_eq!(from_hub.len(), 2);
        assert!(from_hub.contains(&left));
        assert!(from_hub.contains(&right));

        let into_left: Vec<_> = graph.neighbors(left, Direction::Incoming).collect();
        assert_eq!(into_left.len(), 1);
        assert!(into_left.contains(&hub));
    }

    /// `nodes_by_label` returns only the nodes carrying the requested label.
    #[test]
    fn test_nodes_by_label() {
        let graph = LpgStore::new();

        let first_person = graph.create_node(&["Person"]);
        let second_person = graph.create_node(&["Person"]);
        let _animal = graph.create_node(&["Animal"]);

        let persons = graph.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);
        assert!(persons.contains(&first_person));
        assert!(persons.contains(&second_person));

        let animals = graph.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    /// Deleting an edge removes it from the count and from lookups.
    #[test]
    fn test_delete_edge() {
        let graph = LpgStore::new();

        let src = graph.create_node(&["Person"]);
        let dst = graph.create_node(&["Person"]);
        let knows = graph.create_edge(src, dst, "KNOWS");

        assert_eq!(graph.edge_count(), 1);

        let deleted = graph.delete_edge(knows);
        assert!(deleted);
        assert_eq!(graph.edge_count(), 0);
        assert!(graph.get_edge(knows).is_none());
    }

    /// With `backward_edges` disabled, outgoing neighbors are found but the
    /// incoming neighbor lookup yields nothing.
    #[test]
    fn test_lpg_store_config() {
        let graph = LpgStore::with_config(LpgStoreConfig {
            backward_edges: false,
            initial_node_capacity: 100,
            initial_edge_capacity: 200,
        });

        let src = graph.create_node(&["Person"]);
        let dst = graph.create_node(&["Person"]);
        graph.create_edge(src, dst, "KNOWS");

        let from_src: Vec<_> = graph.neighbors(src, Direction::Outgoing).collect();
        assert_eq!(from_src.len(), 1);

        let into_dst: Vec<_> = graph.neighbors(dst, Direction::Incoming).collect();
        assert_eq!(into_dst.len(), 0);
    }

    /// The epoch starts at 0 and `new_epoch` advances it to 1.
    #[test]
    fn test_epoch_management() {
        let graph = LpgStore::new();

        let initial = graph.current_epoch();
        assert_eq!(initial.as_u64(), 0);

        let advanced = graph.new_epoch();
        assert_eq!(advanced.as_u64(), 1);

        let observed = graph.current_epoch();
        assert_eq!(observed.as_u64(), 1);
    }

    /// Node properties can be set, overwritten, removed, and are absent afterwards.
    #[test]
    fn test_node_properties() {
        let graph = LpgStore::new();
        let person = graph.create_node(&["Person"]);

        // Initial write.
        graph.set_node_property(person, "name", Value::from("Alice"));
        let after_set = graph.get_node_property(person, &"name".into());
        assert!(matches!(after_set, Some(Value::String(s)) if s.as_str() == "Alice"));

        // Overwrite with a new value.
        graph.set_node_property(person, "name", Value::from("Bob"));
        let after_update = graph.get_node_property(person, &"name".into());
        assert!(matches!(after_update, Some(Value::String(s)) if s.as_str() == "Bob"));

        // Removal hands back the previous value.
        let removed = graph.remove_node_property(person, "name");
        assert!(matches!(removed, Some(Value::String(s)) if s.as_str() == "Bob"));

        let after_remove = graph.get_node_property(person, &"name".into());
        assert!(after_remove.is_none());

        // Removing a key that was never set yields nothing.
        let missing = graph.remove_node_property(person, "nonexistent");
        assert!(missing.is_none());
    }

    /// Edge properties can be set, read, removed, and are absent afterwards.
    #[test]
    fn test_edge_properties() {
        let graph = LpgStore::new();
        let src = graph.create_node(&["Person"]);
        let dst = graph.create_node(&["Person"]);
        let knows = graph.create_edge(src, dst, "KNOWS");

        graph.set_edge_property(knows, "since", Value::from(2020i64));
        let stored = graph.get_edge_property(knows, &"since".into());
        assert_eq!(stored.and_then(|value| value.as_int64()), Some(2020));

        let removed = graph.remove_edge_property(knows, "since");
        assert_eq!(removed.and_then(|value| value.as_int64()), Some(2020));

        let after_remove = graph.get_edge_property(knows, &"since".into());
        assert!(after_remove.is_none());
    }

    /// `add_label` / `remove_label` report whether they changed anything.
    #[test]
    fn test_add_remove_label() {
        let graph = LpgStore::new();
        let person = graph.create_node(&["Person"]);

        // First add succeeds.
        assert!(graph.add_label(person, "Employee"));

        let with_employee = graph.get_node(person).unwrap();
        assert!(with_employee.has_label("Person"));
        assert!(with_employee.has_label("Employee"));

        // Adding the same label again reports `false`.
        assert!(!graph.add_label(person, "Employee"));

        // First removal succeeds.
        assert!(graph.remove_label(person, "Employee"));

        let without_employee = graph.get_node(person).unwrap();
        assert!(without_employee.has_label("Person"));
        assert!(!without_employee.has_label("Employee"));

        // Removing an absent or unknown label reports `false`.
        assert!(!graph.remove_label(person, "Employee"));
        assert!(!graph.remove_label(person, "NonExistent"));
    }

    /// Adding a label to an ID that was never created reports `false`.
    #[test]
    fn test_add_label_to_nonexistent_node() {
        let graph = LpgStore::new();
        let unknown = NodeId::new(999);
        assert!(!graph.add_label(unknown, "Label"));
    }

    /// Removing a label from an ID that was never created reports `false`.
    #[test]
    fn test_remove_label_from_nonexistent_node() {
        let graph = LpgStore::new();
        let unknown = NodeId::new(999);
        assert!(!graph.remove_label(unknown, "Label"));
    }

    /// `node_ids` lists every live node and drops deleted ones.
    #[test]
    fn test_node_ids() {
        let graph = LpgStore::new();

        let first = graph.create_node(&["Person"]);
        let second = graph.create_node(&["Person"]);
        let third = graph.create_node(&["Person"]);

        let all_ids = graph.node_ids();
        assert_eq!(all_ids.len(), 3);
        assert!(all_ids.contains(&first));
        assert!(all_ids.contains(&second));
        assert!(all_ids.contains(&third));

        graph.delete_node(second);

        let remaining_ids = graph.node_ids();
        assert_eq!(remaining_ids.len(), 2);
        assert!(!remaining_ids.contains(&second));
    }

    /// Deleting a node ID that was never created reports `false`.
    #[test]
    fn test_delete_node_nonexistent() {
        let graph = LpgStore::new();
        let unknown = NodeId::new(999);
        assert!(!graph.delete_node(unknown));
    }

    /// Deleting an edge ID that was never created reports `false`.
    #[test]
    fn test_delete_edge_nonexistent() {
        let graph = LpgStore::new();
        let unknown = EdgeId::new(999);
        assert!(!graph.delete_edge(unknown));
    }

    /// The second delete of the same edge reports `false`.
    #[test]
    fn test_delete_edge_double() {
        let graph = LpgStore::new();
        let src = graph.create_node(&["Person"]);
        let dst = graph.create_node(&["Person"]);
        let knows = graph.create_edge(src, dst, "KNOWS");

        assert!(graph.delete_edge(knows));
        assert!(!graph.delete_edge(knows));
    }

    /// Properties supplied at edge creation can be read back from the edge.
    #[test]
    fn test_create_edge_with_props() {
        let graph = LpgStore::new();
        let src = graph.create_node(&["Person"]);
        let dst = graph.create_node(&["Person"]);

        let initial_props = [
            ("since", Value::from(2020i64)),
            ("weight", Value::from(1.0)),
        ];
        let knows = graph.create_edge_with_props(src, dst, "KNOWS", initial_props);

        let fetched = graph.get_edge(knows).unwrap();
        let since = fetched.get_property("since").and_then(|value| value.as_int64());
        assert_eq!(since, Some(2020));
        let weight = fetched.get_property("weight").and_then(|value| value.as_float64());
        assert_eq!(weight, Some(1.0));
    }

    /// `delete_node_edges` removes both the outgoing and the incoming edge of a node.
    #[test]
    fn test_delete_node_edges() {
        let graph = LpgStore::new();

        let center = graph.create_node(&["Person"]);
        let target = graph.create_node(&["Person"]);
        let source = graph.create_node(&["Person"]);

        graph.create_edge(center, target, "KNOWS");
        graph.create_edge(source, center, "KNOWS");
        assert_eq!(graph.edge_count(), 2);

        graph.delete_node_edges(center);

        assert_eq!(graph.edge_count(), 0);
    }

    /// `Direction::Both` yields neighbors reached via outgoing and incoming edges.
    #[test]
    fn test_neighbors_both_directions() {
        let graph = LpgStore::new();

        let center = graph.create_node(&["Person"]);
        let target = graph.create_node(&["Person"]);
        let source = graph.create_node(&["Person"]);

        graph.create_edge(center, target, "KNOWS");
        graph.create_edge(source, center, "KNOWS");

        let around: Vec<_> = graph.neighbors(center, Direction::Both).collect();
        assert_eq!(around.len(), 2);
        assert!(around.contains(&target));
        assert!(around.contains(&source));
    }

    /// `edges_from` returns edge IDs for the requested direction.
    #[test]
    fn test_edges_from() {
        let graph = LpgStore::new();

        let hub = graph.create_node(&["Person"]);
        let left = graph.create_node(&["Person"]);
        let right = graph.create_node(&["Person"]);

        let to_left = graph.create_edge(hub, left, "KNOWS");
        let to_right = graph.create_edge(hub, right, "KNOWS");

        let out_edges: Vec<_> = graph.edges_from(hub, Direction::Outgoing).collect();
        assert_eq!(out_edges.len(), 2);
        assert!(out_edges.iter().any(|(_, edge)| *edge == to_left));
        assert!(out_edges.iter().any(|(_, edge)| *edge == to_right));

        let in_edges: Vec<_> = graph.edges_from(left, Direction::Incoming).collect();
        assert_eq!(in_edges.len(), 1);
        assert_eq!(in_edges[0].1, to_left);
    }

    /// `edges_to` returns `(source, edge)` pairs for every edge ending at the node.
    #[test]
    fn test_edges_to() {
        let graph = LpgStore::new();

        let first_src = graph.create_node(&["Person"]);
        let sink = graph.create_node(&["Person"]);
        let second_src = graph.create_node(&["Person"]);

        let first_edge = graph.create_edge(first_src, sink, "KNOWS");
        let second_edge = graph.create_edge(second_src, sink, "KNOWS");

        let into_sink = graph.edges_to(sink);
        assert_eq!(into_sink.len(), 2);
        assert!(into_sink
            .iter()
            .any(|(src, edge)| *src == first_src && *edge == first_edge));
        assert!(into_sink
            .iter()
            .any(|(src, edge)| *src == second_src && *edge == second_edge));
    }

    /// Out- and in-degree counts match the edges created.
    #[test]
    fn test_out_degree_in_degree() {
        let graph = LpgStore::new();

        let a = graph.create_node(&["Person"]);
        let b = graph.create_node(&["Person"]);
        let c = graph.create_node(&["Person"]);

        graph.create_edge(a, b, "KNOWS");
        graph.create_edge(a, c, "KNOWS");
        graph.create_edge(c, b, "KNOWS");

        // Outgoing: a -> {b, c}, c -> {b}.
        assert_eq!(graph.out_degree(a), 2);
        assert_eq!(graph.out_degree(b), 0);
        assert_eq!(graph.out_degree(c), 1);

        // Incoming: b <- {a, c}, c <- {a}.
        assert_eq!(graph.in_degree(a), 0);
        assert_eq!(graph.in_degree(b), 2);
        assert_eq!(graph.in_degree(c), 1);
    }

    /// `edge_type` returns the type name for a known edge and `None` for an unknown ID.
    #[test]
    fn test_edge_type() {
        let graph = LpgStore::new();

        let src = graph.create_node(&["Person"]);
        let dst = graph.create_node(&["Person"]);
        let knows = graph.create_edge(src, dst, "KNOWS");

        let type_name = graph.edge_type(knows);
        assert_eq!(type_name.as_deref(), Some("KNOWS"));

        let unknown = EdgeId::new(999);
        assert!(graph.edge_type(unknown).is_none());
    }

    /// Label, edge-type and property-key counters start at zero and grow with new data.
    #[test]
    fn test_count_methods() {
        let graph = LpgStore::new();

        assert_eq!(graph.label_count(), 0);
        assert_eq!(graph.edge_type_count(), 0);
        assert_eq!(graph.property_key_count(), 0);

        let person = graph.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
        let company = graph.create_node(&["Company"]);
        graph.create_edge_with_props(
            person,
            company,
            "WORKS_AT",
            [("since", Value::from(2020i64))],
        );

        assert_eq!(graph.label_count(), 2);
        assert_eq!(graph.edge_type_count(), 1);
        assert_eq!(graph.property_key_count(), 2);
    }

    /// `all_nodes` and `all_edges` iterate over everything in the store.
    #[test]
    fn test_all_nodes_and_edges() {
        let graph = LpgStore::new();

        let src = graph.create_node(&["Person"]);
        let dst = graph.create_node(&["Person"]);
        graph.create_edge(src, dst, "KNOWS");

        let every_node: Vec<_> = graph.all_nodes().collect();
        assert_eq!(every_node.len(), 2);

        let every_edge: Vec<_> = graph.all_edges().collect();
        assert_eq!(every_edge.len(), 1);
    }

    /// `all_labels` and `all_edge_types` list each distinct name once.
    #[test]
    fn test_all_labels_and_edge_types() {
        let graph = LpgStore::new();

        graph.create_node(&["Person"]);
        graph.create_node(&["Company"]);
        let predator = graph.create_node(&["Animal"]);
        let prey = graph.create_node(&["Animal"]);
        graph.create_edge(predator, prey, "EATS");

        let label_names = graph.all_labels();
        assert_eq!(label_names.len(), 3);
        assert!(label_names.contains(&"Person".to_string()));
        assert!(label_names.contains(&"Company".to_string()));
        assert!(label_names.contains(&"Animal".to_string()));

        let type_names = graph.all_edge_types();
        assert_eq!(type_names.len(), 1);
        assert!(type_names.contains(&"EATS".to_string()));
    }

    /// `all_property_keys` includes keys used on nodes and on edges.
    #[test]
    fn test_all_property_keys() {
        let graph = LpgStore::new();

        let alice = graph.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
        let other = graph.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
        graph.create_edge_with_props(alice, other, "KNOWS", [("since", Value::from(2020i64))]);

        let key_names = graph.all_property_keys();
        assert!(key_names.contains(&"name".to_string()));
        assert!(key_names.contains(&"age".to_string()));
        assert!(key_names.contains(&"since".to_string()));
    }

    /// `nodes_with_label` iterates matching nodes and is empty for unknown labels.
    #[test]
    fn test_nodes_with_label() {
        let graph = LpgStore::new();

        graph.create_node(&["Person"]);
        graph.create_node(&["Person"]);
        graph.create_node(&["Company"]);

        let person_nodes: Vec<_> = graph.nodes_with_label("Person").collect();
        assert_eq!(person_nodes.len(), 2);

        let company_nodes: Vec<_> = graph.nodes_with_label("Company").collect();
        assert_eq!(company_nodes.len(), 1);

        let unknown_nodes: Vec<_> = graph.nodes_with_label("NonExistent").collect();
        assert_eq!(unknown_nodes.len(), 0);
    }

    /// `edges_with_type` iterates matching edges and is empty for unknown types.
    #[test]
    fn test_edges_with_type() {
        let graph = LpgStore::new();

        let employee = graph.create_node(&["Person"]);
        let friend = graph.create_node(&["Person"]);
        let employer = graph.create_node(&["Company"]);

        graph.create_edge(employee, friend, "KNOWS");
        graph.create_edge(employee, employer, "WORKS_AT");

        let knows_edges: Vec<_> = graph.edges_with_type("KNOWS").collect();
        assert_eq!(knows_edges.len(), 1);

        let works_at_edges: Vec<_> = graph.edges_with_type("WORKS_AT").collect();
        assert_eq!(works_at_edges.len(), 1);

        let unknown_edges: Vec<_> = graph.edges_with_type("NonExistent").collect();
        assert_eq!(unknown_edges.len(), 0);
    }

    /// `nodes_by_label` is empty for a label no node carries.
    #[test]
    fn test_nodes_by_label_nonexistent() {
        let graph = LpgStore::new();
        graph.create_node(&["Person"]);

        let matches = graph.nodes_by_label("NonExistent");
        assert!(matches.is_empty());
    }

    /// Computed statistics report node/edge totals; estimates are sanity-checked only.
    #[test]
    fn test_statistics() {
        let graph = LpgStore::new();

        let employee = graph.create_node(&["Person"]);
        let friend = graph.create_node(&["Person"]);
        let employer = graph.create_node(&["Company"]);

        graph.create_edge(employee, friend, "KNOWS");
        graph.create_edge(employee, employer, "WORKS_AT");

        graph.compute_statistics();
        let snapshot = graph.statistics();

        assert_eq!(snapshot.total_nodes, 3);
        assert_eq!(snapshot.total_edges, 2);

        let person_cardinality = graph.estimate_label_cardinality("Person");
        assert!(person_cardinality > 0.0);

        let knows_avg_degree = graph.estimate_avg_degree("KNOWS", true);
        assert!(knows_avg_degree >= 0.0);
    }

    /// Zone maps exist for properties that were written and are absent for unknown keys.
    #[test]
    fn test_zone_maps() {
        let graph = LpgStore::new();

        graph.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
        graph.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);

        // 30 lies between the stored values 25 and 35, so a match cannot be ruled out.
        let probe = Value::from(30i64);
        let might_match = graph.node_property_might_match(&"age".into(), CompareOp::Eq, &probe);
        assert!(might_match);

        let age_zone = graph.node_property_zone_map(&"age".into());
        assert!(age_zone.is_some());

        let missing_zone = graph.node_property_zone_map(&"nonexistent".into());
        assert!(missing_zone.is_none());

        let src = graph.create_node(&["A"]);
        let dst = graph.create_node(&["B"]);
        graph.create_edge_with_props(src, dst, "REL", [("weight", Value::from(1.0))]);

        let weight_zone = graph.edge_property_zone_map(&"weight".into());
        assert!(weight_zone.is_some());
    }

    /// `rebuild_zone_maps` runs without panicking on a store that has property data.
    #[test]
    fn test_rebuild_zone_maps() {
        let graph = LpgStore::new();
        graph.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);

        graph.rebuild_zone_maps();
    }

    /// A node created under an explicit ID is retrievable, and later IDs are larger.
    #[test]
    fn test_create_node_with_id() {
        let graph = LpgStore::new();

        let chosen = NodeId::new(100);
        graph.create_node_with_id(chosen, &["Person", "Employee"]);

        let fetched = graph.get_node(chosen).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(fetched.has_label("Employee"));

        let following = graph.create_node(&["Other"]);
        assert!(following.as_u64() > 100);
    }

    /// An edge created under an explicit ID is retrievable, and later IDs are larger.
    #[test]
    fn test_create_edge_with_id() {
        let graph = LpgStore::new();

        let src = graph.create_node(&["A"]);
        let dst = graph.create_node(&["B"]);

        let chosen = EdgeId::new(500);
        graph.create_edge_with_id(chosen, src, dst, "REL");

        let fetched = graph.get_edge(chosen).unwrap();
        assert_eq!(fetched.src, src);
        assert_eq!(fetched.dst, dst);
        assert_eq!(fetched.edge_type.as_str(), "REL");

        let following = graph.create_edge(src, dst, "OTHER");
        assert!(following.as_u64() > 500);
    }

    /// `set_epoch` replaces the current epoch value.
    #[test]
    fn test_set_epoch() {
        let graph = LpgStore::new();

        assert_eq!(graph.current_epoch().as_u64(), 0);

        graph.set_epoch(EpochId::new(42));
        assert_eq!(graph.current_epoch().as_u64(), 42);
    }

    /// Looking up a node ID that was never created yields `None`.
    #[test]
    fn test_get_node_nonexistent() {
        let graph = LpgStore::new();
        let unknown = NodeId::new(999);
        assert!(graph.get_node(unknown).is_none());
    }

    /// Looking up an edge ID that was never created yields `None`.
    #[test]
    fn test_get_edge_nonexistent() {
        let graph = LpgStore::new();
        let unknown = EdgeId::new(999);
        assert!(graph.get_edge(unknown).is_none());
    }

    /// A node created with several labels reports each of them.
    #[test]
    fn test_multiple_labels() {
        let graph = LpgStore::new();

        let manager = graph.create_node(&["Person", "Employee", "Manager"]);
        let fetched = graph.get_node(manager).unwrap();

        assert!(fetched.has_label("Person"));
        assert!(fetched.has_label("Employee"));
        assert!(fetched.has_label("Manager"));
        assert!(!fetched.has_label("Other"));
    }

    /// The `Default` implementation yields an empty store.
    #[test]
    fn test_default_impl() {
        let graph: LpgStore = Default::default();
        assert_eq!(graph.node_count(), 0);
        assert_eq!(graph.edge_count(), 0);
    }

    /// `edges_from` with `Direction::Both` returns outgoing and incoming edges.
    #[test]
    fn test_edges_from_both_directions() {
        let graph = LpgStore::new();

        let center = graph.create_node(&["A"]);
        let target = graph.create_node(&["B"]);
        let source = graph.create_node(&["C"]);

        let outgoing_edge = graph.create_edge(center, target, "R1");
        let incoming_edge = graph.create_edge(source, center, "R2");

        let touching: Vec<_> = graph.edges_from(center, Direction::Both).collect();
        assert_eq!(touching.len(), 2);
        assert!(touching.iter().any(|(_, edge)| *edge == outgoing_edge));
        assert!(touching.iter().any(|(_, edge)| *edge == incoming_edge));
    }

    /// `in_degree` still counts the incoming edge when `backward_edges` is disabled.
    #[test]
    fn test_no_backward_adj_in_degree() {
        let graph = LpgStore::with_config(LpgStoreConfig {
            backward_edges: false,
            initial_node_capacity: 10,
            initial_edge_capacity: 10,
        });

        let src = graph.create_node(&["A"]);
        let dst = graph.create_node(&["B"]);
        graph.create_edge(src, dst, "R");

        let incoming_count = graph.in_degree(dst);
        assert_eq!(incoming_count, 1);
    }

    /// `edges_to` still finds the incoming edge when `backward_edges` is disabled.
    #[test]
    fn test_no_backward_adj_edges_to() {
        let graph = LpgStore::with_config(LpgStoreConfig {
            backward_edges: false,
            initial_node_capacity: 10,
            initial_edge_capacity: 10,
        });

        let src = graph.create_node(&["A"]);
        let dst = graph.create_node(&["B"]);
        let only_edge = graph.create_edge(src, dst, "R");

        let into_dst = graph.edges_to(dst);
        assert_eq!(into_dst.len(), 1);
        assert_eq!(into_dst[0].1, only_edge);
    }

    /// A node created through the versioned API is visible via `get_node`.
    #[test]
    fn test_node_versioned_creation() {
        let graph = LpgStore::new();

        let epoch = graph.new_epoch();
        let tx = TxId::new(1);

        let person = graph.create_node_versioned(&["Person"], epoch, tx);
        assert!(graph.get_node(person).is_some());
    }

    /// An edge created through the versioned API is visible via `get_edge`.
    #[test]
    fn test_edge_versioned_creation() {
        let graph = LpgStore::new();

        let src = graph.create_node(&["A"]);
        let dst = graph.create_node(&["B"]);

        let epoch = graph.new_epoch();
        let tx = TxId::new(1);

        let rel = graph.create_edge_versioned(src, dst, "REL", epoch, tx);
        assert!(graph.get_edge(rel).is_some());
    }

    /// Properties passed to the versioned node constructor can be read back.
    #[test]
    fn test_node_with_props_versioned() {
        let graph = LpgStore::new();

        let epoch = graph.new_epoch();
        let tx = TxId::new(1);

        let alice = graph.create_node_with_props_versioned(
            &["Person"],
            [("name", Value::from("Alice"))],
            epoch,
            tx,
        );

        let fetched = graph.get_node(alice).unwrap();
        let name = fetched.get_property("name").and_then(|value| value.as_str());
        assert_eq!(name, Some("Alice"));
    }

    /// Discarding a transaction's uncommitted versions hides the node it created.
    #[test]
    fn test_discard_uncommitted_versions() {
        let graph = LpgStore::new();

        let epoch = graph.new_epoch();
        let tx = TxId::new(42);

        let person = graph.create_node_versioned(&["Person"], epoch, tx);
        assert!(graph.get_node(person).is_some());

        graph.discard_uncommitted_versions(tx);

        assert!(graph.get_node(person).is_none());
    }

    /// Property lookups return the same results before and after an index is created.
    #[test]
    fn test_property_index_create_and_lookup() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        let bob = graph.create_node(&["Person"]);
        let charlie = graph.create_node(&["Person"]);

        graph.set_node_property(alice, "city", Value::from("NYC"));
        graph.set_node_property(bob, "city", Value::from("NYC"));
        graph.set_node_property(charlie, "city", Value::from("LA"));

        // Lookup before any index exists.
        let unindexed_nyc = graph.find_nodes_by_property("city", &Value::from("NYC"));
        assert_eq!(unindexed_nyc.len(), 2);

        graph.create_property_index("city");
        assert!(graph.has_property_index("city"));

        // Same lookup with the index in place.
        let indexed_nyc = graph.find_nodes_by_property("city", &Value::from("NYC"));
        assert_eq!(indexed_nyc.len(), 2);
        assert!(indexed_nyc.contains(&alice));
        assert!(indexed_nyc.contains(&bob));

        let indexed_la = graph.find_nodes_by_property("city", &Value::from("LA"));
        assert_eq!(indexed_la.len(), 1);
        assert!(indexed_la.contains(&charlie));
    }

    /// Overwriting an indexed property moves the node from the old value to the new one.
    #[test]
    fn test_property_index_maintained_on_update() {
        let graph = LpgStore::new();

        graph.create_property_index("status");

        let task = graph.create_node(&["Task"]);
        graph.set_node_property(task, "status", Value::from("pending"));

        let pending_before = graph.find_nodes_by_property("status", &Value::from("pending"));
        assert_eq!(pending_before.len(), 1);
        assert!(pending_before.contains(&task));

        graph.set_node_property(task, "status", Value::from("done"));

        let pending_after = graph.find_nodes_by_property("status", &Value::from("pending"));
        assert!(pending_after.is_empty());

        let done_after = graph.find_nodes_by_property("status", &Value::from("done"));
        assert_eq!(done_after.len(), 1);
        assert!(done_after.contains(&task));
    }

    /// Removing an indexed property makes the node unfindable by its old value.
    #[test]
    fn test_property_index_maintained_on_remove() {
        let graph = LpgStore::new();

        graph.create_property_index("tag");

        let item = graph.create_node(&["Item"]);
        graph.set_node_property(item, "tag", Value::from("important"));

        let before_removal = graph.find_nodes_by_property("tag", &Value::from("important"));
        assert_eq!(before_removal.len(), 1);

        graph.remove_node_property(item, "tag");

        let after_removal = graph.find_nodes_by_property("tag", &Value::from("important"));
        assert!(after_removal.is_empty());
    }

    /// Dropping an index succeeds once and reports `false` the second time.
    #[test]
    fn test_property_index_drop() {
        let graph = LpgStore::new();

        graph.create_property_index("key");
        assert!(graph.has_property_index("key"));

        let first_drop = graph.drop_property_index("key");
        assert!(first_drop);
        assert!(!graph.has_property_index("key"));

        let second_drop = graph.drop_property_index("key");
        assert!(!second_drop);
    }

    /// An indexed lookup returns every node sharing the value and none for unused values.
    #[test]
    fn test_property_index_multiple_values() {
        let graph = LpgStore::new();

        graph.create_property_index("age");

        let first = graph.create_node(&["Person"]);
        let second = graph.create_node(&["Person"]);
        let third = graph.create_node(&["Person"]);
        let fourth = graph.create_node(&["Person"]);

        graph.set_node_property(first, "age", Value::from(25i64));
        graph.set_node_property(second, "age", Value::from(25i64));
        graph.set_node_property(third, "age", Value::from(30i64));
        graph.set_node_property(fourth, "age", Value::from(25i64));

        let aged_25 = graph.find_nodes_by_property("age", &Value::from(25i64));
        assert_eq!(aged_25.len(), 3);

        let aged_30 = graph.find_nodes_by_property("age", &Value::from(30i64));
        assert_eq!(aged_30.len(), 1);

        let aged_40 = graph.find_nodes_by_property("age", &Value::from(40i64));
        assert!(aged_40.is_empty());
    }

    /// An index created after the data was written still finds the existing nodes.
    #[test]
    fn test_property_index_builds_from_existing_data() {
        let graph = LpgStore::new();

        let alice_node = graph.create_node(&["Person"]);
        let bob_node = graph.create_node(&["Person"]);
        graph.set_node_property(alice_node, "email", Value::from("alice@example.com"));
        graph.set_node_property(bob_node, "email", Value::from("bob@example.com"));

        graph.create_property_index("email");

        let alice_hits = graph.find_nodes_by_property("email", &Value::from("alice@example.com"));
        assert_eq!(alice_hits.len(), 1);
        assert!(alice_hits.contains(&alice_node));

        let bob_hits = graph.find_nodes_by_property("email", &Value::from("bob@example.com"));
        assert_eq!(bob_hits.len(), 1);
        assert!(bob_hits.contains(&bob_node));
    }

    /// The single-key batch getter returns one slot per node, `None` where the key is unset.
    #[test]
    fn test_get_node_property_batch() {
        let graph = LpgStore::new();

        let first = graph.create_node(&["Person"]);
        let second = graph.create_node(&["Person"]);
        let third = graph.create_node(&["Person"]);

        graph.set_node_property(first, "age", Value::from(25i64));
        graph.set_node_property(second, "age", Value::from(30i64));
        // `third` deliberately has no "age" property.

        let age_key = PropertyKey::new("age");
        let ages = graph.get_node_property_batch(&[first, second, third], &age_key);

        assert_eq!(ages.len(), 3);
        assert_eq!(ages[0], Some(Value::from(25i64)));
        assert_eq!(ages[1], Some(Value::from(30i64)));
        assert_eq!(ages[2], None);
    }

    /// The single-key batch getter returns an empty result for an empty ID list.
    #[test]
    fn test_get_node_property_batch_empty() {
        let graph = LpgStore::new();
        let any_key = PropertyKey::new("any");

        let results = graph.get_node_property_batch(&[], &any_key);
        assert!(results.is_empty());
    }

    /// The all-properties batch getter returns one property map per requested node.
    #[test]
    fn test_get_nodes_properties_batch() {
        let graph = LpgStore::new();

        let first = graph.create_node(&["Person"]);
        let second = graph.create_node(&["Person"]);
        let third = graph.create_node(&["Person"]);

        graph.set_node_property(first, "name", Value::from("Alice"));
        graph.set_node_property(first, "age", Value::from(25i64));
        graph.set_node_property(second, "name", Value::from("Bob"));
        // `third` deliberately has no properties.

        let per_node = graph.get_nodes_properties_batch(&[first, second, third]);

        assert_eq!(per_node.len(), 3);
        assert_eq!(per_node[0].len(), 2);
        assert_eq!(per_node[1].len(), 1);
        assert_eq!(per_node[2].len(), 0);

        let name_key = PropertyKey::new("name");
        assert_eq!(per_node[0].get(&name_key), Some(&Value::from("Alice")));
        assert_eq!(per_node[1].get(&name_key), Some(&Value::from("Bob")));
    }

    /// The all-properties batch getter returns an empty result for an empty ID list.
    #[test]
    fn test_get_nodes_properties_batch_empty() {
        let graph = LpgStore::new();

        let per_node = graph.get_nodes_properties_batch(&[]);
        assert!(per_node.is_empty());
    }

    /// The selective batch getter returns only the requested keys for each node.
    #[test]
    fn test_get_nodes_properties_selective_batch() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        let bob = graph.create_node(&["Person"]);

        graph.set_node_property(alice, "name", Value::from("Alice"));
        graph.set_node_property(alice, "age", Value::from(25i64));
        graph.set_node_property(alice, "email", Value::from("alice@example.com"));
        graph.set_node_property(bob, "name", Value::from("Bob"));
        graph.set_node_property(bob, "age", Value::from(30i64));
        graph.set_node_property(bob, "city", Value::from("NYC"));

        // Only "name" and "age" are requested; "email" and "city" must be left out.
        let wanted = vec![PropertyKey::new("name"), PropertyKey::new("age")];
        let per_node = graph.get_nodes_properties_selective_batch(&[alice, bob], &wanted);

        assert_eq!(per_node.len(), 2);

        let alice_props = &per_node[0];
        assert_eq!(alice_props.len(), 2);
        assert_eq!(
            alice_props.get(&PropertyKey::new("name")),
            Some(&Value::from("Alice"))
        );
        assert_eq!(
            alice_props.get(&PropertyKey::new("age")),
            Some(&Value::from(25i64))
        );
        assert_eq!(alice_props.get(&PropertyKey::new("email")), None);

        let bob_props = &per_node[1];
        assert_eq!(bob_props.len(), 2);
        assert_eq!(
            bob_props.get(&PropertyKey::new("name")),
            Some(&Value::from("Bob"))
        );
        assert_eq!(
            bob_props.get(&PropertyKey::new("age")),
            Some(&Value::from(30i64))
        );
        assert_eq!(bob_props.get(&PropertyKey::new("city")), None);
    }

    /// Requesting no keys yields an empty property map for each node.
    #[test]
    fn test_get_nodes_properties_selective_batch_empty_keys() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        graph.set_node_property(alice, "name", Value::from("Alice"));

        let per_node = graph.get_nodes_properties_selective_batch(&[alice], &[]);

        assert_eq!(per_node.len(), 1);
        assert!(per_node[0].is_empty());
    }

    /// Requested keys that a node does not have are simply absent from its result.
    #[test]
    fn test_get_nodes_properties_selective_batch_missing_keys() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        graph.set_node_property(alice, "name", Value::from("Alice"));

        let wanted = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
        let per_node = graph.get_nodes_properties_selective_batch(&[alice], &wanted);

        assert_eq!(per_node.len(), 1);
        assert_eq!(per_node[0].len(), 1);
        assert_eq!(
            per_node[0].get(&PropertyKey::new("name")),
            Some(&Value::from("Alice"))
        );
    }
}