1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(feature = "tiered-storage")]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
32 match (a, b) {
33 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
34 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
35 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
36 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
37 _ => None,
38 }
39}
40
41fn value_in_range(
43 value: &Value,
44 min: Option<&Value>,
45 max: Option<&Value>,
46 min_inclusive: bool,
47 max_inclusive: bool,
48) -> bool {
49 if let Some(min_val) = min {
51 match compare_values_for_range(value, min_val) {
52 Some(CmpOrdering::Less) => return false,
53 Some(CmpOrdering::Equal) if !min_inclusive => return false,
54 None => return false, _ => {}
56 }
57 }
58
59 if let Some(max_val) = max {
61 match compare_values_for_range(value, max_val) {
62 Some(CmpOrdering::Greater) => return false,
63 Some(CmpOrdering::Equal) if !max_inclusive => return false,
64 None => return false,
65 _ => {}
66 }
67 }
68
69 true
70}
71
72#[cfg(feature = "tiered-storage")]
74use crate::storage::EpochStore;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::memory::arena::ArenaAllocator;
77#[cfg(feature = "tiered-storage")]
78use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
79
/// Construction-time settings for an `LpgStore`.
///
/// Derives `PartialEq`/`Eq` so two configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LpgStoreConfig {
    /// When `true`, the store also builds a backward adjacency structure;
    /// when `false`, none is created.
    pub backward_edges: bool,
    /// Initial node capacity setting.
    // NOTE(review): not read by the constructor visible in this file — confirm usage.
    pub initial_node_capacity: usize,
    /// Initial edge capacity setting.
    // NOTE(review): not read by the constructor visible in this file — confirm usage.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, 1024 initial nodes, 4096 initial edges.
    fn default() -> Self {
        Self {
            backward_edges: true,
            initial_node_capacity: 1024,
            initial_edge_capacity: 4096,
        }
    }
}
105
/// Labeled property graph store.
///
/// Bundles versioned node/edge records, property storage, label and edge-type
/// dictionaries, adjacency structures, secondary indexes, id/epoch counters,
/// and statistics. Which record-storage fields exist depends on whether the
/// `tiered-storage` feature is enabled.
pub struct LpgStore {
    /// Configuration the store was built with (`dead_code` is allowed on it).
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Version chain of node records per node id (without `tiered-storage`).
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Version chain of edge records per edge id (without `tiered-storage`).
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Allocator providing the per-epoch arenas that hot record versions are
    /// written into (with `tiered-storage`).
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Version index per node id (with `tiered-storage`).
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Version index per edge id (with `tiered-storage`).
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Store consulted when a version reference is cold (with `tiered-storage`).
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property values attached to nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property values attached to edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name to numeric label id.
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label names, looked up by label id used as a position.
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name to numeric type id.
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type names, looked up by type id used as a position.
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Adjacency keyed by source node (edges are added as `src -> dst`).
    forward_adj: ChunkedAdjacency,

    /// Adjacency keyed by destination node (edges are added as `dst -> src`);
    /// `None` when backward edges are disabled in the configuration.
    backward_adj: Option<ChunkedAdjacency>,

    /// Per-label membership: the slot at a label id holds the node ids that
    /// carry that label (stored as map keys with unit values).
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Label ids currently attached to each node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Secondary indexes: property key -> (property value -> node ids).
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Raw value of the next node id to hand out.
    next_node_id: AtomicU64,

    /// Raw value of the next edge id to hand out.
    next_edge_id: AtomicU64,

    /// Raw value of the current epoch.
    current_epoch: AtomicU64,

    /// Statistics container.
    // NOTE(review): its readers/writers are not visible in this section.
    statistics: RwLock<Statistics>,
}
262
263impl LpgStore {
264 #[must_use]
266 pub fn new() -> Self {
267 Self::with_config(LpgStoreConfig::default())
268 }
269
270 #[must_use]
272 pub fn with_config(config: LpgStoreConfig) -> Self {
273 let backward_adj = if config.backward_edges {
274 Some(ChunkedAdjacency::new())
275 } else {
276 None
277 };
278
279 Self {
280 #[cfg(not(feature = "tiered-storage"))]
281 nodes: RwLock::new(FxHashMap::default()),
282 #[cfg(not(feature = "tiered-storage"))]
283 edges: RwLock::new(FxHashMap::default()),
284 #[cfg(feature = "tiered-storage")]
285 arena_allocator: Arc::new(ArenaAllocator::new()),
286 #[cfg(feature = "tiered-storage")]
287 node_versions: RwLock::new(FxHashMap::default()),
288 #[cfg(feature = "tiered-storage")]
289 edge_versions: RwLock::new(FxHashMap::default()),
290 #[cfg(feature = "tiered-storage")]
291 epoch_store: Arc::new(EpochStore::new()),
292 node_properties: PropertyStorage::new(),
293 edge_properties: PropertyStorage::new(),
294 label_to_id: RwLock::new(FxHashMap::default()),
295 id_to_label: RwLock::new(Vec::new()),
296 edge_type_to_id: RwLock::new(FxHashMap::default()),
297 id_to_edge_type: RwLock::new(Vec::new()),
298 forward_adj: ChunkedAdjacency::new(),
299 backward_adj,
300 label_index: RwLock::new(Vec::new()),
301 node_labels: RwLock::new(FxHashMap::default()),
302 property_indexes: RwLock::new(FxHashMap::default()),
303 next_node_id: AtomicU64::new(0),
304 next_edge_id: AtomicU64::new(0),
305 current_epoch: AtomicU64::new(0),
306 statistics: RwLock::new(Statistics::new()),
307 config,
308 }
309 }
310
311 #[must_use]
313 pub fn current_epoch(&self) -> EpochId {
314 EpochId::new(self.current_epoch.load(Ordering::Acquire))
315 }
316
317 pub fn new_epoch(&self) -> EpochId {
319 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
320 EpochId::new(id)
321 }
322
323 pub fn create_node(&self, labels: &[&str]) -> NodeId {
329 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
330 }
331
332 #[cfg(not(feature = "tiered-storage"))]
334 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
335 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
336
337 let mut record = NodeRecord::new(id, epoch);
338 record.set_label_count(labels.len() as u16);
339
340 let mut node_label_set = FxHashSet::default();
342 for label in labels {
343 let label_id = self.get_or_create_label_id(*label);
344 node_label_set.insert(label_id);
345
346 let mut index = self.label_index.write();
348 while index.len() <= label_id as usize {
349 index.push(FxHashMap::default());
350 }
351 index[label_id as usize].insert(id, ());
352 }
353
354 self.node_labels.write().insert(id, node_label_set);
356
357 let chain = VersionChain::with_initial(record, epoch, tx_id);
359 self.nodes.write().insert(id, chain);
360 id
361 }
362
/// Creates a node with the given labels, versioned at `epoch` for `tx_id`
/// (tiered-storage build).
///
/// Allocates the next node id, registers every label in the label index and
/// the per-node label set, writes the record into the arena for `epoch`, and
/// records a hot version reference for it.
///
/// The record's label count is taken from the de-duplicated label set, so a
/// label repeated in `labels` is counted once.
#[cfg(feature = "tiered-storage")]
pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
    let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));

    let mut node_label_set = FxHashSet::default();
    for label in labels {
        let label_id = self.get_or_create_label_id(*label);
        node_label_set.insert(label_id);

        // Grow the per-label index until it has a slot for this label id.
        let mut index = self.label_index.write();
        while index.len() <= label_id as usize {
            index.push(FxHashMap::default());
        }
        index[label_id as usize].insert(id, ());
    }

    // Count distinct labels, not raw input length.
    let mut record = NodeRecord::new(id, epoch);
    record.set_label_count(node_label_set.len() as u16);

    self.node_labels.write().insert(id, node_label_set);

    // Store the record in this epoch's arena and remember where it landed.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, tx_id);

    let mut versions = self.node_versions.write();
    if let Some(index) = versions.get_mut(&id) {
        index.add_hot(hot_ref);
    } else {
        versions.insert(id, VersionIndex::with_initial(hot_ref));
    }

    id
}
406
407 pub fn create_node_with_props(
409 &self,
410 labels: &[&str],
411 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
412 ) -> NodeId {
413 self.create_node_with_props_versioned(
414 labels,
415 properties,
416 self.current_epoch(),
417 TxId::SYSTEM,
418 )
419 }
420
421 #[cfg(not(feature = "tiered-storage"))]
423 pub fn create_node_with_props_versioned(
424 &self,
425 labels: &[&str],
426 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
427 epoch: EpochId,
428 tx_id: TxId,
429 ) -> NodeId {
430 let id = self.create_node_versioned(labels, epoch, tx_id);
431
432 for (key, value) in properties {
433 self.node_properties.set(id, key.into(), value.into());
434 }
435
436 let count = self.node_properties.get_all(id).len() as u16;
438 if let Some(chain) = self.nodes.write().get_mut(&id) {
439 if let Some(record) = chain.latest_mut() {
440 record.props_count = count;
441 }
442 }
443
444 id
445 }
446
/// Creates a node with labels and properties, versioned at `epoch` for
/// `tx_id` (tiered-storage build).
///
/// Each property goes through the same index maintenance that
/// `set_node_property` performs, so an existing property index also covers
/// nodes created through this path.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_props_versioned(
    &self,
    labels: &[&str],
    properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
    epoch: EpochId,
    tx_id: TxId,
) -> NodeId {
    let id = self.create_node_versioned(labels, epoch, tx_id);

    for (key, value) in properties {
        let key: PropertyKey = key.into();
        let value: Value = value.into();
        // Keep any property index on `key` in sync before storing the value.
        self.update_property_index_on_set(id, &key, &value);
        self.node_properties.set(id, key, value);
    }

    id
}
468
469 #[must_use]
471 pub fn get_node(&self, id: NodeId) -> Option<Node> {
472 self.get_node_at_epoch(id, self.current_epoch())
473 }
474
475 #[must_use]
477 #[cfg(not(feature = "tiered-storage"))]
478 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
479 let nodes = self.nodes.read();
480 let chain = nodes.get(&id)?;
481 let record = chain.visible_at(epoch)?;
482
483 if record.is_deleted() {
484 return None;
485 }
486
487 let mut node = Node::new(id);
488
489 let id_to_label = self.id_to_label.read();
491 let node_labels = self.node_labels.read();
492 if let Some(label_ids) = node_labels.get(&id) {
493 for &label_id in label_ids {
494 if let Some(label) = id_to_label.get(label_id as usize) {
495 node.labels.push(label.clone());
496 }
497 }
498 }
499
500 node.properties = self.node_properties.get_all(id).into_iter().collect();
502
503 Some(node)
504 }
505
/// Looks up a node as visible at `epoch` (tiered-storage build).
///
/// Returns `None` when the id is unknown, no version is visible at `epoch`,
/// the record cannot be read, or the record is marked deleted. Labels and
/// properties are filled in from the store's label set and property storage.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
    let versions = self.node_versions.read();
    let version_ref = versions.get(&id)?.visible_at(epoch)?;

    let record = self.read_node_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    let mut node = Node::new(id);

    // Translate the node's label ids back into label names.
    let label_names = self.id_to_label.read();
    let labels_by_node = self.node_labels.read();
    for &label_id in labels_by_node.get(&id).into_iter().flatten() {
        if let Some(name) = label_names.get(label_id as usize) {
            node.labels.push(name.clone());
        }
    }

    node.properties = self.node_properties.get_all(id).into_iter().collect();

    Some(node)
}
540
541 #[must_use]
543 #[cfg(not(feature = "tiered-storage"))]
544 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
545 let nodes = self.nodes.read();
546 let chain = nodes.get(&id)?;
547 let record = chain.visible_to(epoch, tx_id)?;
548
549 if record.is_deleted() {
550 return None;
551 }
552
553 let mut node = Node::new(id);
554
555 let id_to_label = self.id_to_label.read();
557 let node_labels = self.node_labels.read();
558 if let Some(label_ids) = node_labels.get(&id) {
559 for &label_id in label_ids {
560 if let Some(label) = id_to_label.get(label_id as usize) {
561 node.labels.push(label.clone());
562 }
563 }
564 }
565
566 node.properties = self.node_properties.get_all(id).into_iter().collect();
568
569 Some(node)
570 }
571
/// Looks up a node as visible to transaction `tx_id` at `epoch`
/// (tiered-storage build).
///
/// Returns `None` when the id is unknown, no version is visible to the
/// transaction, the record cannot be read, or the record is marked deleted.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
    let versions = self.node_versions.read();
    let version_ref = versions.get(&id)?.visible_to(epoch, tx_id)?;

    let record = self.read_node_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    let mut node = Node::new(id);

    // Translate the node's label ids back into label names.
    let label_names = self.id_to_label.read();
    let labels_by_node = self.node_labels.read();
    for &label_id in labels_by_node.get(&id).into_iter().flatten() {
        if let Some(name) = label_names.get(label_id as usize) {
            node.labels.push(name.clone());
        }
    }

    node.properties = self.node_properties.get_all(id).into_iter().collect();

    Some(node)
}
606
/// Resolves a version reference to an owned `NodeRecord`.
///
/// Hot references are read from the arena of their epoch and cloned; cold
/// references are fetched from the epoch store, which may return `None`.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
    match version_ref {
        VersionRef::Hot(hot_ref) => {
            let arena = self.arena_allocator.arena(hot_ref.epoch);
            // SAFETY (NOTE(review) — confirm): presumably sound because
            // `arena_offset` was produced by `alloc_value_with_offset` for a
            // `NodeRecord` in this epoch's arena; verify the arena cannot be
            // released while hot references to it still exist.
            let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
            // Clone so the caller does not borrow from the arena.
            Some(record.clone())
        }
        VersionRef::Cold(cold_ref) => {
            self.epoch_store
                .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
        }
    }
}
625
626 pub fn delete_node(&self, id: NodeId) -> bool {
628 self.delete_node_at_epoch(id, self.current_epoch())
629 }
630
631 #[cfg(not(feature = "tiered-storage"))]
633 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
634 let mut nodes = self.nodes.write();
635 if let Some(chain) = nodes.get_mut(&id) {
636 if let Some(record) = chain.visible_at(epoch) {
638 if record.is_deleted() {
639 return false;
640 }
641 } else {
642 return false;
644 }
645
646 chain.mark_deleted(epoch);
648
649 let mut index = self.label_index.write();
651 let mut node_labels = self.node_labels.write();
652 if let Some(label_ids) = node_labels.remove(&id) {
653 for label_id in label_ids {
654 if let Some(set) = index.get_mut(label_id as usize) {
655 set.remove(&id);
656 }
657 }
658 }
659
660 drop(nodes); drop(index);
663 drop(node_labels);
664 self.node_properties.remove_all(id);
665
666 true
669 } else {
670 false
671 }
672 }
673
/// Marks a node deleted at `epoch` and removes its labels and properties
/// (tiered-storage build).
///
/// Returns `false` when the id is unknown, no version is visible at `epoch`,
/// the record cannot be read, or it is already deleted; otherwise `true`.
///
/// The node is also removed from every property index before its properties
/// are dropped, so indexed lookups cannot keep returning the deleted node.
#[cfg(feature = "tiered-storage")]
pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
    let mut versions = self.node_versions.write();
    let Some(index) = versions.get_mut(&id) else {
        return false;
    };

    let is_live = index
        .visible_at(epoch)
        .and_then(|version_ref| self.read_node_record(&version_ref))
        .is_some_and(|record| !record.is_deleted());
    if !is_live {
        return false;
    }

    index.mark_deleted(epoch);

    // Detach the node from the label index and the per-node label set.
    let mut label_index = self.label_index.write();
    let mut node_labels = self.node_labels.write();
    if let Some(label_ids) = node_labels.remove(&id) {
        for label_id in label_ids {
            if let Some(set) = label_index.get_mut(label_id as usize) {
                set.remove(&id);
            }
        }
    }

    // Release every lock before touching property indexes and storage.
    drop(versions);
    drop(label_index);
    drop(node_labels);

    // Purge index entries while the old values are still readable.
    for (key, _) in self.node_properties.get_all(id) {
        self.update_property_index_on_remove(id, &key);
    }
    self.node_properties.remove_all(id);

    true
}
718
719 #[cfg(not(feature = "tiered-storage"))]
724 pub fn delete_node_edges(&self, node_id: NodeId) {
725 let outgoing: Vec<EdgeId> = self
727 .forward_adj
728 .edges_from(node_id)
729 .into_iter()
730 .map(|(_, edge_id)| edge_id)
731 .collect();
732
733 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
735 backward
736 .edges_from(node_id)
737 .into_iter()
738 .map(|(_, edge_id)| edge_id)
739 .collect()
740 } else {
741 let epoch = self.current_epoch();
743 self.edges
744 .read()
745 .iter()
746 .filter_map(|(id, chain)| {
747 chain.visible_at(epoch).and_then(|r| {
748 if !r.is_deleted() && r.dst == node_id {
749 Some(*id)
750 } else {
751 None
752 }
753 })
754 })
755 .collect()
756 };
757
758 for edge_id in outgoing.into_iter().chain(incoming) {
760 self.delete_edge(edge_id);
761 }
762 }
763
/// Deletes every edge that starts or ends at `node_id` (tiered-storage
/// build).
///
/// Outgoing edges come from the forward adjacency. Incoming edges come from
/// the backward adjacency when it exists; otherwise all edges visible at the
/// current epoch are scanned for a matching destination.
#[cfg(feature = "tiered-storage")]
pub fn delete_node_edges(&self, node_id: NodeId) {
    // Collect everything first, then delete: outgoing edges, then incoming.
    let mut doomed: Vec<EdgeId> = self
        .forward_adj
        .edges_from(node_id)
        .into_iter()
        .map(|(_, edge_id)| edge_id)
        .collect();

    match self.backward_adj.as_ref() {
        Some(backward) => {
            doomed.extend(
                backward
                    .edges_from(node_id)
                    .into_iter()
                    .map(|(_, edge_id)| edge_id),
            );
        }
        None => {
            // No backward index: fall back to a full scan of live edges.
            let epoch = self.current_epoch();
            let versions = self.edge_versions.read();
            doomed.extend(versions.iter().filter_map(|(id, index)| {
                let vref = index.visible_at(epoch)?;
                let record = self.read_edge_record(&vref)?;
                (!record.is_deleted() && record.dst == node_id).then_some(*id)
            }));
        }
    }

    for edge_id in doomed {
        self.delete_edge(edge_id);
    }
}
808
809 #[cfg(not(feature = "tiered-storage"))]
811 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
812 let prop_key: PropertyKey = key.into();
813
814 self.update_property_index_on_set(id, &prop_key, &value);
816
817 self.node_properties.set(id, prop_key, value);
818
819 let count = self.node_properties.get_all(id).len() as u16;
821 if let Some(chain) = self.nodes.write().get_mut(&id) {
822 if let Some(record) = chain.latest_mut() {
823 record.props_count = count;
824 }
825 }
826 }
827
/// Sets a property on a node (tiered-storage build).
///
/// Updates any property index on `key`, then stores the value.
#[cfg(feature = "tiered-storage")]
pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
    let prop_key: PropertyKey = key.into();

    // Index maintenance must see the old value, so it runs before the write.
    self.update_property_index_on_set(id, &prop_key, &value);
    self.node_properties.set(id, prop_key, value);
}
842
843 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
845 self.edge_properties.set(id, key.into(), value);
846 }
847
848 #[cfg(not(feature = "tiered-storage"))]
852 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
853 let prop_key: PropertyKey = key.into();
854
855 self.update_property_index_on_remove(id, &prop_key);
857
858 let result = self.node_properties.remove(id, &prop_key);
859
860 let count = self.node_properties.get_all(id).len() as u16;
862 if let Some(chain) = self.nodes.write().get_mut(&id) {
863 if let Some(record) = chain.latest_mut() {
864 record.props_count = count;
865 }
866 }
867
868 result
869 }
870
/// Removes a property from a node and returns what property storage
/// reported as removed (tiered-storage build).
#[cfg(feature = "tiered-storage")]
pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
    let prop_key: PropertyKey = key.into();

    // Index maintenance needs the value that is about to disappear.
    self.update_property_index_on_remove(id, &prop_key);

    let removed = self.node_properties.remove(id, &prop_key);
    removed
}
883
884 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
888 self.edge_properties.remove(id, &key.into())
889 }
890
891 #[must_use]
906 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
907 self.node_properties.get(id, key)
908 }
909
910 #[must_use]
914 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
915 self.edge_properties.get(id, key)
916 }
917
918 #[must_use]
941 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
942 self.node_properties.get_batch(ids, key)
943 }
944
945 #[must_use]
950 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
951 self.node_properties.get_all_batch(ids)
952 }
953
954 #[must_use]
982 pub fn get_nodes_properties_selective_batch(
983 &self,
984 ids: &[NodeId],
985 keys: &[PropertyKey],
986 ) -> Vec<FxHashMap<PropertyKey, Value>> {
987 self.node_properties.get_selective_batch(ids, keys)
988 }
989
990 #[must_use]
994 pub fn get_edges_properties_selective_batch(
995 &self,
996 ids: &[EdgeId],
997 keys: &[PropertyKey],
998 ) -> Vec<FxHashMap<PropertyKey, Value>> {
999 self.edge_properties.get_selective_batch(ids, keys)
1000 }
1001
1002 #[must_use]
1038 pub fn find_nodes_in_range(
1039 &self,
1040 property: &str,
1041 min: Option<&Value>,
1042 max: Option<&Value>,
1043 min_inclusive: bool,
1044 max_inclusive: bool,
1045 ) -> Vec<NodeId> {
1046 let key = PropertyKey::new(property);
1047
1048 if !self
1050 .node_properties
1051 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1052 {
1053 return Vec::new();
1054 }
1055
1056 self.node_ids()
1058 .into_iter()
1059 .filter(|&node_id| {
1060 self.node_properties
1061 .get(node_id, &key)
1062 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1063 })
1064 .collect()
1065 }
1066
1067 #[must_use]
1092 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1093 if conditions.is_empty() {
1094 return self.node_ids();
1095 }
1096
1097 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1100 let indexes = self.property_indexes.read();
1101
1102 for (i, (prop, value)) in conditions.iter().enumerate() {
1103 let key = PropertyKey::new(*prop);
1104 let hv = HashableValue::new(value.clone());
1105
1106 if let Some(index) = indexes.get(&key) {
1107 let matches: Vec<NodeId> = index
1108 .get(&hv)
1109 .map(|nodes| nodes.iter().copied().collect())
1110 .unwrap_or_default();
1111
1112 if matches.is_empty() {
1114 return Vec::new();
1115 }
1116
1117 if best_start
1119 .as_ref()
1120 .is_none_or(|(_, best)| matches.len() < best.len())
1121 {
1122 best_start = Some((i, matches));
1123 }
1124 }
1125 }
1126 drop(indexes);
1127
1128 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1130 let (prop, value) = &conditions[0];
1132 (0, self.find_nodes_by_property(prop, value))
1133 });
1134
1135 for (i, (prop, value)) in conditions.iter().enumerate() {
1137 if i == start_idx {
1138 continue;
1139 }
1140
1141 let key = PropertyKey::new(*prop);
1142 candidates.retain(|&node_id| {
1143 self.node_properties
1144 .get(node_id, &key)
1145 .is_some_and(|v| v == *value)
1146 });
1147
1148 if candidates.is_empty() {
1150 return Vec::new();
1151 }
1152 }
1153
1154 candidates
1155 }
1156
1157 pub fn create_property_index(&self, property: &str) {
1185 let key = PropertyKey::new(property);
1186
1187 let mut indexes = self.property_indexes.write();
1188 if indexes.contains_key(&key) {
1189 return; }
1191
1192 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1194
1195 for node_id in self.node_ids() {
1197 if let Some(value) = self.node_properties.get(node_id, &key) {
1198 let hv = HashableValue::new(value);
1199 index
1200 .entry(hv)
1201 .or_insert_with(FxHashSet::default)
1202 .insert(node_id);
1203 }
1204 }
1205
1206 indexes.insert(key, index);
1207 }
1208
1209 pub fn drop_property_index(&self, property: &str) -> bool {
1213 let key = PropertyKey::new(property);
1214 self.property_indexes.write().remove(&key).is_some()
1215 }
1216
1217 #[must_use]
1219 pub fn has_property_index(&self, property: &str) -> bool {
1220 let key = PropertyKey::new(property);
1221 self.property_indexes.read().contains_key(&key)
1222 }
1223
1224 #[must_use]
1247 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1248 let key = PropertyKey::new(property);
1249 let hv = HashableValue::new(value.clone());
1250
1251 let indexes = self.property_indexes.read();
1253 if let Some(index) = indexes.get(&key) {
1254 if let Some(nodes) = index.get(&hv) {
1255 return nodes.iter().copied().collect();
1256 }
1257 return Vec::new();
1258 }
1259 drop(indexes);
1260
1261 self.node_ids()
1263 .into_iter()
1264 .filter(|&node_id| {
1265 self.node_properties
1266 .get(node_id, &key)
1267 .is_some_and(|v| v == *value)
1268 })
1269 .collect()
1270 }
1271
1272 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1274 let indexes = self.property_indexes.read();
1275 if let Some(index) = indexes.get(key) {
1276 if let Some(old_value) = self.node_properties.get(node_id, key) {
1278 let old_hv = HashableValue::new(old_value);
1279 if let Some(mut nodes) = index.get_mut(&old_hv) {
1280 nodes.remove(&node_id);
1281 if nodes.is_empty() {
1282 drop(nodes);
1283 index.remove(&old_hv);
1284 }
1285 }
1286 }
1287
1288 let new_hv = HashableValue::new(new_value.clone());
1290 index
1291 .entry(new_hv)
1292 .or_insert_with(FxHashSet::default)
1293 .insert(node_id);
1294 }
1295 }
1296
1297 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1299 let indexes = self.property_indexes.read();
1300 if let Some(index) = indexes.get(key) {
1301 if let Some(old_value) = self.node_properties.get(node_id, key) {
1303 let old_hv = HashableValue::new(old_value);
1304 if let Some(mut nodes) = index.get_mut(&old_hv) {
1305 nodes.remove(&node_id);
1306 if nodes.is_empty() {
1307 drop(nodes);
1308 index.remove(&old_hv);
1309 }
1310 }
1311 }
1312 }
1313 }
1314
1315 #[cfg(not(feature = "tiered-storage"))]
1320 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1321 let epoch = self.current_epoch();
1322
1323 let nodes = self.nodes.read();
1325 if let Some(chain) = nodes.get(&node_id) {
1326 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1327 return false;
1328 }
1329 } else {
1330 return false;
1331 }
1332 drop(nodes);
1333
1334 let label_id = self.get_or_create_label_id(label);
1336
1337 let mut node_labels = self.node_labels.write();
1339 let label_set = node_labels
1340 .entry(node_id)
1341 .or_insert_with(FxHashSet::default);
1342
1343 if label_set.contains(&label_id) {
1344 return false; }
1346
1347 label_set.insert(label_id);
1348 drop(node_labels);
1349
1350 let mut index = self.label_index.write();
1352 if (label_id as usize) >= index.len() {
1353 index.resize(label_id as usize + 1, FxHashMap::default());
1354 }
1355 index[label_id as usize].insert(node_id, ());
1356
1357 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1359 if let Some(record) = chain.latest_mut() {
1360 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1361 record.set_label_count(count as u16);
1362 }
1363 }
1364
1365 true
1366 }
1367
/// Adds `label` to a node (tiered-storage build).
///
/// Returns `false` when the node is unknown, not visible at the current
/// epoch, unreadable, deleted, or already carries the label; otherwise
/// `true`.
#[cfg(feature = "tiered-storage")]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // The node must exist and be live at the current epoch.
    {
        let versions = self.node_versions.read();
        let is_live = versions
            .get(&node_id)
            .and_then(|index| index.visible_at(epoch))
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if !is_live {
            return false;
        }
    }

    let label_id = self.get_or_create_label_id(label);

    {
        let mut node_labels = self.node_labels.write();
        let label_set = node_labels
            .entry(node_id)
            .or_insert_with(FxHashSet::default);
        // `insert` returns false when the label was already present.
        if !label_set.insert(label_id) {
            return false;
        }
    }

    let slot = label_id as usize;
    let mut index = self.label_index.write();
    if slot >= index.len() {
        index.resize(slot + 1, FxHashMap::default());
    }
    index[slot].insert(node_id, ());

    true
}
1421
1422 #[cfg(not(feature = "tiered-storage"))]
1427 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1428 let epoch = self.current_epoch();
1429
1430 let nodes = self.nodes.read();
1432 if let Some(chain) = nodes.get(&node_id) {
1433 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1434 return false;
1435 }
1436 } else {
1437 return false;
1438 }
1439 drop(nodes);
1440
1441 let label_id = {
1443 let label_ids = self.label_to_id.read();
1444 match label_ids.get(label) {
1445 Some(&id) => id,
1446 None => return false, }
1448 };
1449
1450 let mut node_labels = self.node_labels.write();
1452 if let Some(label_set) = node_labels.get_mut(&node_id) {
1453 if !label_set.remove(&label_id) {
1454 return false; }
1456 } else {
1457 return false;
1458 }
1459 drop(node_labels);
1460
1461 let mut index = self.label_index.write();
1463 if (label_id as usize) < index.len() {
1464 index[label_id as usize].remove(&node_id);
1465 }
1466
1467 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1469 if let Some(record) = chain.latest_mut() {
1470 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1471 record.set_label_count(count as u16);
1472 }
1473 }
1474
1475 true
1476 }
1477
/// Removes `label` from a node (tiered-storage build).
///
/// Returns `false` when the node is unknown, not visible at the current
/// epoch, unreadable, deleted, the label name is unknown, or the node does
/// not carry the label; otherwise `true`.
#[cfg(feature = "tiered-storage")]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    // The node must exist and be live at the current epoch.
    {
        let versions = self.node_versions.read();
        let is_live = versions
            .get(&node_id)
            .and_then(|index| index.visible_at(epoch))
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if !is_live {
            return false;
        }
    }

    // An unknown label name cannot be attached to any node.
    let Some(label_id) = self.label_to_id.read().get(label).copied() else {
        return false;
    };

    {
        let mut node_labels = self.node_labels.write();
        let removed = node_labels
            .get_mut(&node_id)
            .is_some_and(|label_set| label_set.remove(&label_id));
        if !removed {
            return false;
        }
    }

    let slot = label_id as usize;
    let mut index = self.label_index.write();
    if slot < index.len() {
        index[slot].remove(&node_id);
    }

    true
}
1533
1534 #[must_use]
1536 #[cfg(not(feature = "tiered-storage"))]
1537 pub fn node_count(&self) -> usize {
1538 let epoch = self.current_epoch();
1539 self.nodes
1540 .read()
1541 .values()
1542 .filter_map(|chain| chain.visible_at(epoch))
1543 .filter(|r| !r.is_deleted())
1544 .count()
1545 }
1546
/// Counts nodes that are visible at the current epoch, readable, and not
/// deleted (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn node_count(&self) -> usize {
    let epoch = self.current_epoch();
    let versions = self.node_versions.read();

    let mut live = 0;
    for index in versions.values() {
        let is_live = index
            .visible_at(epoch)
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if is_live {
            live += 1;
        }
    }
    live
}
1564
1565 #[must_use]
1571 #[cfg(not(feature = "tiered-storage"))]
1572 pub fn node_ids(&self) -> Vec<NodeId> {
1573 let epoch = self.current_epoch();
1574 let mut ids: Vec<NodeId> = self
1575 .nodes
1576 .read()
1577 .iter()
1578 .filter_map(|(id, chain)| {
1579 chain
1580 .visible_at(epoch)
1581 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1582 })
1583 .collect();
1584 ids.sort_unstable();
1585 ids
1586 }
1587
/// Returns the ids of all nodes visible at the current epoch, readable, and
/// not deleted, sorted ascending (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn node_ids(&self) -> Vec<NodeId> {
    let epoch = self.current_epoch();
    let versions = self.node_versions.read();

    let mut ids: Vec<NodeId> = Vec::new();
    for (id, index) in versions.iter() {
        let is_live = index
            .visible_at(epoch)
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if is_live {
            ids.push(*id);
        }
    }

    // Map iteration order is arbitrary; sort for a stable result.
    ids.sort_unstable();
    ids
}
1607
1608 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1612 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1613 }
1614
1615 #[cfg(not(feature = "tiered-storage"))]
1617 pub fn create_edge_versioned(
1618 &self,
1619 src: NodeId,
1620 dst: NodeId,
1621 edge_type: &str,
1622 epoch: EpochId,
1623 tx_id: TxId,
1624 ) -> EdgeId {
1625 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1626 let type_id = self.get_or_create_edge_type_id(edge_type);
1627
1628 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1629 let chain = VersionChain::with_initial(record, epoch, tx_id);
1630 self.edges.write().insert(id, chain);
1631
1632 self.forward_adj.add_edge(src, dst, id);
1634 if let Some(ref backward) = self.backward_adj {
1635 backward.add_edge(dst, src, id);
1636 }
1637
1638 id
1639 }
1640
/// Creates an edge `src -> dst` of type `edge_type`, versioned at `epoch`
/// for `tx_id` (tiered-storage build).
///
/// Allocates the next edge id, writes the record into the arena for `epoch`,
/// records a hot version reference, and registers the edge in the forward
/// adjacency and, when present, the backward adjacency.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_versioned(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
    epoch: EpochId,
    tx_id: TxId,
) -> EdgeId {
    let raw_id = self.next_edge_id.fetch_add(1, Ordering::Relaxed);
    let id = EdgeId::new(raw_id);
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let record = EdgeRecord::new(id, src, dst, type_id, epoch);

    // Store the record in this epoch's arena and remember where it landed.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);
    let hot_ref = HotVersionRef::new(epoch, offset, tx_id);

    {
        let mut versions = self.edge_versions.write();
        match versions.get_mut(&id) {
            Some(index) => {
                index.add_hot(hot_ref);
            }
            None => {
                versions.insert(id, VersionIndex::with_initial(hot_ref));
            }
        }
    }

    self.forward_adj.add_edge(src, dst, id);
    // The backward structure is keyed by destination.
    if let Some(backward) = self.backward_adj.as_ref() {
        backward.add_edge(dst, src, id);
    }

    id
}
1680
1681 pub fn create_edge_with_props(
1683 &self,
1684 src: NodeId,
1685 dst: NodeId,
1686 edge_type: &str,
1687 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1688 ) -> EdgeId {
1689 let id = self.create_edge(src, dst, edge_type);
1690
1691 for (key, value) in properties {
1692 self.edge_properties.set(id, key.into(), value.into());
1693 }
1694
1695 id
1696 }
1697
1698 #[must_use]
1700 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1701 self.get_edge_at_epoch(id, self.current_epoch())
1702 }
1703
1704 #[must_use]
1706 #[cfg(not(feature = "tiered-storage"))]
1707 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1708 let edges = self.edges.read();
1709 let chain = edges.get(&id)?;
1710 let record = chain.visible_at(epoch)?;
1711
1712 if record.is_deleted() {
1713 return None;
1714 }
1715
1716 let edge_type = {
1717 let id_to_type = self.id_to_edge_type.read();
1718 id_to_type.get(record.type_id as usize)?.clone()
1719 };
1720
1721 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1722
1723 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1725
1726 Some(edge)
1727 }
1728
/// Looks up the edge `id` as it was visible at `epoch` (tiered-storage build).
///
/// Returns `None` when the edge is unknown, has no version visible at
/// `epoch`, its record cannot be read, it is flagged deleted, or its type ID
/// has no registered name.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let version_ref = versions.get(&id)?.visible_at(epoch)?;
    let record = self
        .read_edge_record(&version_ref)
        .filter(|candidate| !candidate.is_deleted())?;

    // The type-name table is locked only for the duration of this lookup.
    let type_name = self
        .id_to_edge_type
        .read()
        .get(record.type_id as usize)?
        .clone();

    let mut edge = Edge::new(id, record.src, record.dst, type_name);
    // Properties are read from the property store as it is now.
    edge.properties = self.edge_properties.get_all(id).into_iter().collect();
    Some(edge)
}
1756
1757 #[must_use]
1759 #[cfg(not(feature = "tiered-storage"))]
1760 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1761 let edges = self.edges.read();
1762 let chain = edges.get(&id)?;
1763 let record = chain.visible_to(epoch, tx_id)?;
1764
1765 if record.is_deleted() {
1766 return None;
1767 }
1768
1769 let edge_type = {
1770 let id_to_type = self.id_to_edge_type.read();
1771 id_to_type.get(record.type_id as usize)?.clone()
1772 };
1773
1774 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1775
1776 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1778
1779 Some(edge)
1780 }
1781
/// Looks up the edge `id` as seen by transaction `tx_id` at `epoch`
/// (tiered-storage build).
///
/// Returns `None` when the edge is unknown, has no version visible to that
/// transaction, its record cannot be read, it is flagged deleted, or its type
/// ID has no registered name.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let version_ref = versions.get(&id)?.visible_to(epoch, tx_id)?;
    let record = self
        .read_edge_record(&version_ref)
        .filter(|candidate| !candidate.is_deleted())?;

    // The type-name table is locked only for the duration of this lookup.
    let type_name = self
        .id_to_edge_type
        .read()
        .get(record.type_id as usize)?
        .clone();

    let mut edge = Edge::new(id, record.src, record.dst, type_name);
    // Properties are read from the property store as it is now.
    edge.properties = self.edge_properties.get_all(id).into_iter().collect();
    Some(edge)
}
1809
/// Resolves a version reference to an owned `EdgeRecord`.
///
/// Hot references are read from the arena of their epoch and cloned; cold
/// references are delegated to the epoch store.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
    let hot = match version_ref {
        VersionRef::Cold(cold) => {
            return self
                .epoch_store
                .get_edge(cold.epoch, cold.block_offset, cold.length);
        }
        VersionRef::Hot(hot) => hot,
    };

    let arena = self.arena_allocator.arena(hot.epoch);
    // SAFETY: NOTE(review) — presumably sound because `arena_offset` was
    // produced when an `EdgeRecord` was allocated in this epoch's arena;
    // confirm against the arena's `read_at` contract.
    let stored: &EdgeRecord = unsafe { arena.read_at(hot.arena_offset) };
    Some(stored.clone())
}
1828
1829 pub fn delete_edge(&self, id: EdgeId) -> bool {
1831 self.delete_edge_at_epoch(id, self.current_epoch())
1832 }
1833
1834 #[cfg(not(feature = "tiered-storage"))]
1836 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1837 let mut edges = self.edges.write();
1838 if let Some(chain) = edges.get_mut(&id) {
1839 let (src, dst) = {
1841 match chain.visible_at(epoch) {
1842 Some(record) => {
1843 if record.is_deleted() {
1844 return false;
1845 }
1846 (record.src, record.dst)
1847 }
1848 None => return false, }
1850 };
1851
1852 chain.mark_deleted(epoch);
1854
1855 drop(edges); self.forward_adj.mark_deleted(src, id);
1859 if let Some(ref backward) = self.backward_adj {
1860 backward.mark_deleted(dst, id);
1861 }
1862
1863 self.edge_properties.remove_all(id);
1865
1866 true
1867 } else {
1868 false
1869 }
1870 }
1871
/// Marks the edge `id` as deleted at `epoch` (tiered-storage build).
///
/// Returns `false` when the edge is unknown, has no version visible at
/// `epoch`, its record cannot be read, or it is already flagged deleted;
/// otherwise marks the version index deleted, updates the adjacency indexes,
/// removes the edge's properties and returns `true`.
#[cfg(feature = "tiered-storage")]
pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
    let mut versions = self.edge_versions.write();
    let index = match versions.get_mut(&id) {
        Some(index) => index,
        None => return false,
    };

    // Capture the endpoints before the version is marked deleted.
    let visible_record = index
        .visible_at(epoch)
        .and_then(|version_ref| self.read_edge_record(&version_ref));
    let (src, dst) = match visible_record {
        Some(record) if !record.is_deleted() => (record.src, record.dst),
        _ => return false,
    };

    index.mark_deleted(epoch);

    // Release the version map before touching the other structures.
    drop(versions);

    self.forward_adj.mark_deleted(src, id);
    if let Some(reverse) = self.backward_adj.as_ref() {
        reverse.mark_deleted(dst, id);
    }
    self.edge_properties.remove_all(id);

    true
}
1914
1915 #[must_use]
1917 #[cfg(not(feature = "tiered-storage"))]
1918 pub fn edge_count(&self) -> usize {
1919 let epoch = self.current_epoch();
1920 self.edges
1921 .read()
1922 .values()
1923 .filter_map(|chain| chain.visible_at(epoch))
1924 .filter(|r| !r.is_deleted())
1925 .count()
1926 }
1927
/// Counts the edges that are visible, readable and not flagged deleted at the
/// current epoch (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_count(&self) -> usize {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();

    let mut live = 0usize;
    for index in versions.values() {
        let record = index
            .visible_at(epoch)
            .and_then(|vref| self.read_edge_record(&vref));
        if let Some(record) = record {
            if !record.is_deleted() {
                live += 1;
            }
        }
    }
    live
}
1945
1946 #[cfg(not(feature = "tiered-storage"))]
1951 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1952 {
1954 let mut nodes = self.nodes.write();
1955 for chain in nodes.values_mut() {
1956 chain.remove_versions_by(tx_id);
1957 }
1958 nodes.retain(|_, chain| !chain.is_empty());
1960 }
1961
1962 {
1964 let mut edges = self.edges.write();
1965 for chain in edges.values_mut() {
1966 chain.remove_versions_by(tx_id);
1967 }
1968 edges.retain(|_, chain| !chain.is_empty());
1970 }
1971 }
1972
/// Asks every node and edge version index to remove the versions attributed
/// to `tx_id`, then drops any index that is left empty (tiered-storage build).
#[cfg(feature = "tiered-storage")]
pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
    // Nodes first; the write lock is released before the edge map is locked.
    self.node_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });

    self.edge_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });
}
1997
/// Snapshots every hot node and edge version recorded for `epoch`, hands the
/// records to the epoch store, and registers a cold reference for each one in
/// the corresponding version index.
///
/// Returns the number of records handed to the epoch store (nodes plus
/// edges). When there is nothing to freeze, returns 0 without calling the
/// epoch store.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
    // Phase 1a: copy the hot node records out of the arena under a read lock.
    let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
    let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
    {
        let node_index = self.node_versions.read();
        for (node_id, index) in node_index.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: NOTE(review) — presumably sound because the offset was
                // produced when a `NodeRecord` was allocated in this arena; confirm.
                let stored: &NodeRecord = unsafe { arena.read_at(hot.arena_offset) };
                node_records.push((node_id.as_u64(), stored.clone()));
                node_hot_refs.push((*node_id, *hot));
            }
        }
    }

    // Phase 1b: same for the hot edge records.
    let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
    let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
    {
        let edge_index = self.edge_versions.read();
        for (edge_id, index) in edge_index.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: NOTE(review) — presumably sound because the offset was
                // produced when an `EdgeRecord` was allocated in this arena; confirm.
                let stored: &EdgeRecord = unsafe { arena.read_at(hot.arena_offset) };
                edge_records.push((edge_id.as_u64(), stored.clone()));
                edge_hot_refs.push((*edge_id, *hot));
            }
        }
    }

    let total_frozen = node_records.len() + edge_records.len();
    if total_frozen == 0 {
        return 0;
    }

    // Phase 2: write the records to the epoch store and index the returned
    // entries by entity ID.
    let (node_entries, edge_entries) =
        self.epoch_store
            .freeze_epoch(epoch, node_records, edge_records);

    let node_locations: FxHashMap<u64, _> = node_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();
    let edge_locations: FxHashMap<u64, _> = edge_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();

    // Phase 3a: register a cold reference for every node hot ref that still
    // has a version index and received a storage location.
    {
        let mut node_index = self.node_versions.write();
        for (node_id, hot) in &node_hot_refs {
            let index = match node_index.get_mut(node_id) {
                Some(index) => index,
                None => continue,
            };
            let (block_offset, length) = match node_locations.get(&node_id.as_u64()) {
                Some(&location) => location,
                None => continue,
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    // Phase 3b: same for edges.
    {
        let mut edge_index = self.edge_versions.write();
        for (edge_id, hot) in &edge_hot_refs {
            let index = match edge_index.get_mut(edge_id) {
                Some(index) => index,
                None => continue,
            };
            let (block_offset, length) = match edge_locations.get(&edge_id.as_u64()) {
                Some(&location) => location,
                None => continue,
            };
            let cold = ColdVersionRef {
                epoch,
                block_offset,
                length,
                created_by: hot.created_by,
                deleted_epoch: hot.deleted_epoch,
            };
            index.freeze_epoch(epoch, std::iter::once(cold));
        }
    }

    total_frozen
}
2113
/// Borrows the epoch store owned by this graph store.
#[cfg(feature = "tiered-storage")]
#[must_use]
pub fn epoch_store(&self) -> &EpochStore {
    let store: &EpochStore = &self.epoch_store;
    store
}
2120
2121 #[must_use]
2123 pub fn label_count(&self) -> usize {
2124 self.id_to_label.read().len()
2125 }
2126
2127 #[must_use]
2131 pub fn property_key_count(&self) -> usize {
2132 let node_keys = self.node_properties.column_count();
2133 let edge_keys = self.edge_properties.column_count();
2134 node_keys + edge_keys
2138 }
2139
2140 #[must_use]
2142 pub fn edge_type_count(&self) -> usize {
2143 self.id_to_edge_type.read().len()
2144 }
2145
2146 pub fn neighbors(
2153 &self,
2154 node: NodeId,
2155 direction: Direction,
2156 ) -> impl Iterator<Item = NodeId> + '_ {
2157 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2158 Direction::Outgoing | Direction::Both => {
2159 Box::new(self.forward_adj.neighbors(node).into_iter())
2160 }
2161 Direction::Incoming => Box::new(std::iter::empty()),
2162 };
2163
2164 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2165 Direction::Incoming | Direction::Both => {
2166 if let Some(ref adj) = self.backward_adj {
2167 Box::new(adj.neighbors(node).into_iter())
2168 } else {
2169 Box::new(std::iter::empty())
2170 }
2171 }
2172 Direction::Outgoing => Box::new(std::iter::empty()),
2173 };
2174
2175 forward.chain(backward)
2176 }
2177
2178 pub fn edges_from(
2182 &self,
2183 node: NodeId,
2184 direction: Direction,
2185 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2186 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2187 Direction::Outgoing | Direction::Both => {
2188 Box::new(self.forward_adj.edges_from(node).into_iter())
2189 }
2190 Direction::Incoming => Box::new(std::iter::empty()),
2191 };
2192
2193 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2194 Direction::Incoming | Direction::Both => {
2195 if let Some(ref adj) = self.backward_adj {
2196 Box::new(adj.edges_from(node).into_iter())
2197 } else {
2198 Box::new(std::iter::empty())
2199 }
2200 }
2201 Direction::Outgoing => Box::new(std::iter::empty()),
2202 };
2203
2204 forward.chain(backward)
2205 }
2206
2207 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2220 if let Some(ref backward) = self.backward_adj {
2221 backward.edges_from(node)
2222 } else {
2223 self.all_edges()
2225 .filter_map(|edge| {
2226 if edge.dst == node {
2227 Some((edge.src, edge.id))
2228 } else {
2229 None
2230 }
2231 })
2232 .collect()
2233 }
2234 }
2235
2236 #[must_use]
2240 pub fn out_degree(&self, node: NodeId) -> usize {
2241 self.forward_adj.out_degree(node)
2242 }
2243
2244 #[must_use]
2249 pub fn in_degree(&self, node: NodeId) -> usize {
2250 if let Some(ref backward) = self.backward_adj {
2251 backward.in_degree(node)
2252 } else {
2253 self.all_edges().filter(|edge| edge.dst == node).count()
2255 }
2256 }
2257
2258 #[must_use]
2260 #[cfg(not(feature = "tiered-storage"))]
2261 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2262 let edges = self.edges.read();
2263 let chain = edges.get(&id)?;
2264 let epoch = self.current_epoch();
2265 let record = chain.visible_at(epoch)?;
2266 let id_to_type = self.id_to_edge_type.read();
2267 id_to_type.get(record.type_id as usize).cloned()
2268 }
2269
/// Returns the type name of the edge `id` as visible at the current epoch
/// (tiered-storage build).
///
/// Returns `None` when the edge is unknown, has no visible version, its
/// record cannot be read, or its type ID has no registered name.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
    let versions = self.edge_versions.read();
    let index = versions.get(&id)?;
    let version_ref = index.visible_at(self.current_epoch())?;
    let record = self.read_edge_record(&version_ref)?;

    let type_slot = record.type_id as usize;
    let type_names = self.id_to_edge_type.read();
    type_names.get(type_slot).cloned()
}
2283
2284 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2290 let label_to_id = self.label_to_id.read();
2291 if let Some(&label_id) = label_to_id.get(label) {
2292 let index = self.label_index.read();
2293 if let Some(set) = index.get(label_id as usize) {
2294 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2295 ids.sort_unstable();
2296 return ids;
2297 }
2298 }
2299 Vec::new()
2300 }
2301
2302 #[cfg(not(feature = "tiered-storage"))]
2309 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2310 let epoch = self.current_epoch();
2311 let node_ids: Vec<NodeId> = self
2312 .nodes
2313 .read()
2314 .iter()
2315 .filter_map(|(id, chain)| {
2316 chain
2317 .visible_at(epoch)
2318 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2319 })
2320 .collect();
2321
2322 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2323 }
2324
/// Iterates over every node listed by `node_ids`, loading each `Node` lazily
/// through `get_node` (tiered-storage build).
#[cfg(feature = "tiered-storage")]
pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
    self.node_ids()
        .into_iter()
        .filter_map(move |id| self.get_node(id))
}
2332
2333 #[cfg(not(feature = "tiered-storage"))]
2338 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2339 let epoch = self.current_epoch();
2340 let edge_ids: Vec<EdgeId> = self
2341 .edges
2342 .read()
2343 .iter()
2344 .filter_map(|(id, chain)| {
2345 chain
2346 .visible_at(epoch)
2347 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2348 })
2349 .collect();
2350
2351 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2352 }
2353
/// Iterates over every edge that is visible, readable and not flagged deleted
/// at the current epoch (tiered-storage build).
///
/// The matching IDs are collected up front; each `Edge` is then loaded lazily
/// through `get_edge` as the iterator is advanced.
#[cfg(feature = "tiered-storage")]
pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();

    let mut live_ids: Vec<EdgeId> = Vec::new();
    for (edge_id, index) in versions.iter() {
        let is_live = index
            .visible_at(epoch)
            .and_then(|vref| self.read_edge_record(&vref))
            .map_or(false, |record| !record.is_deleted());
        if is_live {
            live_ids.push(*edge_id);
        }
    }

    live_ids.into_iter().filter_map(move |id| self.get_edge(id))
}
2372
2373 pub fn all_labels(&self) -> Vec<String> {
2375 self.id_to_label
2376 .read()
2377 .iter()
2378 .map(|s| s.to_string())
2379 .collect()
2380 }
2381
2382 pub fn all_edge_types(&self) -> Vec<String> {
2384 self.id_to_edge_type
2385 .read()
2386 .iter()
2387 .map(|s| s.to_string())
2388 .collect()
2389 }
2390
2391 pub fn all_property_keys(&self) -> Vec<String> {
2393 let mut keys = std::collections::HashSet::new();
2394 for key in self.node_properties.keys() {
2395 keys.insert(key.to_string());
2396 }
2397 for key in self.edge_properties.keys() {
2398 keys.insert(key.to_string());
2399 }
2400 keys.into_iter().collect()
2401 }
2402
2403 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2405 let node_ids = self.nodes_by_label(label);
2406 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2407 }
2408
2409 #[cfg(not(feature = "tiered-storage"))]
2411 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2412 let epoch = self.current_epoch();
2413 let type_to_id = self.edge_type_to_id.read();
2414
2415 if let Some(&type_id) = type_to_id.get(edge_type) {
2416 let edge_ids: Vec<EdgeId> = self
2417 .edges
2418 .read()
2419 .iter()
2420 .filter_map(|(id, chain)| {
2421 chain.visible_at(epoch).and_then(|r| {
2422 if !r.is_deleted() && r.type_id == type_id {
2423 Some(*id)
2424 } else {
2425 None
2426 }
2427 })
2428 })
2429 .collect();
2430
2431 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2433 as Box<dyn Iterator<Item = Edge> + 'a>
2434 } else {
2435 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2437 }
2438 }
2439
/// Iterates over the edges of type `edge_type` that are visible, readable and
/// not flagged deleted at the current epoch (tiered-storage build).
///
/// Yields nothing when the type name is unknown. Matching IDs are collected
/// up front; each `Edge` is loaded lazily through `get_edge`.
#[cfg(feature = "tiered-storage")]
pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
    let epoch = self.current_epoch();
    let type_to_id = self.edge_type_to_id.read();

    // An unknown type simply produces an empty ID list.
    let matching: Vec<EdgeId> = match type_to_id.get(edge_type) {
        Some(&wanted) => {
            let versions = self.edge_versions.read();
            let mut ids = Vec::new();
            for (edge_id, index) in versions.iter() {
                let record = index
                    .visible_at(epoch)
                    .and_then(|vref| self.read_edge_record(&vref));
                if let Some(record) = record {
                    if !record.is_deleted() && record.type_id == wanted {
                        ids.push(*edge_id);
                    }
                }
            }
            ids
        }
        None => Vec::new(),
    };

    matching.into_iter().filter_map(move |id| self.get_edge(id))
}
2470
2471 #[must_use]
2478 pub fn node_property_might_match(
2479 &self,
2480 property: &PropertyKey,
2481 op: CompareOp,
2482 value: &Value,
2483 ) -> bool {
2484 self.node_properties.might_match(property, op, value)
2485 }
2486
2487 #[must_use]
2489 pub fn edge_property_might_match(
2490 &self,
2491 property: &PropertyKey,
2492 op: CompareOp,
2493 value: &Value,
2494 ) -> bool {
2495 self.edge_properties.might_match(property, op, value)
2496 }
2497
2498 #[must_use]
2500 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2501 self.node_properties.zone_map(property)
2502 }
2503
2504 #[must_use]
2506 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2507 self.edge_properties.zone_map(property)
2508 }
2509
2510 pub fn rebuild_zone_maps(&self) {
2512 self.node_properties.rebuild_zone_maps();
2513 self.edge_properties.rebuild_zone_maps();
2514 }
2515
2516 #[must_use]
2520 pub fn statistics(&self) -> Statistics {
2521 self.statistics.read().clone()
2522 }
2523
2524 #[cfg(not(feature = "tiered-storage"))]
2529 pub fn compute_statistics(&self) {
2530 let mut stats = Statistics::new();
2531
2532 stats.total_nodes = self.node_count() as u64;
2534 stats.total_edges = self.edge_count() as u64;
2535
2536 let id_to_label = self.id_to_label.read();
2538 let label_index = self.label_index.read();
2539
2540 for (label_id, label_name) in id_to_label.iter().enumerate() {
2541 let node_count = label_index
2542 .get(label_id)
2543 .map(|set| set.len() as u64)
2544 .unwrap_or(0);
2545
2546 if node_count > 0 {
2547 let avg_out_degree = if stats.total_nodes > 0 {
2549 stats.total_edges as f64 / stats.total_nodes as f64
2550 } else {
2551 0.0
2552 };
2553
2554 let label_stats =
2555 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2556
2557 stats.update_label(label_name.as_ref(), label_stats);
2558 }
2559 }
2560
2561 let id_to_edge_type = self.id_to_edge_type.read();
2563 let edges = self.edges.read();
2564 let epoch = self.current_epoch();
2565
2566 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2567 for chain in edges.values() {
2568 if let Some(record) = chain.visible_at(epoch) {
2569 if !record.is_deleted() {
2570 *edge_type_counts.entry(record.type_id).or_default() += 1;
2571 }
2572 }
2573 }
2574
2575 for (type_id, count) in edge_type_counts {
2576 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2577 let avg_degree = if stats.total_nodes > 0 {
2578 count as f64 / stats.total_nodes as f64
2579 } else {
2580 0.0
2581 };
2582
2583 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2584 stats.update_edge_type(type_name.as_ref(), edge_stats);
2585 }
2586 }
2587
2588 *self.statistics.write() = stats;
2589 }
2590
/// Recomputes the store statistics from the current data and replaces the
/// stored snapshot (tiered-storage build).
///
/// Records total node and edge counts, per-label node counts taken from the
/// label index, and per-edge-type counts of edges that are visible, readable
/// and not flagged deleted at the current epoch. Degree figures are plain
/// ratios against the total node count, not per-label or per-type
/// measurements.
#[cfg(feature = "tiered-storage")]
pub fn compute_statistics(&self) {
    let mut stats = Statistics::new();
    stats.total_nodes = self.node_count() as u64;
    stats.total_edges = self.edge_count() as u64;

    // Per-label statistics.
    let label_names = self.id_to_label.read();
    let label_members = self.label_index.read();
    for (label_id, label_name) in label_names.iter().enumerate() {
        let members = label_members
            .get(label_id)
            .map_or(0, |set| set.len() as u64);
        if members == 0 {
            continue;
        }

        // Global edges-per-node ratio, used for both in- and out-degree.
        let mean_degree = if stats.total_nodes > 0 {
            stats.total_edges as f64 / stats.total_nodes as f64
        } else {
            0.0
        };
        let label_stats = LabelStatistics::new(members).with_degrees(mean_degree, mean_degree);
        stats.update_label(label_name.as_ref(), label_stats);
    }

    // Per-edge-type statistics.
    let type_names = self.id_to_edge_type.read();
    let versions = self.edge_versions.read();
    let epoch = self.current_epoch();

    let mut per_type: FxHashMap<u32, u64> = FxHashMap::default();
    let live_records = versions
        .values()
        .filter_map(|index| index.visible_at(epoch))
        .filter_map(|vref| self.read_edge_record(&vref))
        .filter(|record| !record.is_deleted());
    for record in live_records {
        *per_type.entry(record.type_id).or_default() += 1;
    }

    for (type_id, count) in per_type {
        let type_name = match type_names.get(type_id as usize) {
            Some(name) => name,
            None => continue,
        };
        let mean_degree = if stats.total_nodes > 0 {
            count as f64 / stats.total_nodes as f64
        } else {
            0.0
        };
        let edge_stats = EdgeTypeStatistics::new(count, mean_degree, mean_degree);
        stats.update_edge_type(type_name.as_ref(), edge_stats);
    }

    *self.statistics.write() = stats;
}
2656
2657 #[must_use]
2659 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2660 self.statistics.read().estimate_label_cardinality(label)
2661 }
2662
2663 #[must_use]
2665 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2666 self.statistics
2667 .read()
2668 .estimate_avg_degree(edge_type, outgoing)
2669 }
2670
2671 fn get_or_create_label_id(&self, label: &str) -> u32 {
2674 {
2675 let label_to_id = self.label_to_id.read();
2676 if let Some(&id) = label_to_id.get(label) {
2677 return id;
2678 }
2679 }
2680
2681 let mut label_to_id = self.label_to_id.write();
2682 let mut id_to_label = self.id_to_label.write();
2683
2684 if let Some(&id) = label_to_id.get(label) {
2686 return id;
2687 }
2688
2689 let id = id_to_label.len() as u32;
2690
2691 let label: ArcStr = label.into();
2692 label_to_id.insert(label.clone(), id);
2693 id_to_label.push(label);
2694
2695 id
2696 }
2697
2698 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2699 {
2700 let type_to_id = self.edge_type_to_id.read();
2701 if let Some(&id) = type_to_id.get(edge_type) {
2702 return id;
2703 }
2704 }
2705
2706 let mut type_to_id = self.edge_type_to_id.write();
2707 let mut id_to_type = self.id_to_edge_type.write();
2708
2709 if let Some(&id) = type_to_id.get(edge_type) {
2711 return id;
2712 }
2713
2714 let id = id_to_type.len() as u32;
2715 let edge_type: ArcStr = edge_type.into();
2716 type_to_id.insert(edge_type.clone(), id);
2717 id_to_type.push(edge_type);
2718
2719 id
2720 }
2721
2722 #[cfg(not(feature = "tiered-storage"))]
2729 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2730 let epoch = self.current_epoch();
2731 let mut record = NodeRecord::new(id, epoch);
2732 record.set_label_count(labels.len() as u16);
2733
2734 let mut node_label_set = FxHashSet::default();
2736 for label in labels {
2737 let label_id = self.get_or_create_label_id(*label);
2738 node_label_set.insert(label_id);
2739
2740 let mut index = self.label_index.write();
2742 while index.len() <= label_id as usize {
2743 index.push(FxHashMap::default());
2744 }
2745 index[label_id as usize].insert(id, ());
2746 }
2747
2748 self.node_labels.write().insert(id, node_label_set);
2750
2751 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2753 self.nodes.write().insert(id, chain);
2754
2755 let id_val = id.as_u64();
2757 let _ = self
2758 .next_node_id
2759 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2760 if id_val >= current {
2761 Some(id_val + 1)
2762 } else {
2763 None
2764 }
2765 });
2766 }
2767
/// Creates a node with a caller-chosen `id` and the given labels at the
/// current epoch, attributed to `TxId::SYSTEM` (tiered-storage build).
///
/// The record is allocated in the arena for the current epoch, and any
/// existing version index or label set for `id` is replaced. Afterwards the
/// node ID counter is raised to `id + 1` if it was not already greater than
/// `id`.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
    let epoch = self.current_epoch();
    let mut record = NodeRecord::new(id, epoch);
    record.set_label_count(labels.len() as u16);

    let mut label_ids = FxHashSet::default();
    for &label in labels {
        let label_id = self.get_or_create_label_id(label);
        label_ids.insert(label_id);

        // Grow the label index so that the slot for this label exists.
        let slot = label_id as usize;
        let mut index = self.label_index.write();
        if index.len() <= slot {
            index.resize_with(slot + 1, Default::default);
        }
        index[slot].insert(id, ());
    }
    self.node_labels.write().insert(id, label_ids);

    // Store the record in the epoch's arena and remember where it landed.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (arena_offset, _stored) = arena.alloc_value_with_offset(record);
    let hot_ref = HotVersionRef::new(epoch, arena_offset, TxId::SYSTEM);

    // The version-map write lock stays held until the function returns.
    let mut versions = self.node_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    // Keep the ID counter ahead of every explicitly assigned ID.
    let id_val = id.as_u64();
    let _ = self
        .next_node_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            (id_val >= current).then(|| id_val + 1)
        });
}
2814
2815 #[cfg(not(feature = "tiered-storage"))]
2819 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2820 let epoch = self.current_epoch();
2821 let type_id = self.get_or_create_edge_type_id(edge_type);
2822
2823 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2824 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2825 self.edges.write().insert(id, chain);
2826
2827 self.forward_adj.add_edge(src, dst, id);
2829 if let Some(ref backward) = self.backward_adj {
2830 backward.add_edge(dst, src, id);
2831 }
2832
2833 let id_val = id.as_u64();
2835 let _ = self
2836 .next_edge_id
2837 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2838 if id_val >= current {
2839 Some(id_val + 1)
2840 } else {
2841 None
2842 }
2843 });
2844 }
2845
/// Creates an edge with a caller-chosen `id` from `src` to `dst` at the
/// current epoch, attributed to `TxId::SYSTEM` (tiered-storage build).
///
/// The record is allocated in the arena for the current epoch, any existing
/// version index for `id` is replaced, and the adjacency indexes are updated.
/// Afterwards the edge ID counter is raised to `id + 1` if it was not already
/// greater than `id`.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
    let epoch = self.current_epoch();
    let type_id = self.get_or_create_edge_type_id(edge_type);

    // Store the record in the epoch's arena and remember where it landed.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (arena_offset, _stored) =
        arena.alloc_value_with_offset(EdgeRecord::new(id, src, dst, type_id, epoch));
    let hot_ref = HotVersionRef::new(epoch, arena_offset, TxId::SYSTEM);

    // The version-map write lock stays held until the function returns.
    let mut versions = self.edge_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    self.forward_adj.add_edge(src, dst, id);
    if let Some(reverse) = self.backward_adj.as_ref() {
        // The backward index is keyed by destination.
        reverse.add_edge(dst, src, id);
    }

    // Keep the ID counter ahead of every explicitly assigned ID.
    let id_val = id.as_u64();
    let _ = self
        .next_edge_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            (id_val >= current).then(|| id_val + 1)
        });
}
2882
2883 pub fn set_epoch(&self, epoch: EpochId) {
2885 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2886 }
2887}
2888
2889impl Default for LpgStore {
2890 fn default() -> Self {
2891 Self::new()
2892 }
2893}
2894
2895#[cfg(test)]
2896mod tests {
2897 use super::*;
2898
/// A created node is valid, retrievable, and carries exactly its label.
#[test]
fn test_create_node() {
    let store = LpgStore::new();
    let person = store.create_node(&["Person"]);
    assert!(person.is_valid());

    let fetched = store.get_node(person).unwrap();
    let is_person = fetched.has_label("Person");
    let is_animal = fetched.has_label("Animal");
    assert!(is_person);
    assert!(!is_animal);
}
2910
/// Properties supplied at creation time can be read back from the node.
#[test]
fn test_create_node_with_props() {
    let store = LpgStore::new();

    let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
    let id = store.create_node_with_props(&["Person"], props);

    let node = store.get_node(id).unwrap();
    let name = node.get_property("name").and_then(|v| v.as_str());
    let age = node.get_property("age").and_then(|v| v.as_int64());
    assert_eq!(name, Some("Alice"));
    assert_eq!(age, Some(30));
}
2930
/// Deleting a node removes it from counts and lookups; a second delete of the
/// same node reports failure.
#[test]
fn test_delete_node() {
    let store = LpgStore::new();
    let person = store.create_node(&["Person"]);
    assert_eq!(store.node_count(), 1);

    let first_delete = store.delete_node(person);
    assert!(first_delete);
    assert_eq!(store.node_count(), 0);
    assert!(store.get_node(person).is_none());

    let second_delete = store.delete_node(person);
    assert!(!second_delete);
}
2945
/// A created edge is valid and reports the endpoints and type it was given.
#[test]
fn test_create_edge() {
    let store = LpgStore::new();
    let alice = store.create_node(&["Person"]);
    let bob = store.create_node(&["Person"]);

    let knows = store.create_edge(alice, bob, "KNOWS");
    assert!(knows.is_valid());

    let fetched = store.get_edge(knows).unwrap();
    assert_eq!(fetched.src, alice);
    assert_eq!(fetched.dst, bob);
    assert_eq!(fetched.edge_type.as_str(), "KNOWS");
}
2961
/// Outgoing and incoming neighbor queries reflect the created edges.
#[test]
fn test_neighbors() {
    let store = LpgStore::new();
    let a = store.create_node(&["Person"]);
    let b = store.create_node(&["Person"]);
    let c = store.create_node(&["Person"]);

    // a -> b and a -> c
    store.create_edge(a, b, "KNOWS");
    store.create_edge(a, c, "KNOWS");

    let from_a: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
    assert_eq!(from_a.len(), 2);
    assert!(from_a.contains(&b));
    assert!(from_a.contains(&c));

    let into_b: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
    assert_eq!(into_b.len(), 1);
    assert!(into_b.contains(&a));
}
2982
/// `nodes_by_label` returns only the nodes carrying the requested label.
#[test]
fn test_nodes_by_label() {
    let graph = LpgStore::new();

    let first_person = graph.create_node(&["Person"]);
    let second_person = graph.create_node(&["Person"]);
    let _pet = graph.create_node(&["Animal"]);

    let people = graph.nodes_by_label("Person");
    assert_eq!(people.len(), 2);
    assert!(people.contains(&first_person));
    assert!(people.contains(&second_person));

    let pets = graph.nodes_by_label("Animal");
    assert_eq!(pets.len(), 1);
}
2999
/// Deleting an edge decrements the edge count and makes the edge unreachable.
#[test]
fn test_delete_edge() {
    let graph = LpgStore::new();

    let source = graph.create_node(&["Person"]);
    let target = graph.create_node(&["Person"]);
    let link = graph.create_edge(source, target, "KNOWS");

    assert_eq!(graph.edge_count(), 1);

    let removed = graph.delete_edge(link);
    assert!(removed);
    assert_eq!(graph.edge_count(), 0);
    assert!(graph.get_edge(link).is_none());
}
3014
/// A store built with `backward_edges: false` still answers outgoing
/// neighbor queries, while incoming neighbor queries come back empty.
#[test]
fn test_lpg_store_config() {
    let store = LpgStore::with_config(LpgStoreConfig {
        initial_edge_capacity: 200,
        initial_node_capacity: 100,
        backward_edges: false,
    });

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    store.create_edge(source, target, "KNOWS");

    // Forward adjacency is always maintained.
    let forward_hits = store.neighbors(source, Direction::Outgoing).count();
    assert_eq!(forward_hits, 1);

    // No backward adjacency was configured, so nothing is reported here.
    let backward_hits = store.neighbors(target, Direction::Incoming).count();
    assert_eq!(backward_hits, 0);
}
3040
/// The epoch starts at 0 and `new_epoch` advances it by one, which
/// `current_epoch` then reflects.
#[test]
fn test_epoch_management() {
    let store = LpgStore::new();

    let initial = store.current_epoch();
    assert_eq!(initial.as_u64(), 0);

    let advanced = store.new_epoch();
    assert_eq!(advanced.as_u64(), 1);

    let observed = store.current_epoch();
    assert_eq!(observed.as_u64(), 1);
}
3054
/// Exercises the set / overwrite / remove life cycle of a node property.
#[test]
fn test_node_properties() {
    let store = LpgStore::new();
    let person = store.create_node(&["Person"]);

    // Re-reads the "name" property from the store each time it is called.
    let current_name = || store.get_node_property(person, &"name".into());

    // Initial write.
    store.set_node_property(person, "name", Value::from("Alice"));
    assert!(matches!(current_name(), Some(Value::String(s)) if s.as_str() == "Alice"));

    // Overwrite with a new value.
    store.set_node_property(person, "name", Value::from("Bob"));
    assert!(matches!(current_name(), Some(Value::String(s)) if s.as_str() == "Bob"));

    // Removal hands back the previous value ...
    let previous = store.remove_node_property(person, "name");
    assert!(matches!(previous, Some(Value::String(s)) if s.as_str() == "Bob"));

    // ... and the property is gone afterwards.
    assert!(current_name().is_none());

    // Removing a key that was never set yields nothing.
    let missing = store.remove_node_property(person, "nonexistent");
    assert!(missing.is_none());
}
3082
/// Exercises set / read / remove of a property on an edge.
#[test]
fn test_edge_properties() {
    let store = LpgStore::new();
    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    let link = store.create_edge(source, target, "KNOWS");

    // Re-reads the "since" property from the store each time it is called.
    let current_since = || store.get_edge_property(link, &"since".into());

    store.set_edge_property(link, "since", Value::from(2020i64));
    assert_eq!(current_since().and_then(|v| v.as_int64()), Some(2020));

    // Removal returns the value that was stored.
    let previous = store.remove_edge_property(link, "since");
    assert_eq!(previous.and_then(|v| v.as_int64()), Some(2020));

    // Nothing is left after removal.
    assert!(current_since().is_none());
}
3102
/// Adding and removing labels reports whether the label set actually changed.
#[test]
fn test_add_remove_label() {
    let store = LpgStore::new();
    let person = store.create_node(&["Person"]);

    // A new label is accepted.
    let added = store.add_label(person, "Employee");
    assert!(added);

    let after_add = store.get_node(person).unwrap();
    assert!(after_add.has_label("Person"));
    assert!(after_add.has_label("Employee"));

    // Adding the same label again is a no-op.
    let added_again = store.add_label(person, "Employee");
    assert!(!added_again);

    // Removing an existing label succeeds.
    let removed = store.remove_label(person, "Employee");
    assert!(removed);

    let after_remove = store.get_node(person).unwrap();
    assert!(after_remove.has_label("Person"));
    assert!(!after_remove.has_label("Employee"));

    // Removing a label that is absent (already removed or never set) fails.
    assert!(!store.remove_label(person, "Employee"));
    assert!(!store.remove_label(person, "NonExistent"));
}
3129
/// `add_label` returns `false` for an id that was never created.
#[test]
fn test_add_label_to_nonexistent_node() {
    let store = LpgStore::new();
    let unknown = NodeId::new(999);

    let added = store.add_label(unknown, "Label");
    assert!(!added);
}
3136
/// `remove_label` returns `false` for an id that was never created.
#[test]
fn test_remove_label_from_nonexistent_node() {
    let store = LpgStore::new();
    let unknown = NodeId::new(999);

    let removed = store.remove_label(unknown, "Label");
    assert!(!removed);
}
3143
/// `node_ids` lists every live node and omits deleted ones.
#[test]
fn test_node_ids() {
    let store = LpgStore::new();

    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);
    let third = store.create_node(&["Person"]);

    let before_delete = store.node_ids();
    assert_eq!(before_delete.len(), 3);
    assert!(before_delete.contains(&first));
    assert!(before_delete.contains(&second));
    assert!(before_delete.contains(&third));

    // Drop the middle node and make sure it disappears from the listing.
    store.delete_node(second);
    let after_delete = store.node_ids();
    assert_eq!(after_delete.len(), 2);
    assert!(!after_delete.contains(&second));
}
3164
/// Deleting a node id that was never created reports `false`.
#[test]
fn test_delete_node_nonexistent() {
    let store = LpgStore::new();
    let unknown = NodeId::new(999);

    let removed = store.delete_node(unknown);
    assert!(!removed);
}
3171
/// Deleting an edge id that was never created reports `false`.
#[test]
fn test_delete_edge_nonexistent() {
    let store = LpgStore::new();
    let unknown = EdgeId::new(999);

    let removed = store.delete_edge(unknown);
    assert!(!removed);
}
3178
/// The first delete of an edge succeeds; the second one reports `false`.
#[test]
fn test_delete_edge_double() {
    let store = LpgStore::new();
    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    let link = store.create_edge(source, target, "KNOWS");

    let first_attempt = store.delete_edge(link);
    assert!(first_attempt);

    // The edge is already gone, so this must be rejected.
    let second_attempt = store.delete_edge(link);
    assert!(!second_attempt);
}
3189
/// Properties passed when creating an edge are readable from the fetched edge.
#[test]
fn test_create_edge_with_props() {
    let store = LpgStore::new();
    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);

    let initial_props = [
        ("since", Value::from(2020i64)),
        ("weight", Value::from(1.0)),
    ];
    let link = store.create_edge_with_props(source, target, "KNOWS", initial_props);

    let fetched = store.get_edge(link).unwrap();

    let stored_since = fetched.get_property("since").and_then(|v| v.as_int64());
    assert_eq!(stored_since, Some(2020));

    let stored_weight = fetched.get_property("weight").and_then(|v| v.as_float64());
    assert_eq!(stored_weight, Some(1.0));
}
3216
/// `delete_node_edges` removes both the outgoing and the incoming edges of a
/// node.
#[test]
fn test_delete_node_edges() {
    let store = LpgStore::new();

    let center = store.create_node(&["Person"]);
    let downstream = store.create_node(&["Person"]);
    let upstream = store.create_node(&["Person"]);

    // One edge leaving `center`, one edge entering it.
    store.create_edge(center, downstream, "KNOWS");
    store.create_edge(upstream, center, "KNOWS");
    assert_eq!(store.edge_count(), 2);

    store.delete_node_edges(center);

    assert_eq!(store.edge_count(), 0);
}
3235
/// With `Direction::Both`, `neighbors` reports nodes reached over outgoing
/// and incoming edges alike.
#[test]
fn test_neighbors_both_directions() {
    let store = LpgStore::new();

    let center = store.create_node(&["Person"]);
    let downstream = store.create_node(&["Person"]);
    let upstream = store.create_node(&["Person"]);

    // center -> downstream, upstream -> center
    store.create_edge(center, downstream, "KNOWS");
    store.create_edge(upstream, center, "KNOWS");

    let adjacent: Vec<_> = store.neighbors(center, Direction::Both).collect();
    assert_eq!(adjacent.len(), 2);
    assert!(adjacent.contains(&downstream));
    assert!(adjacent.contains(&upstream));
}
3253
/// `edges_from` yields `(node, edge)` pairs for the requested direction.
#[test]
fn test_edges_from() {
    let store = LpgStore::new();

    let hub = store.create_node(&["Person"]);
    let left = store.create_node(&["Person"]);
    let right = store.create_node(&["Person"]);

    let to_left = store.create_edge(hub, left, "KNOWS");
    let to_right = store.create_edge(hub, right, "KNOWS");

    let outgoing: Vec<_> = store.edges_from(hub, Direction::Outgoing).collect();
    assert_eq!(outgoing.len(), 2);
    assert!(outgoing.iter().any(|pair| pair.1 == to_left));
    assert!(outgoing.iter().any(|pair| pair.1 == to_right));

    // Seen from `left`, the same edge shows up as incoming.
    let into_left: Vec<_> = store.edges_from(left, Direction::Incoming).collect();
    assert_eq!(into_left.len(), 1);
    assert_eq!(into_left[0].1, to_left);
}
3275
/// `edges_to` lists every `(source, edge)` pair pointing at the given node.
#[test]
fn test_edges_to() {
    let store = LpgStore::new();

    let first_src = store.create_node(&["Person"]);
    let sink = store.create_node(&["Person"]);
    let second_src = store.create_node(&["Person"]);

    let first_edge = store.create_edge(first_src, sink, "KNOWS");
    let second_edge = store.create_edge(second_src, sink, "KNOWS");

    let inbound = store.edges_to(sink);
    assert_eq!(inbound.len(), 2);
    assert!(inbound
        .iter()
        .any(|entry| entry.0 == first_src && entry.1 == first_edge));
    assert!(inbound
        .iter()
        .any(|entry| entry.0 == second_src && entry.1 == second_edge));
}
3293
/// Checks `out_degree` and `in_degree` for every node of a small graph.
#[test]
fn test_out_degree_in_degree() {
    let store = LpgStore::new();

    let a = store.create_node(&["Person"]);
    let b = store.create_node(&["Person"]);
    let c = store.create_node(&["Person"]);

    // a -> b, a -> c, c -> b
    store.create_edge(a, b, "KNOWS");
    store.create_edge(a, c, "KNOWS");
    store.create_edge(c, b, "KNOWS");

    // Expected number of outgoing edges per node.
    for (node, expected_out) in [(a, 2), (b, 0), (c, 1)] {
        assert_eq!(store.out_degree(node), expected_out);
    }

    // Expected number of incoming edges per node.
    for (node, expected_in) in [(a, 0), (b, 2), (c, 1)] {
        assert_eq!(store.in_degree(node), expected_in);
    }
}
3314
/// `edge_type` returns the type of an existing edge and `None` for an
/// unknown edge id.
#[test]
fn test_edge_type() {
    let store = LpgStore::new();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    let link = store.create_edge(source, target, "KNOWS");

    let reported = store.edge_type(link);
    assert_eq!(reported.as_deref(), Some("KNOWS"));

    // An id that was never handed out has no type.
    let unknown = EdgeId::new(999);
    assert!(store.edge_type(unknown).is_none());
}
3330
/// Label, edge-type and property-key counters start at zero and grow as
/// distinct names are introduced.
#[test]
fn test_count_methods() {
    let store = LpgStore::new();

    // Empty store: nothing registered yet.
    assert_eq!(store.label_count(), 0);
    assert_eq!(store.edge_type_count(), 0);
    assert_eq!(store.property_key_count(), 0);

    let employee = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
    let employer = store.create_node(&["Company"]);
    store.create_edge_with_props(
        employee,
        employer,
        "WORKS_AT",
        [("since", Value::from(2020i64))],
    );

    // Labels: Person, Company.
    assert_eq!(store.label_count(), 2);
    // Edge types: WORKS_AT.
    assert_eq!(store.edge_type_count(), 1);
    // Property keys: age, since.
    assert_eq!(store.property_key_count(), 2);
}
3347
/// `all_nodes` and `all_edges` iterate over everything that was created.
#[test]
fn test_all_nodes_and_edges() {
    let store = LpgStore::new();

    let source = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    store.create_edge(source, target, "KNOWS");

    let every_node: Vec<_> = store.all_nodes().collect();
    assert_eq!(every_node.len(), 2);

    let every_edge: Vec<_> = store.all_edges().collect();
    assert_eq!(every_edge.len(), 1);
}
3362
/// `all_labels` and `all_edge_types` report each distinct name once.
#[test]
fn test_all_labels_and_edge_types() {
    let store = LpgStore::new();

    store.create_node(&["Person"]);
    store.create_node(&["Company"]);
    let predator = store.create_node(&["Animal"]);
    let prey = store.create_node(&["Animal"]);
    store.create_edge(predator, prey, "EATS");

    // "Animal" was used twice but must only be listed once.
    let labels = store.all_labels();
    assert_eq!(labels.len(), 3);
    for expected in ["Person", "Company", "Animal"] {
        assert!(labels.contains(&expected.to_string()));
    }

    let edge_types = store.all_edge_types();
    assert_eq!(edge_types.len(), 1);
    assert!(edge_types.contains(&"EATS".to_string()));
}
3383
/// `all_property_keys` includes keys used on nodes as well as on edges.
#[test]
fn test_all_property_keys() {
    let store = LpgStore::new();

    let named = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
    let aged = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
    store.create_edge_with_props(named, aged, "KNOWS", [("since", Value::from(2020i64))]);

    let keys = store.all_property_keys();
    for expected in ["name", "age", "since"] {
        assert!(keys.contains(&expected.to_string()));
    }
}
3397
/// `nodes_with_label` iterates over the nodes of one label and yields nothing
/// for an unknown label.
#[test]
fn test_nodes_with_label() {
    let store = LpgStore::new();

    store.create_node(&["Person"]);
    store.create_node(&["Person"]);
    store.create_node(&["Company"]);

    let people: Vec<_> = store.nodes_with_label("Person").collect();
    assert_eq!(people.len(), 2);

    let firms: Vec<_> = store.nodes_with_label("Company").collect();
    assert_eq!(firms.len(), 1);

    let unknown: Vec<_> = store.nodes_with_label("NonExistent").collect();
    assert_eq!(unknown.len(), 0);
}
3415
/// `edges_with_type` iterates over the edges of one type and yields nothing
/// for an unknown type.
#[test]
fn test_edges_with_type() {
    let store = LpgStore::new();

    let employee = store.create_node(&["Person"]);
    let friend = store.create_node(&["Person"]);
    let employer = store.create_node(&["Company"]);

    store.create_edge(employee, friend, "KNOWS");
    store.create_edge(employee, employer, "WORKS_AT");

    let friendships: Vec<_> = store.edges_with_type("KNOWS").collect();
    assert_eq!(friendships.len(), 1);

    let employments: Vec<_> = store.edges_with_type("WORKS_AT").collect();
    assert_eq!(employments.len(), 1);

    let unknown: Vec<_> = store.edges_with_type("NonExistent").collect();
    assert_eq!(unknown.len(), 0);
}
3436
/// Looking up a label that no node carries returns an empty collection.
#[test]
fn test_nodes_by_label_nonexistent() {
    let store = LpgStore::new();
    store.create_node(&["Person"]);

    let matches = store.nodes_by_label("NonExistent");
    assert!(matches.is_empty());
}
3445
/// After `compute_statistics`, totals match the graph and the estimators
/// return sane (non-negative / positive) values.
#[test]
fn test_statistics() {
    let store = LpgStore::new();

    let employee = store.create_node(&["Person"]);
    let friend = store.create_node(&["Person"]);
    let employer = store.create_node(&["Company"]);

    store.create_edge(employee, friend, "KNOWS");
    store.create_edge(employee, employer, "WORKS_AT");

    store.compute_statistics();
    let snapshot = store.statistics();

    assert_eq!(snapshot.total_nodes, 3);
    assert_eq!(snapshot.total_edges, 2);

    // Two Person nodes exist, so the estimate must be strictly positive.
    let person_estimate = store.estimate_label_cardinality("Person");
    assert!(person_estimate > 0.0);

    // Only a lower bound is checked for the degree estimate.
    let knows_degree = store.estimate_avg_degree("KNOWS", true);
    assert!(knows_degree >= 0.0);
}
3470
/// Zone maps exist for properties that have been written (on nodes and on
/// edges) and are absent for unknown keys.
#[test]
fn test_zone_maps() {
    let store = LpgStore::new();

    store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
    store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);

    // 30 lies between the stored values 25 and 35, so it cannot be ruled out.
    let probe = Value::from(30i64);
    let possible = store.node_property_might_match(&"age".into(), CompareOp::Eq, &probe);
    assert!(possible);

    let age_zone = store.node_property_zone_map(&"age".into());
    assert!(age_zone.is_some());

    let missing_zone = store.node_property_zone_map(&"nonexistent".into());
    assert!(missing_zone.is_none());

    // Edge properties get their own zone maps.
    let source = store.create_node(&["A"]);
    let target = store.create_node(&["B"]);
    store.create_edge_with_props(source, target, "REL", [("weight", Value::from(1.0))]);

    let weight_zone = store.edge_property_zone_map(&"weight".into());
    assert!(weight_zone.is_some());
}
3499
/// Smoke test: `rebuild_zone_maps` runs to completion on a store that holds
/// a property. There are no assertions; the test passes if nothing panics.
#[test]
fn test_rebuild_zone_maps() {
    let store = LpgStore::new();
    let seed_props = [("age", Value::from(25i64))];
    store.create_node_with_props(&["Person"], seed_props);

    store.rebuild_zone_maps();
}
3508
/// A node created under an explicit id is retrievable by that id, and ids
/// allocated afterwards are larger than it.
#[test]
fn test_create_node_with_id() {
    let store = LpgStore::new();

    let pinned = NodeId::new(100);
    store.create_node_with_id(pinned, &["Person", "Employee"]);

    let fetched = store.get_node(pinned).unwrap();
    assert!(fetched.has_label("Person"));
    assert!(fetched.has_label("Employee"));

    // Automatic allocation must continue past the explicitly chosen id.
    let auto_allocated = store.create_node(&["Other"]);
    assert!(auto_allocated.as_u64() > 100);
}
3524
/// An edge created under an explicit id is retrievable by that id, and ids
/// allocated afterwards are larger than it.
#[test]
fn test_create_edge_with_id() {
    let store = LpgStore::new();

    let source = store.create_node(&["A"]);
    let target = store.create_node(&["B"]);

    let pinned = EdgeId::new(500);
    store.create_edge_with_id(pinned, source, target, "REL");

    let fetched = store.get_edge(pinned).unwrap();
    assert_eq!(fetched.src, source);
    assert_eq!(fetched.dst, target);
    assert_eq!(fetched.edge_type.as_str(), "REL");

    // Automatic allocation must continue past the explicitly chosen id.
    let auto_allocated = store.create_edge(source, target, "OTHER");
    assert!(auto_allocated.as_u64() > 500);
}
3544
/// `set_epoch` overrides the current epoch with the supplied value.
#[test]
fn test_set_epoch() {
    let store = LpgStore::new();

    let before = store.current_epoch();
    assert_eq!(before.as_u64(), 0);

    store.set_epoch(EpochId::new(42));

    let after = store.current_epoch();
    assert_eq!(after.as_u64(), 42);
}
3554
/// `get_node` returns `None` for an id that was never created.
#[test]
fn test_get_node_nonexistent() {
    let store = LpgStore::new();
    let unknown = NodeId::new(999);

    let lookup = store.get_node(unknown);
    assert!(lookup.is_none());
}
3561
/// `get_edge` returns `None` for an id that was never created.
#[test]
fn test_get_edge_nonexistent() {
    let store = LpgStore::new();
    let unknown = EdgeId::new(999);

    let lookup = store.get_edge(unknown);
    assert!(lookup.is_none());
}
3568
/// A node created with several labels reports all of them and no others.
#[test]
fn test_multiple_labels() {
    let store = LpgStore::new();

    let manager = store.create_node(&["Person", "Employee", "Manager"]);
    let fetched = store.get_node(manager).unwrap();

    for assigned in ["Person", "Employee", "Manager"] {
        assert!(fetched.has_label(assigned));
    }
    assert!(!fetched.has_label("Other"));
}
3581
/// A store obtained through `Default` starts out empty.
#[test]
fn test_default_impl() {
    let empty: LpgStore = Default::default();

    let nodes = empty.node_count();
    let edges = empty.edge_count();
    assert_eq!(nodes, 0);
    assert_eq!(edges, 0);
}
3588
/// With `Direction::Both`, `edges_from` reports outgoing and incoming edges
/// of the node.
#[test]
fn test_edges_from_both_directions() {
    let store = LpgStore::new();

    let center = store.create_node(&["A"]);
    let downstream = store.create_node(&["B"]);
    let upstream = store.create_node(&["C"]);

    // center -> downstream
    let outgoing_edge = store.create_edge(center, downstream, "R1");
    // upstream -> center
    let incoming_edge = store.create_edge(upstream, center, "R2");

    let touching: Vec<_> = store.edges_from(center, Direction::Both).collect();
    assert_eq!(touching.len(), 2);
    assert!(touching.iter().any(|pair| pair.1 == outgoing_edge));
    assert!(touching.iter().any(|pair| pair.1 == incoming_edge));
}
3606
/// `in_degree` still reports the correct count when the store was configured
/// with `backward_edges: false`.
#[test]
fn test_no_backward_adj_in_degree() {
    let store = LpgStore::with_config(LpgStoreConfig {
        initial_edge_capacity: 10,
        initial_node_capacity: 10,
        backward_edges: false,
    });

    let source = store.create_node(&["A"]);
    let sink = store.create_node(&["B"]);
    store.create_edge(source, sink, "R");

    let incoming = store.in_degree(sink);
    assert_eq!(incoming, 1);
}
3624
/// `edges_to` still finds incoming edges when the store was configured with
/// `backward_edges: false`.
#[test]
fn test_no_backward_adj_edges_to() {
    let store = LpgStore::with_config(LpgStoreConfig {
        initial_edge_capacity: 10,
        initial_node_capacity: 10,
        backward_edges: false,
    });

    let source = store.create_node(&["A"]);
    let sink = store.create_node(&["B"]);
    let link = store.create_edge(source, sink, "R");

    let inbound = store.edges_to(sink);
    assert_eq!(inbound.len(), 1);
    assert_eq!(inbound[0].1, link);
}
3643
/// A node created through the versioned API is visible via `get_node`.
#[test]
fn test_node_versioned_creation() {
    let store = LpgStore::new();

    let creation_epoch = store.new_epoch();
    let writer = TxId::new(1);

    let person = store.create_node_versioned(&["Person"], creation_epoch, writer);
    let lookup = store.get_node(person);
    assert!(lookup.is_some());
}
3654
/// An edge created through the versioned API is visible via `get_edge`.
#[test]
fn test_edge_versioned_creation() {
    let store = LpgStore::new();

    let source = store.create_node(&["A"]);
    let target = store.create_node(&["B"]);

    let creation_epoch = store.new_epoch();
    let writer = TxId::new(1);

    let link = store.create_edge_versioned(source, target, "REL", creation_epoch, writer);
    let lookup = store.get_edge(link);
    assert!(lookup.is_some());
}
3668
/// Properties supplied to the versioned node constructor are readable from
/// the fetched node.
#[test]
fn test_node_with_props_versioned() {
    let store = LpgStore::new();

    let creation_epoch = store.new_epoch();
    let writer = TxId::new(1);

    let initial_props = [("name", Value::from("Alice"))];
    let person =
        store.create_node_with_props_versioned(&["Person"], initial_props, creation_epoch, writer);

    let fetched = store.get_node(person).unwrap();
    let stored_name = fetched.get_property("name").and_then(|v| v.as_str());
    assert_eq!(stored_name, Some("Alice"));
}
3689
/// Discarding a transaction's uncommitted versions makes the node it created
/// disappear.
#[test]
fn test_discard_uncommitted_versions() {
    let store = LpgStore::new();

    let creation_epoch = store.new_epoch();
    let writer = TxId::new(42);

    // The node is visible right after creation ...
    let person = store.create_node_versioned(&["Person"], creation_epoch, writer);
    assert!(store.get_node(person).is_some());

    // ... and gone once the creating transaction's versions are discarded.
    store.discard_uncommitted_versions(writer);
    assert!(store.get_node(person).is_none());
}
3707
/// `find_nodes_by_property` gives the same answers before and after a
/// property index is created for the key.
#[test]
fn test_property_index_create_and_lookup() {
    let store = LpgStore::new();

    let alice = store.create_node(&["Person"]);
    let bob = store.create_node(&["Person"]);
    let charlie = store.create_node(&["Person"]);

    for (person, city) in [(alice, "NYC"), (bob, "NYC"), (charlie, "LA")] {
        store.set_node_property(person, "city", Value::from(city));
    }

    // Lookup without an index.
    let unindexed_nyc = store.find_nodes_by_property("city", &Value::from("NYC"));
    assert_eq!(unindexed_nyc.len(), 2);

    store.create_property_index("city");
    assert!(store.has_property_index("city"));

    // Same lookup, now with the index in place.
    let indexed_nyc = store.find_nodes_by_property("city", &Value::from("NYC"));
    assert_eq!(indexed_nyc.len(), 2);
    assert!(indexed_nyc.contains(&alice));
    assert!(indexed_nyc.contains(&bob));

    let indexed_la = store.find_nodes_by_property("city", &Value::from("LA"));
    assert_eq!(indexed_la.len(), 1);
    assert!(indexed_la.contains(&charlie));
}
3741
/// Overwriting an indexed property moves the node from the old value's
/// bucket to the new one.
#[test]
fn test_property_index_maintained_on_update() {
    let store = LpgStore::new();

    // Index first, data afterwards.
    store.create_property_index("status");

    let task = store.create_node(&["Task"]);
    store.set_node_property(task, "status", Value::from("pending"));

    let initially_pending = store.find_nodes_by_property("status", &Value::from("pending"));
    assert_eq!(initially_pending.len(), 1);
    assert!(initially_pending.contains(&task));

    store.set_node_property(task, "status", Value::from("done"));

    // The old value no longer matches ...
    let still_pending = store.find_nodes_by_property("status", &Value::from("pending"));
    assert!(still_pending.is_empty());

    // ... and the new value does.
    let finished = store.find_nodes_by_property("status", &Value::from("done"));
    assert_eq!(finished.len(), 1);
    assert!(finished.contains(&task));
}
3769
/// Removing an indexed property also removes the node from index lookups.
#[test]
fn test_property_index_maintained_on_remove() {
    let store = LpgStore::new();

    store.create_property_index("tag");

    let item = store.create_node(&["Item"]);
    store.set_node_property(item, "tag", Value::from("important"));

    let before_removal = store.find_nodes_by_property("tag", &Value::from("important"));
    assert_eq!(before_removal.len(), 1);

    store.remove_node_property(item, "tag");

    let after_removal = store.find_nodes_by_property("tag", &Value::from("important"));
    assert!(after_removal.is_empty());
}
3790
/// Dropping an index succeeds once; dropping it again reports `false`.
#[test]
fn test_property_index_drop() {
    let store = LpgStore::new();

    store.create_property_index("key");
    assert!(store.has_property_index("key"));

    let dropped = store.drop_property_index("key");
    assert!(dropped);
    assert!(!store.has_property_index("key"));

    // Nothing left to drop.
    let dropped_again = store.drop_property_index("key");
    assert!(!dropped_again);
}
3804
/// An indexed lookup returns every node sharing a value, and nothing for a
/// value no node has.
#[test]
fn test_property_index_multiple_values() {
    let store = LpgStore::new();

    store.create_property_index("age");

    let n1 = store.create_node(&["Person"]);
    let n2 = store.create_node(&["Person"]);
    let n3 = store.create_node(&["Person"]);
    let n4 = store.create_node(&["Person"]);

    // Three nodes share age 25, one has age 30.
    for (person, age) in [(n1, 25i64), (n2, 25i64), (n3, 30i64), (n4, 25i64)] {
        store.set_node_property(person, "age", Value::from(age));
    }

    let aged_25 = store.find_nodes_by_property("age", &Value::from(25i64));
    assert_eq!(aged_25.len(), 3);

    let aged_30 = store.find_nodes_by_property("age", &Value::from(30i64));
    assert_eq!(aged_30.len(), 1);

    let aged_40 = store.find_nodes_by_property("age", &Value::from(40i64));
    assert!(aged_40.is_empty());
}
3831
/// Creating an index after the data was written still makes the existing
/// values findable.
#[test]
fn test_property_index_builds_from_existing_data() {
    let store = LpgStore::new();

    // Data first ...
    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);
    store.set_node_property(first, "email", Value::from("alice@example.com"));
    store.set_node_property(second, "email", Value::from("bob@example.com"));

    // ... index afterwards.
    store.create_property_index("email");

    let alice_hits = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
    assert_eq!(alice_hits.len(), 1);
    assert!(alice_hits.contains(&first));

    let bob_hits = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
    assert_eq!(bob_hits.len(), 1);
    assert!(bob_hits.contains(&second));
}
3854
/// The batch getter returns one entry per requested node, in request order,
/// with `None` for nodes lacking the property.
#[test]
fn test_get_node_property_batch() {
    let store = LpgStore::new();

    let with_25 = store.create_node(&["Person"]);
    let with_30 = store.create_node(&["Person"]);
    let without_age = store.create_node(&["Person"]);

    store.set_node_property(with_25, "age", Value::from(25i64));
    store.set_node_property(with_30, "age", Value::from(30i64));

    let age_key = PropertyKey::new("age");
    let fetched = store.get_node_property_batch(&[with_25, with_30, without_age], &age_key);

    assert_eq!(fetched.len(), 3);

    // Expected result per position of the request slice.
    let expected = [Some(Value::from(25i64)), Some(Value::from(30i64)), None];
    for (position, want) in expected.iter().enumerate() {
        assert_eq!(fetched[position], *want);
    }
}
3875
/// Requesting a property for an empty id slice yields an empty result.
#[test]
fn test_get_node_property_batch_empty() {
    let store = LpgStore::new();
    let any_key = PropertyKey::new("any");

    let fetched = store.get_node_property_batch(&[], &any_key);
    assert!(fetched.is_empty());
}
3884
/// The all-properties batch getter returns one property map per requested
/// node, in request order.
#[test]
fn test_get_nodes_properties_batch() {
    let store = LpgStore::new();

    let two_props = store.create_node(&["Person"]);
    let one_prop = store.create_node(&["Person"]);
    let no_props = store.create_node(&["Person"]);

    store.set_node_property(two_props, "name", Value::from("Alice"));
    store.set_node_property(two_props, "age", Value::from(25i64));
    store.set_node_property(one_prop, "name", Value::from("Bob"));

    let maps = store.get_nodes_properties_batch(&[two_props, one_prop, no_props]);

    assert_eq!(maps.len(), 3);

    // (position, number of properties expected at that position)
    for (position, expected_len) in [(0usize, 2), (1usize, 1), (2usize, 0)] {
        assert_eq!(maps[position].len(), expected_len);
    }

    let name_key = PropertyKey::new("name");
    assert_eq!(maps[0].get(&name_key), Some(&Value::from("Alice")));
    assert_eq!(maps[1].get(&name_key), Some(&Value::from("Bob")));
}
3914
/// Requesting all properties for an empty id slice yields an empty result.
#[test]
fn test_get_nodes_properties_batch_empty() {
    let store = LpgStore::new();

    let maps = store.get_nodes_properties_batch(&[]);
    assert!(maps.is_empty());
}
3922
/// The selective batch getter returns only the requested keys for each node
/// and leaves out every other property the node has.
#[test]
fn test_get_nodes_properties_selective_batch() {
    let store = LpgStore::new();

    let alice = store.create_node(&["Person"]);
    let bob = store.create_node(&["Person"]);

    // Each node gets name + age plus one extra property that is not requested.
    store.set_node_property(alice, "name", Value::from("Alice"));
    store.set_node_property(alice, "age", Value::from(25i64));
    store.set_node_property(alice, "email", Value::from("alice@example.com"));
    store.set_node_property(bob, "name", Value::from("Bob"));
    store.set_node_property(bob, "age", Value::from(30i64));
    store.set_node_property(bob, "city", Value::from("NYC"));

    let requested = vec![PropertyKey::new("name"), PropertyKey::new("age")];
    let selected = store.get_nodes_properties_selective_batch(&[alice, bob], &requested);

    assert_eq!(selected.len(), 2);

    // Keys used to inspect the returned maps.
    let name_key = PropertyKey::new("name");
    let age_key = PropertyKey::new("age");

    // First node: name and age only, no email.
    let alice_props = &selected[0];
    assert_eq!(alice_props.len(), 2);
    assert_eq!(alice_props.get(&name_key), Some(&Value::from("Alice")));
    assert_eq!(alice_props.get(&age_key), Some(&Value::from(25i64)));
    assert_eq!(alice_props.get(&PropertyKey::new("email")), None);

    // Second node: name and age only, no city.
    let bob_props = &selected[1];
    assert_eq!(bob_props.len(), 2);
    assert_eq!(bob_props.get(&name_key), Some(&Value::from("Bob")));
    assert_eq!(bob_props.get(&age_key), Some(&Value::from(30i64)));
    assert_eq!(bob_props.get(&PropertyKey::new("city")), None);
}
3968
/// Requesting an empty key list yields one (empty) map per node.
#[test]
fn test_get_nodes_properties_selective_batch_empty_keys() {
    let store = LpgStore::new();

    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "name", Value::from("Alice"));

    let selected = store.get_nodes_properties_selective_batch(&[person], &[]);

    assert_eq!(selected.len(), 1);
    // No keys were asked for, so nothing may be returned for the node.
    assert!(selected[0].is_empty());
}
3982
/// Requested keys that a node does not have are simply absent from its map.
#[test]
fn test_get_nodes_properties_selective_batch_missing_keys() {
    let store = LpgStore::new();

    let person = store.create_node(&["Person"]);
    store.set_node_property(person, "name", Value::from("Alice"));

    // One key the node lacks, one key it has.
    let requested = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
    let selected = store.get_nodes_properties_selective_batch(&[person], &requested);

    assert_eq!(selected.len(), 1);

    // Only the existing key shows up.
    let person_props = &selected[0];
    assert_eq!(person_props.len(), 1);
    assert_eq!(
        person_props.get(&PropertyKey::new("name")),
        Some(&Value::from("Alice"))
    );
}
4001}