1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(feature = "tiered-storage")]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
32 match (a, b) {
33 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
34 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
35 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
36 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
37 _ => None,
38 }
39}
40
41fn value_in_range(
43 value: &Value,
44 min: Option<&Value>,
45 max: Option<&Value>,
46 min_inclusive: bool,
47 max_inclusive: bool,
48) -> bool {
49 if let Some(min_val) = min {
51 match compare_values_for_range(value, min_val) {
52 Some(CmpOrdering::Less) => return false,
53 Some(CmpOrdering::Equal) if !min_inclusive => return false,
54 None => return false, _ => {}
56 }
57 }
58
59 if let Some(max_val) = max {
61 match compare_values_for_range(value, max_val) {
62 Some(CmpOrdering::Greater) => return false,
63 Some(CmpOrdering::Equal) if !max_inclusive => return false,
64 None => return false,
65 _ => {}
66 }
67 }
68
69 true
70}
71
72#[cfg(feature = "tiered-storage")]
74use crate::storage::EpochStore;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::memory::arena::ArenaAllocator;
77#[cfg(feature = "tiered-storage")]
78use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
79
/// Construction-time options for [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, `LpgStore::with_config` allocates a backward adjacency
    /// index in addition to the forward one.
    pub backward_edges: bool,
    /// Initial node capacity.
    /// NOTE(review): presumably a pre-allocation hint; it is not read in the
    /// visible part of this file - confirm against the rest of the module.
    pub initial_node_capacity: usize,
    /// Initial edge capacity.
    /// NOTE(review): presumably a pre-allocation hint; it is not read in the
    /// visible part of this file - confirm against the rest of the module.
    pub initial_edge_capacity: usize,
}
95
96impl Default for LpgStoreConfig {
97 fn default() -> Self {
98 Self {
99 backward_edges: true,
100 initial_node_capacity: 1024,
101 initial_edge_capacity: 4096,
102 }
103 }
104}
105
/// In-memory labeled property graph store.
///
/// Node and edge records are versioned by epoch. The backing representation
/// depends on the `tiered-storage` feature: per-id `VersionChain`s without
/// it, per-id `VersionIndex`es plus an arena allocator and an epoch store
/// with it.
pub struct LpgStore {
    /// Configuration the store was created with.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node id -> version chain of node records (non-tiered builds).
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge id -> version chain of edge records (non-tiered builds).
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Per-epoch arenas that hold newly created ("hot") records (tiered builds).
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Node id -> index of version references (tiered builds).
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Edge id -> index of version references (tiered builds).
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Store consulted for "cold" version references (tiered builds).
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property values keyed by node id.
    node_properties: PropertyStorage<NodeId>,

    /// Property values keyed by edge id.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name -> numeric label id.
    label_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Label names, indexed by numeric label id.
    id_to_label: RwLock<Vec<ArcStr>>,

    /// Edge type name -> numeric edge type id.
    edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,

    /// Edge type names, indexed by numeric edge type id.
    id_to_edge_type: RwLock<Vec<ArcStr>>,

    /// Adjacency keyed by source node (`add_edge(src, dst, id)`).
    forward_adj: ChunkedAdjacency,

    /// Adjacency keyed by destination node (`add_edge(dst, src, id)`);
    /// `None` when the store was configured without backward edges.
    backward_adj: Option<ChunkedAdjacency>,

    /// For each label id (the vector index), the nodes carrying that label.
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node id -> set of label ids attached to that node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Optional per-property indexes: property key -> value -> nodes holding
    /// that value.
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Next node id to hand out.
    next_node_id: AtomicU64,

    /// Next edge id to hand out.
    next_edge_id: AtomicU64,

    /// Raw value of the current epoch.
    current_epoch: AtomicU64,

    /// Store statistics.
    /// NOTE(review): not touched in the visible part of this file - how and
    /// when it is refreshed must be confirmed elsewhere.
    statistics: RwLock<Statistics>,
}
262
263impl LpgStore {
264 #[must_use]
266 pub fn new() -> Self {
267 Self::with_config(LpgStoreConfig::default())
268 }
269
270 #[must_use]
272 pub fn with_config(config: LpgStoreConfig) -> Self {
273 let backward_adj = if config.backward_edges {
274 Some(ChunkedAdjacency::new())
275 } else {
276 None
277 };
278
279 Self {
280 #[cfg(not(feature = "tiered-storage"))]
281 nodes: RwLock::new(FxHashMap::default()),
282 #[cfg(not(feature = "tiered-storage"))]
283 edges: RwLock::new(FxHashMap::default()),
284 #[cfg(feature = "tiered-storage")]
285 arena_allocator: Arc::new(ArenaAllocator::new()),
286 #[cfg(feature = "tiered-storage")]
287 node_versions: RwLock::new(FxHashMap::default()),
288 #[cfg(feature = "tiered-storage")]
289 edge_versions: RwLock::new(FxHashMap::default()),
290 #[cfg(feature = "tiered-storage")]
291 epoch_store: Arc::new(EpochStore::new()),
292 node_properties: PropertyStorage::new(),
293 edge_properties: PropertyStorage::new(),
294 label_to_id: RwLock::new(FxHashMap::default()),
295 id_to_label: RwLock::new(Vec::new()),
296 edge_type_to_id: RwLock::new(FxHashMap::default()),
297 id_to_edge_type: RwLock::new(Vec::new()),
298 forward_adj: ChunkedAdjacency::new(),
299 backward_adj,
300 label_index: RwLock::new(Vec::new()),
301 node_labels: RwLock::new(FxHashMap::default()),
302 property_indexes: RwLock::new(FxHashMap::default()),
303 next_node_id: AtomicU64::new(0),
304 next_edge_id: AtomicU64::new(0),
305 current_epoch: AtomicU64::new(0),
306 statistics: RwLock::new(Statistics::new()),
307 config,
308 }
309 }
310
311 #[must_use]
313 pub fn current_epoch(&self) -> EpochId {
314 EpochId::new(self.current_epoch.load(Ordering::Acquire))
315 }
316
317 pub fn new_epoch(&self) -> EpochId {
319 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
320 EpochId::new(id)
321 }
322
323 pub fn create_node(&self, labels: &[&str]) -> NodeId {
329 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
330 }
331
332 #[cfg(not(feature = "tiered-storage"))]
334 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
335 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
336
337 let mut record = NodeRecord::new(id, epoch);
338 record.set_label_count(labels.len() as u16);
339
340 let mut node_label_set = FxHashSet::default();
342 for label in labels {
343 let label_id = self.get_or_create_label_id(*label);
344 node_label_set.insert(label_id);
345
346 let mut index = self.label_index.write();
348 while index.len() <= label_id as usize {
349 index.push(FxHashMap::default());
350 }
351 index[label_id as usize].insert(id, ());
352 }
353
354 self.node_labels.write().insert(id, node_label_set);
356
357 let chain = VersionChain::with_initial(record, epoch, tx_id);
359 self.nodes.write().insert(id, chain);
360 id
361 }
362
/// Creates a node with the given labels as of `epoch` on behalf of `tx_id`
/// and returns its freshly allocated id (tiered-storage build).
///
/// The record is allocated in the arena for `epoch` and registered as a hot
/// version. The record's label count is taken from the deduplicated label
/// set, so passing the same label twice does not inflate it.
#[cfg(feature = "tiered-storage")]
pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
    let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));

    let mut node_label_set = FxHashSet::default();
    for label in labels {
        let label_id = self.get_or_create_label_id(*label);
        node_label_set.insert(label_id);

        // Grow the per-label index on demand so `label_id` is a valid slot.
        let mut index = self.label_index.write();
        while index.len() <= label_id as usize {
            index.push(FxHashMap::default());
        }
        index[label_id as usize].insert(id, ());
    }

    let mut record = NodeRecord::new(id, epoch);
    // Count distinct labels, not raw input length.
    record.set_label_count(node_label_set.len() as u16);

    self.node_labels.write().insert(id, node_label_set);

    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, tx_id);

    let mut versions = self.node_versions.write();
    if let Some(index) = versions.get_mut(&id) {
        index.add_hot(hot_ref);
    } else {
        versions.insert(id, VersionIndex::with_initial(hot_ref));
    }

    id
}
406
407 pub fn create_node_with_props(
409 &self,
410 labels: &[&str],
411 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
412 ) -> NodeId {
413 self.create_node_with_props_versioned(
414 labels,
415 properties,
416 self.current_epoch(),
417 TxId::SYSTEM,
418 )
419 }
420
421 #[cfg(not(feature = "tiered-storage"))]
423 pub fn create_node_with_props_versioned(
424 &self,
425 labels: &[&str],
426 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
427 epoch: EpochId,
428 tx_id: TxId,
429 ) -> NodeId {
430 let id = self.create_node_versioned(labels, epoch, tx_id);
431
432 for (key, value) in properties {
433 self.node_properties.set(id, key.into(), value.into());
434 }
435
436 let count = self.node_properties.get_all(id).len() as u16;
438 if let Some(chain) = self.nodes.write().get_mut(&id) {
439 if let Some(record) = chain.latest_mut() {
440 record.props_count = count;
441 }
442 }
443
444 id
445 }
446
/// Creates a node with labels and initial properties as of `epoch` on behalf
/// of `tx_id` and returns its id (tiered-storage build).
///
/// Every property is routed through the property-index maintenance used by
/// `set_node_property`, so a node created after `create_property_index`
/// is found by indexed lookups.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_props_versioned(
    &self,
    labels: &[&str],
    properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
    epoch: EpochId,
    tx_id: TxId,
) -> NodeId {
    let id = self.create_node_versioned(labels, epoch, tx_id);

    for (key, value) in properties {
        let key: PropertyKey = key.into();
        let value: Value = value.into();
        // Keep any existing index on this property in sync before storing.
        self.update_property_index_on_set(id, &key, &value);
        self.node_properties.set(id, key, value);
    }

    id
}
468
469 #[must_use]
471 pub fn get_node(&self, id: NodeId) -> Option<Node> {
472 self.get_node_at_epoch(id, self.current_epoch())
473 }
474
475 #[must_use]
477 #[cfg(not(feature = "tiered-storage"))]
478 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
479 let nodes = self.nodes.read();
480 let chain = nodes.get(&id)?;
481 let record = chain.visible_at(epoch)?;
482
483 if record.is_deleted() {
484 return None;
485 }
486
487 let mut node = Node::new(id);
488
489 let id_to_label = self.id_to_label.read();
491 let node_labels = self.node_labels.read();
492 if let Some(label_ids) = node_labels.get(&id) {
493 for &label_id in label_ids {
494 if let Some(label) = id_to_label.get(label_id as usize) {
495 node.labels.push(label.clone());
496 }
497 }
498 }
499
500 node.properties = self.node_properties.get_all(id).into_iter().collect();
502
503 Some(node)
504 }
505
/// Materialises the node `id` as visible at `epoch` (tiered-storage build).
///
/// Returns `None` when the id is unknown, no version is visible at `epoch`,
/// the record cannot be read, or the record is marked deleted. Labels and
/// properties are read from the store's current label and property tables.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
    let versions = self.node_versions.read();
    let version_ref = versions.get(&id)?.visible_at(epoch)?;

    let record = self.read_node_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    let mut node = Node::new(id);

    let id_to_label = self.id_to_label.read();
    let node_labels = self.node_labels.read();
    if let Some(label_ids) = node_labels.get(&id) {
        // Ids without a registered name are skipped.
        let names = label_ids
            .iter()
            .filter_map(|&label_id| id_to_label.get(label_id as usize));
        for name in names {
            node.labels.push(name.clone());
        }
    }

    node.properties = self.node_properties.get_all(id).into_iter().collect();

    Some(node)
}
540
541 #[must_use]
543 #[cfg(not(feature = "tiered-storage"))]
544 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
545 let nodes = self.nodes.read();
546 let chain = nodes.get(&id)?;
547 let record = chain.visible_to(epoch, tx_id)?;
548
549 if record.is_deleted() {
550 return None;
551 }
552
553 let mut node = Node::new(id);
554
555 let id_to_label = self.id_to_label.read();
557 let node_labels = self.node_labels.read();
558 if let Some(label_ids) = node_labels.get(&id) {
559 for &label_id in label_ids {
560 if let Some(label) = id_to_label.get(label_id as usize) {
561 node.labels.push(label.clone());
562 }
563 }
564 }
565
566 node.properties = self.node_properties.get_all(id).into_iter().collect();
568
569 Some(node)
570 }
571
/// Materialises the node `id` as visible to transaction `tx_id` at `epoch`
/// (tiered-storage build).
///
/// Returns `None` when the id is unknown, no version is visible to the
/// transaction, the record cannot be read, or the record is marked deleted.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
    let versions = self.node_versions.read();
    let version_ref = versions.get(&id)?.visible_to(epoch, tx_id)?;

    let record = self.read_node_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    let mut node = Node::new(id);

    let id_to_label = self.id_to_label.read();
    let node_labels = self.node_labels.read();
    if let Some(label_ids) = node_labels.get(&id) {
        // Ids without a registered name are skipped.
        let names = label_ids
            .iter()
            .filter_map(|&label_id| id_to_label.get(label_id as usize));
        for name in names {
            node.labels.push(name.clone());
        }
    }

    node.properties = self.node_properties.get_all(id).into_iter().collect();

    Some(node)
}
606
/// Resolves a version reference to an owned copy of the node record.
///
/// Hot references are read from the arena of their epoch; cold references
/// are fetched from the epoch store, which may return `None`.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
    match version_ref {
        VersionRef::Cold(cold) => {
            self.epoch_store
                .get_node(cold.epoch, cold.block_offset, cold.length)
        }
        VersionRef::Hot(hot) => {
            let arena = self.arena_allocator.arena(hot.epoch);
            // SAFETY: NOTE(review) - assumes `hot.arena_offset` was produced by
            // `alloc_value_with_offset` for a `NodeRecord` in this epoch's arena
            // (as `create_node_versioned` does); confirm no other writer can
            // register a node hot ref with a different payload type.
            let stored: &NodeRecord = unsafe { arena.read_at(hot.arena_offset) };
            Some(stored.clone())
        }
    }
}
625
626 pub fn delete_node(&self, id: NodeId) -> bool {
628 self.delete_node_at_epoch(id, self.current_epoch())
629 }
630
631 #[cfg(not(feature = "tiered-storage"))]
633 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
634 let mut nodes = self.nodes.write();
635 if let Some(chain) = nodes.get_mut(&id) {
636 if let Some(record) = chain.visible_at(epoch) {
638 if record.is_deleted() {
639 return false;
640 }
641 } else {
642 return false;
644 }
645
646 chain.mark_deleted(epoch);
648
649 let mut index = self.label_index.write();
651 let mut node_labels = self.node_labels.write();
652 if let Some(label_ids) = node_labels.remove(&id) {
653 for label_id in label_ids {
654 if let Some(set) = index.get_mut(label_id as usize) {
655 set.remove(&id);
656 }
657 }
658 }
659
660 drop(nodes); drop(index);
663 drop(node_labels);
664 self.node_properties.remove_all(id);
665
666 true
669 } else {
670 false
671 }
672 }
673
/// Marks the node `id` as deleted at `epoch` (tiered-storage build).
///
/// Returns `false` when the id is unknown, no version is visible at `epoch`,
/// the record cannot be read, or it is already deleted. Otherwise the node
/// is marked deleted, detached from the label tables, purged from every
/// property index, and its properties are removed.
///
/// Property-index entries are purged so that indexed lookups such as
/// `find_nodes_by_property` stop returning the deleted node's id.
#[cfg(feature = "tiered-storage")]
pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
    let mut versions = self.node_versions.write();
    let Some(index) = versions.get_mut(&id) else {
        return false;
    };

    let live = index
        .visible_at(epoch)
        .and_then(|version_ref| self.read_node_record(&version_ref))
        .is_some_and(|record| !record.is_deleted());
    if !live {
        return false;
    }

    index.mark_deleted(epoch);

    {
        let mut label_index = self.label_index.write();
        let mut node_labels = self.node_labels.write();
        if let Some(label_ids) = node_labels.remove(&id) {
            for label_id in label_ids {
                if let Some(set) = label_index.get_mut(label_id as usize) {
                    set.remove(&id);
                }
            }
        }
    }
    drop(versions);

    // Purge index entries while the property values are still readable.
    {
        let indexes = self.property_indexes.read();
        for (key, prop_index) in indexes.iter() {
            let Some(old_value) = self.node_properties.get(id, key) else {
                continue;
            };
            let old_hv = HashableValue::new(old_value);
            let emptied = match prop_index.get_mut(&old_hv) {
                Some(mut members) => {
                    members.remove(&id);
                    members.is_empty()
                }
                None => false,
            };
            // The entry guard is released above before the bucket is removed.
            if emptied {
                prop_index.remove(&old_hv);
            }
        }
    }

    self.node_properties.remove_all(id);

    true
}
718
719 #[cfg(not(feature = "tiered-storage"))]
724 pub fn delete_node_edges(&self, node_id: NodeId) {
725 let outgoing: Vec<EdgeId> = self
727 .forward_adj
728 .edges_from(node_id)
729 .into_iter()
730 .map(|(_, edge_id)| edge_id)
731 .collect();
732
733 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
735 backward
736 .edges_from(node_id)
737 .into_iter()
738 .map(|(_, edge_id)| edge_id)
739 .collect()
740 } else {
741 let epoch = self.current_epoch();
743 self.edges
744 .read()
745 .iter()
746 .filter_map(|(id, chain)| {
747 chain.visible_at(epoch).and_then(|r| {
748 if !r.is_deleted() && r.dst == node_id {
749 Some(*id)
750 } else {
751 None
752 }
753 })
754 })
755 .collect()
756 };
757
758 for edge_id in outgoing.into_iter().chain(incoming) {
760 self.delete_edge(edge_id);
761 }
762 }
763
/// Deletes every edge attached to `node_id`, outgoing first, then incoming
/// (tiered-storage build).
///
/// Incoming edges come from the backward adjacency index when it exists;
/// otherwise all edge versions visible at the current epoch are scanned for
/// a matching destination.
#[cfg(feature = "tiered-storage")]
pub fn delete_node_edges(&self, node_id: NodeId) {
    let mut doomed: Vec<EdgeId> = self
        .forward_adj
        .edges_from(node_id)
        .into_iter()
        .map(|(_, edge_id)| edge_id)
        .collect();

    match &self.backward_adj {
        Some(backward) => {
            doomed.extend(
                backward
                    .edges_from(node_id)
                    .into_iter()
                    .map(|(_, edge_id)| edge_id),
            );
        }
        None => {
            let epoch = self.current_epoch();
            // The read guard is released at the end of this arm, before any
            // edge is deleted below.
            let versions = self.edge_versions.read();
            doomed.extend(
                versions
                    .iter()
                    .filter(|(_, index)| {
                        index
                            .visible_at(epoch)
                            .and_then(|vref| self.read_edge_record(&vref))
                            .is_some_and(|r| !r.is_deleted() && r.dst == node_id)
                    })
                    .map(|(id, _)| *id),
            );
        }
    }

    for edge_id in doomed {
        self.delete_edge(edge_id);
    }
}
808
809 #[cfg(not(feature = "tiered-storage"))]
811 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
812 let prop_key: PropertyKey = key.into();
813
814 self.update_property_index_on_set(id, &prop_key, &value);
816
817 self.node_properties.set(id, prop_key, value);
818
819 let count = self.node_properties.get_all(id).len() as u16;
821 if let Some(chain) = self.nodes.write().get_mut(&id) {
822 if let Some(record) = chain.latest_mut() {
823 record.props_count = count;
824 }
825 }
826 }
827
/// Sets property `key` of node `id` to `value` (tiered-storage build).
///
/// Any property index on `key` is updated first, then the value is stored.
#[cfg(feature = "tiered-storage")]
pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
    let prop_key: PropertyKey = key.into();

    // Index maintenance needs the old value, so it runs before the write.
    self.update_property_index_on_set(id, &prop_key, &value);
    self.node_properties.set(id, prop_key, value);
}
842
843 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
845 self.edge_properties.set(id, key.into(), value);
846 }
847
848 #[cfg(not(feature = "tiered-storage"))]
852 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
853 let prop_key: PropertyKey = key.into();
854
855 self.update_property_index_on_remove(id, &prop_key);
857
858 let result = self.node_properties.remove(id, &prop_key);
859
860 let count = self.node_properties.get_all(id).len() as u16;
862 if let Some(chain) = self.nodes.write().get_mut(&id) {
863 if let Some(record) = chain.latest_mut() {
864 record.props_count = count;
865 }
866 }
867
868 result
869 }
870
/// Removes property `key` from node `id` and returns what the property
/// storage reports as removed (tiered-storage build).
///
/// Any property index on `key` is updated before the value is removed.
#[cfg(feature = "tiered-storage")]
pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
    let prop_key: PropertyKey = key.into();

    // Index maintenance needs the current value, so it runs before removal.
    self.update_property_index_on_remove(id, &prop_key);
    let removed = self.node_properties.remove(id, &prop_key);
    removed
}
883
884 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
888 self.edge_properties.remove(id, &key.into())
889 }
890
891 #[must_use]
906 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
907 self.node_properties.get(id, key)
908 }
909
910 #[must_use]
914 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
915 self.edge_properties.get(id, key)
916 }
917
918 #[must_use]
941 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
942 self.node_properties.get_batch(ids, key)
943 }
944
945 #[must_use]
950 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
951 self.node_properties.get_all_batch(ids)
952 }
953
954 #[must_use]
990 pub fn find_nodes_in_range(
991 &self,
992 property: &str,
993 min: Option<&Value>,
994 max: Option<&Value>,
995 min_inclusive: bool,
996 max_inclusive: bool,
997 ) -> Vec<NodeId> {
998 let key = PropertyKey::new(property);
999
1000 if !self
1002 .node_properties
1003 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1004 {
1005 return Vec::new();
1006 }
1007
1008 self.node_ids()
1010 .into_iter()
1011 .filter(|&node_id| {
1012 self.node_properties
1013 .get(node_id, &key)
1014 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1015 })
1016 .collect()
1017 }
1018
1019 #[must_use]
1044 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1045 if conditions.is_empty() {
1046 return self.node_ids();
1047 }
1048
1049 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1052 let indexes = self.property_indexes.read();
1053
1054 for (i, (prop, value)) in conditions.iter().enumerate() {
1055 let key = PropertyKey::new(*prop);
1056 let hv = HashableValue::new(value.clone());
1057
1058 if let Some(index) = indexes.get(&key) {
1059 let matches: Vec<NodeId> = index
1060 .get(&hv)
1061 .map(|nodes| nodes.iter().copied().collect())
1062 .unwrap_or_default();
1063
1064 if matches.is_empty() {
1066 return Vec::new();
1067 }
1068
1069 if best_start
1071 .as_ref()
1072 .is_none_or(|(_, best)| matches.len() < best.len())
1073 {
1074 best_start = Some((i, matches));
1075 }
1076 }
1077 }
1078 drop(indexes);
1079
1080 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1082 let (prop, value) = &conditions[0];
1084 (0, self.find_nodes_by_property(prop, value))
1085 });
1086
1087 for (i, (prop, value)) in conditions.iter().enumerate() {
1089 if i == start_idx {
1090 continue;
1091 }
1092
1093 let key = PropertyKey::new(*prop);
1094 candidates.retain(|&node_id| {
1095 self.node_properties
1096 .get(node_id, &key)
1097 .is_some_and(|v| v == *value)
1098 });
1099
1100 if candidates.is_empty() {
1102 return Vec::new();
1103 }
1104 }
1105
1106 candidates
1107 }
1108
1109 pub fn create_property_index(&self, property: &str) {
1137 let key = PropertyKey::new(property);
1138
1139 let mut indexes = self.property_indexes.write();
1140 if indexes.contains_key(&key) {
1141 return; }
1143
1144 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1146
1147 for node_id in self.node_ids() {
1149 if let Some(value) = self.node_properties.get(node_id, &key) {
1150 let hv = HashableValue::new(value);
1151 index
1152 .entry(hv)
1153 .or_insert_with(FxHashSet::default)
1154 .insert(node_id);
1155 }
1156 }
1157
1158 indexes.insert(key, index);
1159 }
1160
1161 pub fn drop_property_index(&self, property: &str) -> bool {
1165 let key = PropertyKey::new(property);
1166 self.property_indexes.write().remove(&key).is_some()
1167 }
1168
1169 #[must_use]
1171 pub fn has_property_index(&self, property: &str) -> bool {
1172 let key = PropertyKey::new(property);
1173 self.property_indexes.read().contains_key(&key)
1174 }
1175
1176 #[must_use]
1199 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1200 let key = PropertyKey::new(property);
1201 let hv = HashableValue::new(value.clone());
1202
1203 let indexes = self.property_indexes.read();
1205 if let Some(index) = indexes.get(&key) {
1206 if let Some(nodes) = index.get(&hv) {
1207 return nodes.iter().copied().collect();
1208 }
1209 return Vec::new();
1210 }
1211 drop(indexes);
1212
1213 self.node_ids()
1215 .into_iter()
1216 .filter(|&node_id| {
1217 self.node_properties
1218 .get(node_id, &key)
1219 .is_some_and(|v| v == *value)
1220 })
1221 .collect()
1222 }
1223
1224 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1226 let indexes = self.property_indexes.read();
1227 if let Some(index) = indexes.get(key) {
1228 if let Some(old_value) = self.node_properties.get(node_id, key) {
1230 let old_hv = HashableValue::new(old_value);
1231 if let Some(mut nodes) = index.get_mut(&old_hv) {
1232 nodes.remove(&node_id);
1233 if nodes.is_empty() {
1234 drop(nodes);
1235 index.remove(&old_hv);
1236 }
1237 }
1238 }
1239
1240 let new_hv = HashableValue::new(new_value.clone());
1242 index
1243 .entry(new_hv)
1244 .or_insert_with(FxHashSet::default)
1245 .insert(node_id);
1246 }
1247 }
1248
1249 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1251 let indexes = self.property_indexes.read();
1252 if let Some(index) = indexes.get(key) {
1253 if let Some(old_value) = self.node_properties.get(node_id, key) {
1255 let old_hv = HashableValue::new(old_value);
1256 if let Some(mut nodes) = index.get_mut(&old_hv) {
1257 nodes.remove(&node_id);
1258 if nodes.is_empty() {
1259 drop(nodes);
1260 index.remove(&old_hv);
1261 }
1262 }
1263 }
1264 }
1265 }
1266
1267 #[cfg(not(feature = "tiered-storage"))]
1272 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1273 let epoch = self.current_epoch();
1274
1275 let nodes = self.nodes.read();
1277 if let Some(chain) = nodes.get(&node_id) {
1278 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1279 return false;
1280 }
1281 } else {
1282 return false;
1283 }
1284 drop(nodes);
1285
1286 let label_id = self.get_or_create_label_id(label);
1288
1289 let mut node_labels = self.node_labels.write();
1291 let label_set = node_labels
1292 .entry(node_id)
1293 .or_insert_with(FxHashSet::default);
1294
1295 if label_set.contains(&label_id) {
1296 return false; }
1298
1299 label_set.insert(label_id);
1300 drop(node_labels);
1301
1302 let mut index = self.label_index.write();
1304 if (label_id as usize) >= index.len() {
1305 index.resize(label_id as usize + 1, FxHashMap::default());
1306 }
1307 index[label_id as usize].insert(node_id, ());
1308
1309 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1311 if let Some(record) = chain.latest_mut() {
1312 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1313 record.set_label_count(count as u16);
1314 }
1315 }
1316
1317 true
1318 }
1319
/// Attaches `label` to the node `node_id` (tiered-storage build).
///
/// Returns `false` when the node is unknown, not visible at the current
/// epoch, unreadable, deleted, or already carries the label; `true` when
/// the label was added.
#[cfg(feature = "tiered-storage")]
pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    {
        let versions = self.node_versions.read();
        let live = versions
            .get(&node_id)
            .and_then(|index| index.visible_at(epoch))
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if !live {
            return false;
        }
    }

    let label_id = self.get_or_create_label_id(label);

    {
        let mut node_labels = self.node_labels.write();
        let label_set = node_labels
            .entry(node_id)
            .or_insert_with(FxHashSet::default);
        // `insert` reports `false` when the label was already present.
        if !label_set.insert(label_id) {
            return false;
        }
    }

    let slot = label_id as usize;
    let mut index = self.label_index.write();
    if slot >= index.len() {
        index.resize(slot + 1, FxHashMap::default());
    }
    index[slot].insert(node_id, ());

    true
}
1373
1374 #[cfg(not(feature = "tiered-storage"))]
1379 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1380 let epoch = self.current_epoch();
1381
1382 let nodes = self.nodes.read();
1384 if let Some(chain) = nodes.get(&node_id) {
1385 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1386 return false;
1387 }
1388 } else {
1389 return false;
1390 }
1391 drop(nodes);
1392
1393 let label_id = {
1395 let label_ids = self.label_to_id.read();
1396 match label_ids.get(label) {
1397 Some(&id) => id,
1398 None => return false, }
1400 };
1401
1402 let mut node_labels = self.node_labels.write();
1404 if let Some(label_set) = node_labels.get_mut(&node_id) {
1405 if !label_set.remove(&label_id) {
1406 return false; }
1408 } else {
1409 return false;
1410 }
1411 drop(node_labels);
1412
1413 let mut index = self.label_index.write();
1415 if (label_id as usize) < index.len() {
1416 index[label_id as usize].remove(&node_id);
1417 }
1418
1419 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1421 if let Some(record) = chain.latest_mut() {
1422 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1423 record.set_label_count(count as u16);
1424 }
1425 }
1426
1427 true
1428 }
1429
/// Detaches `label` from the node `node_id` (tiered-storage build).
///
/// Returns `false` when the node is unknown, not visible at the current
/// epoch, unreadable, deleted, when the label name was never registered, or
/// when the node does not carry the label; `true` when the label was
/// removed.
#[cfg(feature = "tiered-storage")]
pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
    let epoch = self.current_epoch();

    {
        let versions = self.node_versions.read();
        let live = versions
            .get(&node_id)
            .and_then(|index| index.visible_at(epoch))
            .and_then(|vref| self.read_node_record(&vref))
            .is_some_and(|record| !record.is_deleted());
        if !live {
            return false;
        }
    }

    let label_id = {
        let label_ids = self.label_to_id.read();
        match label_ids.get(label) {
            Some(&id) => id,
            None => return false,
        }
    };

    {
        let mut node_labels = self.node_labels.write();
        let Some(label_set) = node_labels.get_mut(&node_id) else {
            return false;
        };
        // `remove` reports `false` when the node did not carry the label.
        if !label_set.remove(&label_id) {
            return false;
        }
    }

    let slot = label_id as usize;
    let mut index = self.label_index.write();
    if slot < index.len() {
        index[slot].remove(&node_id);
    }

    true
}
1485
1486 #[must_use]
1488 #[cfg(not(feature = "tiered-storage"))]
1489 pub fn node_count(&self) -> usize {
1490 let epoch = self.current_epoch();
1491 self.nodes
1492 .read()
1493 .values()
1494 .filter_map(|chain| chain.visible_at(epoch))
1495 .filter(|r| !r.is_deleted())
1496 .count()
1497 }
1498
/// Counts the nodes that are visible at the current epoch, readable, and
/// not deleted (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn node_count(&self) -> usize {
    let epoch = self.current_epoch();
    let versions = self.node_versions.read();
    versions
        .values()
        .filter(|index| {
            index
                .visible_at(epoch)
                .and_then(|vref| self.read_node_record(&vref))
                .is_some_and(|record| !record.is_deleted())
        })
        .count()
}
1516
1517 #[must_use]
1523 #[cfg(not(feature = "tiered-storage"))]
1524 pub fn node_ids(&self) -> Vec<NodeId> {
1525 let epoch = self.current_epoch();
1526 let mut ids: Vec<NodeId> = self
1527 .nodes
1528 .read()
1529 .iter()
1530 .filter_map(|(id, chain)| {
1531 chain
1532 .visible_at(epoch)
1533 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1534 })
1535 .collect();
1536 ids.sort_unstable();
1537 ids
1538 }
1539
/// Returns the ids of all nodes that are visible at the current epoch,
/// readable, and not deleted, sorted ascending (tiered-storage build).
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn node_ids(&self) -> Vec<NodeId> {
    let epoch = self.current_epoch();
    let versions = self.node_versions.read();

    let mut ids: Vec<NodeId> = versions
        .iter()
        .filter(|(_, index)| {
            index
                .visible_at(epoch)
                .and_then(|vref| self.read_node_record(&vref))
                .is_some_and(|record| !record.is_deleted())
        })
        .map(|(id, _)| *id)
        .collect();
    ids.sort_unstable();
    ids
}
1559
1560 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1564 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1565 }
1566
1567 #[cfg(not(feature = "tiered-storage"))]
1569 pub fn create_edge_versioned(
1570 &self,
1571 src: NodeId,
1572 dst: NodeId,
1573 edge_type: &str,
1574 epoch: EpochId,
1575 tx_id: TxId,
1576 ) -> EdgeId {
1577 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1578 let type_id = self.get_or_create_edge_type_id(edge_type);
1579
1580 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1581 let chain = VersionChain::with_initial(record, epoch, tx_id);
1582 self.edges.write().insert(id, chain);
1583
1584 self.forward_adj.add_edge(src, dst, id);
1586 if let Some(ref backward) = self.backward_adj {
1587 backward.add_edge(dst, src, id);
1588 }
1589
1590 id
1591 }
1592
/// Creates an edge `src -> dst` of type `edge_type` as of `epoch` on behalf
/// of `tx_id` and returns its freshly allocated id (tiered-storage build).
///
/// The record is allocated in the arena for `epoch`, registered as a hot
/// version, and added to the forward adjacency index, plus the backward one
/// when present.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_versioned(
    &self,
    src: NodeId,
    dst: NodeId,
    edge_type: &str,
    epoch: EpochId,
    tx_id: TxId,
) -> EdgeId {
    let raw_id = self.next_edge_id.fetch_add(1, Ordering::Relaxed);
    let id = EdgeId::new(raw_id);
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let record = EdgeRecord::new(id, src, dst, type_id, epoch);

    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, tx_id);

    let mut versions = self.edge_versions.write();
    match versions.get_mut(&id) {
        Some(index) => {
            index.add_hot(hot_ref);
        }
        None => {
            versions.insert(id, VersionIndex::with_initial(hot_ref));
        }
    }

    self.forward_adj.add_edge(src, dst, id);
    // The backward index is keyed by destination.
    if let Some(backward) = &self.backward_adj {
        backward.add_edge(dst, src, id);
    }

    id
}
1632
1633 pub fn create_edge_with_props(
1635 &self,
1636 src: NodeId,
1637 dst: NodeId,
1638 edge_type: &str,
1639 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1640 ) -> EdgeId {
1641 let id = self.create_edge(src, dst, edge_type);
1642
1643 for (key, value) in properties {
1644 self.edge_properties.set(id, key.into(), value.into());
1645 }
1646
1647 id
1648 }
1649
1650 #[must_use]
1652 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1653 self.get_edge_at_epoch(id, self.current_epoch())
1654 }
1655
1656 #[must_use]
1658 #[cfg(not(feature = "tiered-storage"))]
1659 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1660 let edges = self.edges.read();
1661 let chain = edges.get(&id)?;
1662 let record = chain.visible_at(epoch)?;
1663
1664 if record.is_deleted() {
1665 return None;
1666 }
1667
1668 let edge_type = {
1669 let id_to_type = self.id_to_edge_type.read();
1670 id_to_type.get(record.type_id as usize)?.clone()
1671 };
1672
1673 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1674
1675 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1677
1678 Some(edge)
1679 }
1680
/// Looks up an edge as of `epoch` (tiered-storage variant).
///
/// Returns `None` when the edge is unknown, has no version visible at
/// `epoch`, its record cannot be read, it is marked deleted, or its type ID
/// has no registered name. Properties are read from the edge property
/// storage by edge ID only.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let version_ref = versions.get(&id)?.visible_at(epoch)?;
    let record = self.read_edge_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    // The type-name lock is only held for the duration of this lookup.
    let edge_type = self
        .id_to_edge_type
        .read()
        .get(record.type_id as usize)?
        .clone();

    let mut edge = Edge::new(id, record.src, record.dst, edge_type);
    edge.properties = self.edge_properties.get_all(id).into_iter().collect();
    Some(edge)
}
1708
1709 #[must_use]
1711 #[cfg(not(feature = "tiered-storage"))]
1712 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1713 let edges = self.edges.read();
1714 let chain = edges.get(&id)?;
1715 let record = chain.visible_to(epoch, tx_id)?;
1716
1717 if record.is_deleted() {
1718 return None;
1719 }
1720
1721 let edge_type = {
1722 let id_to_type = self.id_to_edge_type.read();
1723 id_to_type.get(record.type_id as usize)?.clone()
1724 };
1725
1726 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1727
1728 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1730
1731 Some(edge)
1732 }
1733
/// Looks up an edge as seen by transaction `tx_id` at `epoch`
/// (tiered-storage variant).
///
/// Visibility is decided by the version index's `visible_to(epoch, tx_id)`.
/// Returns `None` when the edge is unknown, no version is visible, the record
/// cannot be read, the record is marked deleted, or its type ID has no
/// registered name.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
    let versions = self.edge_versions.read();
    let version_ref = versions.get(&id)?.visible_to(epoch, tx_id)?;
    let record = self.read_edge_record(&version_ref)?;
    if record.is_deleted() {
        return None;
    }

    // The type-name lock is only held for the duration of this lookup.
    let edge_type = self
        .id_to_edge_type
        .read()
        .get(record.type_id as usize)?
        .clone();

    let mut edge = Edge::new(id, record.src, record.dst, edge_type);
    edge.properties = self.edge_properties.get_all(id).into_iter().collect();
    Some(edge)
}
1761
/// Resolves a version reference to an owned [`EdgeRecord`].
///
/// * `VersionRef::Hot` — the record is read from the arena of the reference's
///   epoch at the stored offset and cloned; this path always yields `Some`.
/// * `VersionRef::Cold` — the lookup is delegated to the epoch store using the
///   reference's epoch, block offset and length; its result is returned as is.
#[cfg(feature = "tiered-storage")]
// `unsafe` is needed for the raw arena read below.
#[allow(unsafe_code)]
fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
    match version_ref {
        VersionRef::Hot(hot_ref) => {
            let arena = self.arena_allocator.arena(hot_ref.epoch);
            // SAFETY: relies on `arena_offset` having been produced by
            // `alloc_value_with_offset` for an `EdgeRecord` in this same
            // epoch's arena, which is how the edge-creation paths in this file
            // build hot refs. NOTE(review): also assumes the arena for
            // `hot_ref.epoch` is still alive — confirm against arena
            // reclamation.
            let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
            // Hand back an owned copy rather than a reference into the arena.
            Some(record.clone())
        }
        VersionRef::Cold(cold_ref) => {
            self.epoch_store
                .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
        }
    }
}
1780
1781 pub fn delete_edge(&self, id: EdgeId) -> bool {
1783 self.delete_edge_at_epoch(id, self.current_epoch())
1784 }
1785
1786 #[cfg(not(feature = "tiered-storage"))]
1788 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1789 let mut edges = self.edges.write();
1790 if let Some(chain) = edges.get_mut(&id) {
1791 let (src, dst) = {
1793 match chain.visible_at(epoch) {
1794 Some(record) => {
1795 if record.is_deleted() {
1796 return false;
1797 }
1798 (record.src, record.dst)
1799 }
1800 None => return false, }
1802 };
1803
1804 chain.mark_deleted(epoch);
1806
1807 drop(edges); self.forward_adj.mark_deleted(src, id);
1811 if let Some(ref backward) = self.backward_adj {
1812 backward.mark_deleted(dst, id);
1813 }
1814
1815 self.edge_properties.remove_all(id);
1817
1818 true
1819 } else {
1820 false
1821 }
1822 }
1823
/// Marks an edge as deleted at `epoch` (tiered-storage variant).
///
/// Returns `false` when the edge is unknown, has no version visible at
/// `epoch`, its record cannot be read, or it is already marked deleted.
/// Otherwise the version index is marked deleted, the edge is marked deleted
/// in the forward (and, if present, backward) adjacency,
/// `edge_properties.remove_all(id)` is called, and `true` is returned.
#[cfg(feature = "tiered-storage")]
pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
    let mut versions = self.edge_versions.write();

    let index = match versions.get_mut(&id) {
        Some(index) => index,
        None => return false,
    };

    // Capture the endpoints before marking the index deleted.
    let visible = index
        .visible_at(epoch)
        .and_then(|version_ref| self.read_edge_record(&version_ref));
    let (src, dst) = match &visible {
        Some(record) if !record.is_deleted() => (record.src, record.dst),
        _ => return false,
    };

    index.mark_deleted(epoch);

    // Release the version map before touching adjacency and property storage.
    drop(versions);

    self.forward_adj.mark_deleted(src, id);
    if let Some(backward) = self.backward_adj.as_ref() {
        backward.mark_deleted(dst, id);
    }

    self.edge_properties.remove_all(id);

    true
}
1866
1867 #[must_use]
1869 #[cfg(not(feature = "tiered-storage"))]
1870 pub fn edge_count(&self) -> usize {
1871 let epoch = self.current_epoch();
1872 self.edges
1873 .read()
1874 .values()
1875 .filter_map(|chain| chain.visible_at(epoch))
1876 .filter(|r| !r.is_deleted())
1877 .count()
1878 }
1879
/// Counts the edges that have a version visible at the current epoch whose
/// record can be read and is not marked deleted (tiered-storage variant).
///
/// Walks every version index, so the cost grows with the number of stored
/// edges.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_count(&self) -> usize {
    let epoch = self.current_epoch();
    let versions = self.edge_versions.read();

    let mut live = 0usize;
    for index in versions.values() {
        let record = index
            .visible_at(epoch)
            .and_then(|vref| self.read_edge_record(&vref));
        if matches!(&record, Some(r) if !r.is_deleted()) {
            live += 1;
        }
    }
    live
}
1897
1898 #[cfg(not(feature = "tiered-storage"))]
1903 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1904 {
1906 let mut nodes = self.nodes.write();
1907 for chain in nodes.values_mut() {
1908 chain.remove_versions_by(tx_id);
1909 }
1910 nodes.retain(|_, chain| !chain.is_empty());
1912 }
1913
1914 {
1916 let mut edges = self.edges.write();
1917 for chain in edges.values_mut() {
1918 chain.remove_versions_by(tx_id);
1919 }
1920 edges.retain(|_, chain| !chain.is_empty());
1922 }
1923 }
1924
/// Removes every node and edge version created by transaction `tx_id`
/// (tiered-storage variant).
///
/// Each version index is asked to drop the versions written by `tx_id`;
/// indexes that end up empty are removed from their map. Nodes are processed
/// first, and the node lock is released before the edge lock is taken.
#[cfg(feature = "tiered-storage")]
pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
    let mut node_versions = self.node_versions.write();
    node_versions.values_mut().for_each(|index| {
        index.remove_versions_by(tx_id);
    });
    node_versions.retain(|_, index| !index.is_empty());
    drop(node_versions);

    let mut edge_versions = self.edge_versions.write();
    edge_versions.values_mut().for_each(|index| {
        index.remove_versions_by(tx_id);
    });
    edge_versions.retain(|_, index| !index.is_empty());
}
1949
/// Moves the hot (arena-resident) node and edge versions of `epoch` into the
/// epoch store and switches the version indexes over to cold references.
///
/// The work happens in three phases:
///
/// 1. Under read locks, every hot ref reported by `hot_refs_for_epoch(epoch)`
///    is resolved in the arena and its record cloned, together with the hot
///    ref itself.
/// 2. The cloned records are handed to `EpochStore::freeze_epoch`, which
///    returns one list of entries for nodes and one for edges.
/// 3. Under write locks, a `ColdVersionRef` is built for each collected hot
///    ref from the matching entry and passed to the index's `freeze_epoch`.
///
/// Returns the number of records collected in phase 1 (nodes plus edges);
/// when that is zero the epoch store is not touched.
///
/// NOTE(review): the version maps are unlocked between phase 1 and phase 3,
/// so this presumably expects no concurrent writes to `epoch` — confirm.
#[cfg(feature = "tiered-storage")]
// `unsafe` is needed for the raw arena reads below.
#[allow(unsafe_code)]
pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
    // Phase 1a: snapshot hot node records for this epoch.
    let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
    let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

    {
        let versions = self.node_versions.read();
        for (node_id, index) in versions.iter() {
            for hot_ref in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY: relies on the offset having been produced by
                // `alloc_value_with_offset` for a `NodeRecord` in this
                // epoch's arena (as `create_node_with_id` does).
                // NOTE(review): confirm all node hot refs are created that way.
                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                node_records.push((node_id.as_u64(), record.clone()));
                node_hot_refs.push((*node_id, *hot_ref));
            }
        }
    }

    // Phase 1b: snapshot hot edge records for this epoch.
    let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
    let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

    {
        let versions = self.edge_versions.read();
        for (edge_id, index) in versions.iter() {
            for hot_ref in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY: relies on the offset having been produced by
                // `alloc_value_with_offset` for an `EdgeRecord` in this
                // epoch's arena (as the edge-creation paths in this file do).
                let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                edge_records.push((edge_id.as_u64(), record.clone()));
                edge_hot_refs.push((*edge_id, *hot_ref));
            }
        }
    }

    let total_frozen = node_records.len() + edge_records.len();

    // Nothing hot in this epoch: leave the epoch store untouched.
    if total_frozen == 0 {
        return 0;
    }

    // Phase 2: write the records to the epoch store.
    let (node_entries, edge_entries) =
        self.epoch_store
            .freeze_epoch(epoch, node_records, edge_records);

    // Index the returned entries by entity ID for the lookups below.
    // NOTE(review): keyed by entity ID only, so if one entity had several hot
    // versions in this epoch they would all map to the last entry returned
    // for that ID — confirm at most one version per entity per epoch.
    let node_entry_map: FxHashMap<u64, _> = node_entries
        .iter()
        .map(|e| (e.entity_id, (e.offset, e.length)))
        .collect();
    let edge_entry_map: FxHashMap<u64, _> = edge_entries
        .iter()
        .map(|e| (e.entity_id, (e.offset, e.length)))
        .collect();

    // Phase 3a: replace node hot refs with cold refs.
    {
        let mut versions = self.node_versions.write();
        for (node_id, hot_ref) in &node_hot_refs {
            // Entities missing from the map or the returned entries are skipped.
            if let Some(index) = versions.get_mut(node_id) {
                if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
                    // Creator and deletion epoch are carried over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }
    }

    // Phase 3b: replace edge hot refs with cold refs.
    {
        let mut versions = self.edge_versions.write();
        for (edge_id, hot_ref) in &edge_hot_refs {
            // Entities missing from the map or the returned entries are skipped.
            if let Some(index) = versions.get_mut(edge_id) {
                if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
                    // Creator and deletion epoch are carried over from the hot ref.
                    let cold_ref = ColdVersionRef {
                        epoch,
                        block_offset: offset,
                        length,
                        created_by: hot_ref.created_by,
                        deleted_epoch: hot_ref.deleted_epoch,
                    };
                    index.freeze_epoch(epoch, std::iter::once(cold_ref));
                }
            }
        }
    }

    total_frozen
}
2065
/// Returns a shared reference to the store's [`EpochStore`].
#[cfg(feature = "tiered-storage")]
#[must_use]
pub fn epoch_store(&self) -> &EpochStore {
    let Self { epoch_store, .. } = self;
    epoch_store
}
2072
2073 #[must_use]
2075 pub fn label_count(&self) -> usize {
2076 self.id_to_label.read().len()
2077 }
2078
2079 #[must_use]
2083 pub fn property_key_count(&self) -> usize {
2084 let node_keys = self.node_properties.column_count();
2085 let edge_keys = self.edge_properties.column_count();
2086 node_keys + edge_keys
2090 }
2091
2092 #[must_use]
2094 pub fn edge_type_count(&self) -> usize {
2095 self.id_to_edge_type.read().len()
2096 }
2097
2098 pub fn neighbors(
2105 &self,
2106 node: NodeId,
2107 direction: Direction,
2108 ) -> impl Iterator<Item = NodeId> + '_ {
2109 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2110 Direction::Outgoing | Direction::Both => {
2111 Box::new(self.forward_adj.neighbors(node).into_iter())
2112 }
2113 Direction::Incoming => Box::new(std::iter::empty()),
2114 };
2115
2116 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2117 Direction::Incoming | Direction::Both => {
2118 if let Some(ref adj) = self.backward_adj {
2119 Box::new(adj.neighbors(node).into_iter())
2120 } else {
2121 Box::new(std::iter::empty())
2122 }
2123 }
2124 Direction::Outgoing => Box::new(std::iter::empty()),
2125 };
2126
2127 forward.chain(backward)
2128 }
2129
2130 pub fn edges_from(
2134 &self,
2135 node: NodeId,
2136 direction: Direction,
2137 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2138 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2139 Direction::Outgoing | Direction::Both => {
2140 Box::new(self.forward_adj.edges_from(node).into_iter())
2141 }
2142 Direction::Incoming => Box::new(std::iter::empty()),
2143 };
2144
2145 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2146 Direction::Incoming | Direction::Both => {
2147 if let Some(ref adj) = self.backward_adj {
2148 Box::new(adj.edges_from(node).into_iter())
2149 } else {
2150 Box::new(std::iter::empty())
2151 }
2152 }
2153 Direction::Outgoing => Box::new(std::iter::empty()),
2154 };
2155
2156 forward.chain(backward)
2157 }
2158
2159 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2172 if let Some(ref backward) = self.backward_adj {
2173 backward.edges_from(node)
2174 } else {
2175 self.all_edges()
2177 .filter_map(|edge| {
2178 if edge.dst == node {
2179 Some((edge.src, edge.id))
2180 } else {
2181 None
2182 }
2183 })
2184 .collect()
2185 }
2186 }
2187
2188 #[must_use]
2192 pub fn out_degree(&self, node: NodeId) -> usize {
2193 self.forward_adj.out_degree(node)
2194 }
2195
2196 #[must_use]
2201 pub fn in_degree(&self, node: NodeId) -> usize {
2202 if let Some(ref backward) = self.backward_adj {
2203 backward.in_degree(node)
2204 } else {
2205 self.all_edges().filter(|edge| edge.dst == node).count()
2207 }
2208 }
2209
2210 #[must_use]
2212 #[cfg(not(feature = "tiered-storage"))]
2213 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2214 let edges = self.edges.read();
2215 let chain = edges.get(&id)?;
2216 let epoch = self.current_epoch();
2217 let record = chain.visible_at(epoch)?;
2218 let id_to_type = self.id_to_edge_type.read();
2219 id_to_type.get(record.type_id as usize).cloned()
2220 }
2221
/// Returns the type name of the edge version visible at the current epoch
/// (tiered-storage variant).
///
/// Returns `None` when the edge is unknown, has no visible version, its
/// record cannot be read, or its type ID has no registered name.
///
/// NOTE(review): unlike `get_edge_at_epoch`, the record's deletion flag is
/// not checked here — confirm that is intended.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
    let versions = self.edge_versions.read();
    let index = versions.get(&id)?;
    let version_ref = index.visible_at(self.current_epoch())?;
    let record = self.read_edge_record(&version_ref)?;

    let type_names = self.id_to_edge_type.read();
    type_names.get(record.type_id as usize).cloned()
}
2235
2236 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2242 let label_to_id = self.label_to_id.read();
2243 if let Some(&label_id) = label_to_id.get(label) {
2244 let index = self.label_index.read();
2245 if let Some(set) = index.get(label_id as usize) {
2246 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2247 ids.sort_unstable();
2248 return ids;
2249 }
2250 }
2251 Vec::new()
2252 }
2253
2254 #[cfg(not(feature = "tiered-storage"))]
2261 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2262 let epoch = self.current_epoch();
2263 let node_ids: Vec<NodeId> = self
2264 .nodes
2265 .read()
2266 .iter()
2267 .filter_map(|(id, chain)| {
2268 chain
2269 .visible_at(epoch)
2270 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2271 })
2272 .collect();
2273
2274 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2275 }
2276
/// Iterates over all nodes visible at the current epoch (tiered-storage
/// variant).
///
/// IDs come from [`Self::node_ids`]; each node is materialized lazily via
/// `get_node`, and IDs for which it returns `None` are skipped.
#[cfg(feature = "tiered-storage")]
pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
    self.node_ids()
        .into_iter()
        .filter_map(move |id| self.get_node(id))
}
2284
2285 #[cfg(not(feature = "tiered-storage"))]
2290 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2291 let epoch = self.current_epoch();
2292 let edge_ids: Vec<EdgeId> = self
2293 .edges
2294 .read()
2295 .iter()
2296 .filter_map(|(id, chain)| {
2297 chain
2298 .visible_at(epoch)
2299 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2300 })
2301 .collect();
2302
2303 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2304 }
2305
/// Iterates over all edges visible at the current epoch (tiered-storage
/// variant).
///
/// The IDs of edges whose visible record can be read and is not deleted are
/// collected up front under the version lock; each edge is then materialized
/// lazily via [`Self::get_edge`], and IDs for which it returns `None` are
/// skipped.
#[cfg(feature = "tiered-storage")]
pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
    let epoch = self.current_epoch();

    let edge_ids: Vec<EdgeId> = {
        let versions = self.edge_versions.read();
        versions
            .iter()
            .filter_map(|(id, index)| {
                let version_ref = index.visible_at(epoch)?;
                let record = self.read_edge_record(&version_ref)?;
                if record.is_deleted() {
                    None
                } else {
                    Some(*id)
                }
            })
            .collect()
    };

    edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
}
2324
2325 pub fn all_labels(&self) -> Vec<String> {
2327 self.id_to_label
2328 .read()
2329 .iter()
2330 .map(|s| s.to_string())
2331 .collect()
2332 }
2333
2334 pub fn all_edge_types(&self) -> Vec<String> {
2336 self.id_to_edge_type
2337 .read()
2338 .iter()
2339 .map(|s| s.to_string())
2340 .collect()
2341 }
2342
2343 pub fn all_property_keys(&self) -> Vec<String> {
2345 let mut keys = std::collections::HashSet::new();
2346 for key in self.node_properties.keys() {
2347 keys.insert(key.to_string());
2348 }
2349 for key in self.edge_properties.keys() {
2350 keys.insert(key.to_string());
2351 }
2352 keys.into_iter().collect()
2353 }
2354
2355 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2357 let node_ids = self.nodes_by_label(label);
2358 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2359 }
2360
2361 #[cfg(not(feature = "tiered-storage"))]
2363 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2364 let epoch = self.current_epoch();
2365 let type_to_id = self.edge_type_to_id.read();
2366
2367 if let Some(&type_id) = type_to_id.get(edge_type) {
2368 let edge_ids: Vec<EdgeId> = self
2369 .edges
2370 .read()
2371 .iter()
2372 .filter_map(|(id, chain)| {
2373 chain.visible_at(epoch).and_then(|r| {
2374 if !r.is_deleted() && r.type_id == type_id {
2375 Some(*id)
2376 } else {
2377 None
2378 }
2379 })
2380 })
2381 .collect();
2382
2383 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2385 as Box<dyn Iterator<Item = Edge> + 'a>
2386 } else {
2387 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2389 }
2390 }
2391
/// Iterates over the edges of type `edge_type` visible at the current epoch
/// (tiered-storage variant).
///
/// Yields nothing when the type name is not registered. Otherwise the IDs of
/// edges whose visible record can be read, is not deleted and has a matching
/// type ID are collected up front, and each edge is materialized lazily via
/// [`Self::get_edge`].
#[cfg(feature = "tiered-storage")]
pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
    let epoch = self.current_epoch();
    let type_to_id = self.edge_type_to_id.read();

    let type_id = match type_to_id.get(edge_type) {
        Some(&type_id) => type_id,
        None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>,
    };

    let versions = self.edge_versions.read();
    let edge_ids: Vec<EdgeId> = versions
        .iter()
        .filter_map(|(id, index)| {
            let version_ref = index.visible_at(epoch)?;
            let record = self.read_edge_record(&version_ref)?;
            if !record.is_deleted() && record.type_id == type_id {
                Some(*id)
            } else {
                None
            }
        })
        .collect();

    Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
        as Box<dyn Iterator<Item = Edge> + 'a>
}
2422
2423 #[must_use]
2430 pub fn node_property_might_match(
2431 &self,
2432 property: &PropertyKey,
2433 op: CompareOp,
2434 value: &Value,
2435 ) -> bool {
2436 self.node_properties.might_match(property, op, value)
2437 }
2438
2439 #[must_use]
2441 pub fn edge_property_might_match(
2442 &self,
2443 property: &PropertyKey,
2444 op: CompareOp,
2445 value: &Value,
2446 ) -> bool {
2447 self.edge_properties.might_match(property, op, value)
2448 }
2449
2450 #[must_use]
2452 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2453 self.node_properties.zone_map(property)
2454 }
2455
2456 #[must_use]
2458 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2459 self.edge_properties.zone_map(property)
2460 }
2461
2462 pub fn rebuild_zone_maps(&self) {
2464 self.node_properties.rebuild_zone_maps();
2465 self.edge_properties.rebuild_zone_maps();
2466 }
2467
2468 #[must_use]
2472 pub fn statistics(&self) -> Statistics {
2473 self.statistics.read().clone()
2474 }
2475
2476 #[cfg(not(feature = "tiered-storage"))]
2481 pub fn compute_statistics(&self) {
2482 let mut stats = Statistics::new();
2483
2484 stats.total_nodes = self.node_count() as u64;
2486 stats.total_edges = self.edge_count() as u64;
2487
2488 let id_to_label = self.id_to_label.read();
2490 let label_index = self.label_index.read();
2491
2492 for (label_id, label_name) in id_to_label.iter().enumerate() {
2493 let node_count = label_index
2494 .get(label_id)
2495 .map(|set| set.len() as u64)
2496 .unwrap_or(0);
2497
2498 if node_count > 0 {
2499 let avg_out_degree = if stats.total_nodes > 0 {
2501 stats.total_edges as f64 / stats.total_nodes as f64
2502 } else {
2503 0.0
2504 };
2505
2506 let label_stats =
2507 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2508
2509 stats.update_label(label_name.as_ref(), label_stats);
2510 }
2511 }
2512
2513 let id_to_edge_type = self.id_to_edge_type.read();
2515 let edges = self.edges.read();
2516 let epoch = self.current_epoch();
2517
2518 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2519 for chain in edges.values() {
2520 if let Some(record) = chain.visible_at(epoch) {
2521 if !record.is_deleted() {
2522 *edge_type_counts.entry(record.type_id).or_default() += 1;
2523 }
2524 }
2525 }
2526
2527 for (type_id, count) in edge_type_counts {
2528 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2529 let avg_degree = if stats.total_nodes > 0 {
2530 count as f64 / stats.total_nodes as f64
2531 } else {
2532 0.0
2533 };
2534
2535 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2536 stats.update_edge_type(type_name.as_ref(), edge_stats);
2537 }
2538 }
2539
2540 *self.statistics.write() = stats;
2541 }
2542
/// Recomputes the cached [`Statistics`] from the current store contents
/// (tiered-storage variant).
///
/// * Totals come from `node_count` and [`Self::edge_count`].
/// * Every label with at least one indexed node gets a `LabelStatistics`
///   whose in- and out-degree are both the global edges-per-node ratio.
/// * Every edge type with at least one visible, readable, non-deleted edge
///   gets an `EdgeTypeStatistics` whose degrees are that type's edge count
///   divided by the total node count.
///
/// The result replaces the previously cached statistics.
#[cfg(feature = "tiered-storage")]
pub fn compute_statistics(&self) {
    let mut stats = Statistics::new();

    stats.total_nodes = self.node_count() as u64;
    stats.total_edges = self.edge_count() as u64;

    // Per-label statistics.
    let id_to_label = self.id_to_label.read();
    let label_index = self.label_index.read();

    for (label_id, label_name) in id_to_label.iter().enumerate() {
        let node_count = label_index
            .get(label_id)
            .map_or(0, |members| members.len() as u64);
        if node_count == 0 {
            continue;
        }

        // Approximation: the global ratio is used for every label.
        let avg_degree = match stats.total_nodes {
            0 => 0.0,
            total => stats.total_edges as f64 / total as f64,
        };

        stats.update_label(
            label_name.as_ref(),
            LabelStatistics::new(node_count).with_degrees(avg_degree, avg_degree),
        );
    }

    // Per-edge-type statistics.
    let id_to_edge_type = self.id_to_edge_type.read();
    let versions = self.edge_versions.read();
    let epoch = self.current_epoch();

    let mut per_type: FxHashMap<u32, u64> = FxHashMap::default();
    for index in versions.values() {
        let record = index
            .visible_at(epoch)
            .and_then(|version_ref| self.read_edge_record(&version_ref));
        match &record {
            Some(record) if !record.is_deleted() => {
                *per_type.entry(record.type_id).or_default() += 1;
            }
            _ => {}
        }
    }

    for (type_id, count) in per_type {
        // Type IDs without a registered name are skipped.
        let type_name = match id_to_edge_type.get(type_id as usize) {
            Some(type_name) => type_name,
            None => continue,
        };

        let avg_degree = match stats.total_nodes {
            0 => 0.0,
            total => count as f64 / total as f64,
        };

        stats.update_edge_type(
            type_name.as_ref(),
            EdgeTypeStatistics::new(count, avg_degree, avg_degree),
        );
    }

    *self.statistics.write() = stats;
}
2608
2609 #[must_use]
2611 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2612 self.statistics.read().estimate_label_cardinality(label)
2613 }
2614
2615 #[must_use]
2617 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2618 self.statistics
2619 .read()
2620 .estimate_avg_degree(edge_type, outgoing)
2621 }
2622
2623 fn get_or_create_label_id(&self, label: &str) -> u32 {
2626 {
2627 let label_to_id = self.label_to_id.read();
2628 if let Some(&id) = label_to_id.get(label) {
2629 return id;
2630 }
2631 }
2632
2633 let mut label_to_id = self.label_to_id.write();
2634 let mut id_to_label = self.id_to_label.write();
2635
2636 if let Some(&id) = label_to_id.get(label) {
2638 return id;
2639 }
2640
2641 let id = id_to_label.len() as u32;
2642
2643 let label: ArcStr = label.into();
2644 label_to_id.insert(label.clone(), id);
2645 id_to_label.push(label);
2646
2647 id
2648 }
2649
2650 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2651 {
2652 let type_to_id = self.edge_type_to_id.read();
2653 if let Some(&id) = type_to_id.get(edge_type) {
2654 return id;
2655 }
2656 }
2657
2658 let mut type_to_id = self.edge_type_to_id.write();
2659 let mut id_to_type = self.id_to_edge_type.write();
2660
2661 if let Some(&id) = type_to_id.get(edge_type) {
2663 return id;
2664 }
2665
2666 let id = id_to_type.len() as u32;
2667 let edge_type: ArcStr = edge_type.into();
2668 type_to_id.insert(edge_type.clone(), id);
2669 id_to_type.push(edge_type);
2670
2671 id
2672 }
2673
2674 #[cfg(not(feature = "tiered-storage"))]
2681 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2682 let epoch = self.current_epoch();
2683 let mut record = NodeRecord::new(id, epoch);
2684 record.set_label_count(labels.len() as u16);
2685
2686 let mut node_label_set = FxHashSet::default();
2688 for label in labels {
2689 let label_id = self.get_or_create_label_id(*label);
2690 node_label_set.insert(label_id);
2691
2692 let mut index = self.label_index.write();
2694 while index.len() <= label_id as usize {
2695 index.push(FxHashMap::default());
2696 }
2697 index[label_id as usize].insert(id, ());
2698 }
2699
2700 self.node_labels.write().insert(id, node_label_set);
2702
2703 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2705 self.nodes.write().insert(id, chain);
2706
2707 let id_val = id.as_u64();
2709 let _ = self
2710 .next_node_id
2711 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2712 if id_val >= current {
2713 Some(id_val + 1)
2714 } else {
2715 None
2716 }
2717 });
2718 }
2719
/// Creates a node with a caller-chosen `id` and the given `labels` at the
/// current epoch, attributed to `TxId::SYSTEM` (tiered-storage variant).
///
/// Each label is registered if necessary and the node is added to that
/// label's index slot. The record is allocated in the arena for the current
/// epoch and a version index with a single hot ref is stored under `id`,
/// replacing any existing entry for the same ID. Finally the node ID
/// allocator is raised to at least `id + 1`, so IDs it hands out later cannot
/// collide with `id`.
///
/// NOTE(review): the label count is stored with a truncating `as u16` cast
/// and counts duplicates in `labels`, while the label set de-duplicates.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
    let epoch = self.current_epoch();
    let mut record = NodeRecord::new(id, epoch);
    record.set_label_count(labels.len() as u16);

    let mut node_label_set = FxHashSet::default();
    for &label in labels {
        let label_id = self.get_or_create_label_id(label);
        node_label_set.insert(label_id);

        let mut index = self.label_index.write();
        // Grow the index so that a slot exists for this label ID.
        while index.len() <= label_id as usize {
            index.push(FxHashMap::default());
        }
        index[label_id as usize].insert(id, ());
    }

    self.node_labels.write().insert(id, node_label_set);

    // Place the record in this epoch's arena and remember where it landed.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut versions = self.node_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    // `fetch_max` is a single atomic read-modify-write that never lowers the
    // counter. `saturating_add` keeps `u64::MAX` from wrapping to 0.
    self.next_node_id
        .fetch_max(id.as_u64().saturating_add(1), Ordering::SeqCst);
}
2766
2767 #[cfg(not(feature = "tiered-storage"))]
2771 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2772 let epoch = self.current_epoch();
2773 let type_id = self.get_or_create_edge_type_id(edge_type);
2774
2775 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2776 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2777 self.edges.write().insert(id, chain);
2778
2779 self.forward_adj.add_edge(src, dst, id);
2781 if let Some(ref backward) = self.backward_adj {
2782 backward.add_edge(dst, src, id);
2783 }
2784
2785 let id_val = id.as_u64();
2787 let _ = self
2788 .next_edge_id
2789 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2790 if id_val >= current {
2791 Some(id_val + 1)
2792 } else {
2793 None
2794 }
2795 });
2796 }
2797
/// Creates an edge with a caller-chosen `id` from `src` to `dst` at the
/// current epoch, attributed to `TxId::SYSTEM` (tiered-storage variant).
///
/// The edge type is registered if necessary, the record is allocated in the
/// arena for the current epoch, and a version index with a single hot ref is
/// stored under `id` (replacing any existing entry). The edge is then added
/// to the forward and, when present, backward adjacency. Finally the edge ID
/// allocator is raised to at least `id + 1`, so IDs it hands out later cannot
/// collide with `id`.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
    let epoch = self.current_epoch();
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let record = EdgeRecord::new(id, src, dst, type_id, epoch);

    // Place the record in this epoch's arena and remember where it landed.
    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    let mut versions = self.edge_versions.write();
    versions.insert(id, VersionIndex::with_initial(hot_ref));

    self.forward_adj.add_edge(src, dst, id);
    if let Some(backward) = self.backward_adj.as_ref() {
        // The backward adjacency is keyed by destination.
        backward.add_edge(dst, src, id);
    }

    // `fetch_max` is a single atomic read-modify-write that never lowers the
    // counter. `saturating_add` keeps `u64::MAX` from wrapping to 0.
    self.next_edge_id
        .fetch_max(id.as_u64().saturating_add(1), Ordering::SeqCst);
}
2834
2835 pub fn set_epoch(&self, epoch: EpochId) {
2837 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2838 }
2839}
2840
2841impl Default for LpgStore {
2842 fn default() -> Self {
2843 Self::new()
2844 }
2845}
2846
2847#[cfg(test)]
2848mod tests {
2849 use super::*;
2850
/// A created node is retrievable and reports exactly the labels it was
/// given.
#[test]
fn test_create_node() {
    let store = LpgStore::new();

    let person_id = store.create_node(&["Person"]);
    assert!(person_id.is_valid());

    let person = store.get_node(person_id).unwrap();
    assert!(person.has_label("Person"));
    assert!(!person.has_label("Animal"));
}
2862
/// Properties passed at creation time can be read back from the node.
#[test]
fn test_create_node_with_props() {
    let store = LpgStore::new();

    let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
    let id = store.create_node_with_props(&["Person"], props);

    let node = store.get_node(id).unwrap();

    let name = node.get_property("name").and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));

    let age = node.get_property("age").and_then(|v| v.as_int64());
    assert_eq!(age, Some(30));
}
2882
/// Deleting a node removes it from counts and lookups; deleting it a second
/// time reports failure.
#[test]
fn test_delete_node() {
    let store = LpgStore::new();

    let node_id = store.create_node(&["Person"]);
    assert_eq!(store.node_count(), 1);

    let first_delete = store.delete_node(node_id);
    assert!(first_delete);
    assert_eq!(store.node_count(), 0);
    assert!(store.get_node(node_id).is_none());

    // The node is already gone, so a repeat delete must return false.
    let second_delete = store.delete_node(node_id);
    assert!(!second_delete);
}
2897
/// A created edge is retrievable with the endpoints and type it was given.
#[test]
fn test_create_edge() {
    let store = LpgStore::new();

    let alice = store.create_node(&["Person"]);
    let bob = store.create_node(&["Person"]);

    let knows = store.create_edge(alice, bob, "KNOWS");
    assert!(knows.is_valid());

    let fetched = store.get_edge(knows).unwrap();
    assert_eq!(fetched.src, alice);
    assert_eq!(fetched.dst, bob);
    assert_eq!(fetched.edge_type.as_str(), "KNOWS");
}
2913
/// Outgoing and incoming neighbor queries reflect the created edges.
#[test]
fn test_neighbors() {
    let store = LpgStore::new();

    let hub = store.create_node(&["Person"]);
    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);

    store.create_edge(hub, first, "KNOWS");
    store.create_edge(hub, second, "KNOWS");

    // The hub points at both other nodes.
    let outgoing: Vec<_> = store.neighbors(hub, Direction::Outgoing).collect();
    assert_eq!(outgoing.len(), 2);
    assert!(outgoing.contains(&first));
    assert!(outgoing.contains(&second));

    // The first target is reached only from the hub.
    let incoming: Vec<_> = store.neighbors(first, Direction::Incoming).collect();
    assert_eq!(incoming.len(), 1);
    assert!(incoming.contains(&hub));
}
2934
/// Label lookups return exactly the nodes created with that label.
#[test]
fn test_nodes_by_label() {
    let store = LpgStore::new();

    let first_person = store.create_node(&["Person"]);
    let second_person = store.create_node(&["Person"]);
    let _animal = store.create_node(&["Animal"]);

    let persons = store.nodes_by_label("Person");
    assert_eq!(persons.len(), 2);
    assert!(persons.contains(&first_person));
    assert!(persons.contains(&second_person));

    let animals = store.nodes_by_label("Animal");
    assert_eq!(animals.len(), 1);
}
2951
/// Deleting an edge decrements the edge count and makes the edge unreadable.
#[test]
fn test_delete_edge() {
    let graph = LpgStore::new();
    let from = graph.create_node(&["Person"]);
    let to = graph.create_node(&["Person"]);
    let rel = graph.create_edge(from, to, "KNOWS");
    assert_eq!(graph.edge_count(), 1);

    let deleted = graph.delete_edge(rel);
    assert!(deleted);
    assert_eq!(graph.edge_count(), 0);
    assert!(graph.get_edge(rel).is_none());
}
2966
/// A store built with `backward_edges: false` is expected to yield no
/// incoming neighbors while outgoing traversal still works.
#[test]
fn test_lpg_store_config() {
    let graph = LpgStore::with_config(LpgStoreConfig {
        backward_edges: false,
        initial_node_capacity: 100,
        initial_edge_capacity: 200,
    });

    let src = graph.create_node(&["Person"]);
    let dst = graph.create_node(&["Person"]);
    graph.create_edge(src, dst, "KNOWS");

    // Forward direction: the single edge is visible.
    let forward = graph.neighbors(src, Direction::Outgoing).count();
    assert_eq!(forward, 1);

    // Backward direction: the test expects nothing with this configuration.
    let backward = graph.neighbors(dst, Direction::Incoming).count();
    assert_eq!(backward, 0);
}
2992
/// The epoch starts at 0 and `new_epoch` advances it to 1.
#[test]
fn test_epoch_management() {
    let graph = LpgStore::new();
    assert_eq!(graph.current_epoch().as_u64(), 0);

    let advanced = graph.new_epoch();
    assert_eq!(advanced.as_u64(), 1);

    // The store itself must now report the advanced epoch.
    assert_eq!(graph.current_epoch().as_u64(), 1);
}
3006
/// Exercises set, overwrite, read and remove of a node property.
#[test]
fn test_node_properties() {
    let graph = LpgStore::new();
    let person = graph.create_node(&["Person"]);
    // Reads the current value of the "name" property.
    let current_name = || graph.get_node_property(person, &"name".into());

    graph.set_node_property(person, "name", Value::from("Alice"));
    assert!(matches!(current_name(), Some(Value::String(s)) if s.as_str() == "Alice"));

    // Writing the same key again replaces the value.
    graph.set_node_property(person, "name", Value::from("Bob"));
    assert!(matches!(current_name(), Some(Value::String(s)) if s.as_str() == "Bob"));

    // Removal hands back the previous value and clears the property.
    let removed = graph.remove_node_property(person, "name");
    assert!(matches!(removed, Some(Value::String(s)) if s.as_str() == "Bob"));
    assert!(current_name().is_none());

    // Removing a key that was never set yields nothing.
    let missing = graph.remove_node_property(person, "nonexistent");
    assert!(missing.is_none());
}
3034
/// Exercises set, read and remove of an edge property.
#[test]
fn test_edge_properties() {
    let graph = LpgStore::new();
    let from = graph.create_node(&["Person"]);
    let to = graph.create_node(&["Person"]);
    let rel = graph.create_edge(from, to, "KNOWS");
    // Reads the current value of the "since" property.
    let read_since = || graph.get_edge_property(rel, &"since".into());

    graph.set_edge_property(rel, "since", Value::from(2020i64));
    assert_eq!(read_since().and_then(|v| v.as_int64()), Some(2020));

    // Removal returns the old value; afterwards the property is gone.
    let previous = graph.remove_edge_property(rel, "since");
    assert_eq!(previous.and_then(|v| v.as_int64()), Some(2020));
    assert!(read_since().is_none());
}
3054
/// Adding and removing a label each succeed once and fail when repeated.
#[test]
fn test_add_remove_label() {
    let graph = LpgStore::new();
    let person = graph.create_node(&["Person"]);

    let added = graph.add_label(person, "Employee");
    assert!(added);

    let with_both = graph.get_node(person).unwrap();
    assert!(with_both.has_label("Person"));
    assert!(with_both.has_label("Employee"));

    // The label is already present, so a second add reports false.
    let added_again = graph.add_label(person, "Employee");
    assert!(!added_again);

    let removed = graph.remove_label(person, "Employee");
    assert!(removed);

    let after_removal = graph.get_node(person).unwrap();
    assert!(after_removal.has_label("Person"));
    assert!(!after_removal.has_label("Employee"));

    // Neither an already-removed nor a never-added label can be removed.
    for absent in ["Employee", "NonExistent"] {
        assert!(!graph.remove_label(person, absent));
    }
}
3081
/// Adding a label to an id that was never created must report failure.
#[test]
fn test_add_label_to_nonexistent_node() {
    let graph = LpgStore::new();
    let missing = NodeId::new(999);

    let added = graph.add_label(missing, "Label");
    assert!(!added);
}
3088
/// Removing a label from an id that was never created must report failure.
#[test]
fn test_remove_label_from_nonexistent_node() {
    let graph = LpgStore::new();
    let missing = NodeId::new(999);

    let removed = graph.remove_label(missing, "Label");
    assert!(!removed);
}
3095
/// `node_ids` lists every live node and drops deleted ones.
#[test]
fn test_node_ids() {
    let graph = LpgStore::new();
    let [n1, n2, n3] = [(); 3].map(|()| graph.create_node(&["Person"]));

    let before = graph.node_ids();
    assert_eq!(before.len(), 3);
    for id in [n1, n2, n3] {
        assert!(before.contains(&id));
    }

    // After deleting the middle node it must no longer be listed.
    graph.delete_node(n2);
    let after = graph.node_ids();
    assert_eq!(after.len(), 2);
    assert!(!after.contains(&n2));
}
3116
/// Deleting a node id that was never created must report failure.
#[test]
fn test_delete_node_nonexistent() {
    let graph = LpgStore::new();
    let missing = NodeId::new(999);

    let deleted = graph.delete_node(missing);
    assert!(!deleted);
}
3123
/// Deleting an edge id that was never created must report failure.
#[test]
fn test_delete_edge_nonexistent() {
    let graph = LpgStore::new();
    let missing = EdgeId::new(999);

    let deleted = graph.delete_edge(missing);
    assert!(!deleted);
}
3130
/// Deleting the same edge twice succeeds the first time only.
#[test]
fn test_delete_edge_double() {
    let graph = LpgStore::new();
    let from = graph.create_node(&["Person"]);
    let to = graph.create_node(&["Person"]);
    let rel = graph.create_edge(from, to, "KNOWS");

    let first_attempt = graph.delete_edge(rel);
    let second_attempt = graph.delete_edge(rel);

    assert!(first_attempt);
    assert!(!second_attempt);
}
3141
/// Properties supplied at edge creation are readable from the edge.
#[test]
fn test_create_edge_with_props() {
    let graph = LpgStore::new();
    let from = graph.create_node(&["Person"]);
    let to = graph.create_node(&["Person"]);

    let initial_props = [
        ("since", Value::from(2020i64)),
        ("weight", Value::from(1.0)),
    ];
    let rel = graph.create_edge_with_props(from, to, "KNOWS", initial_props);

    let record = graph.get_edge(rel).unwrap();
    let since = record.get_property("since").and_then(|v| v.as_int64());
    let weight = record.get_property("weight").and_then(|v| v.as_float64());
    assert_eq!(since, Some(2020));
    assert_eq!(weight, Some(1.0));
}
3168
/// `delete_node_edges` removes both outgoing and incoming edges of a node.
#[test]
fn test_delete_node_edges() {
    let graph = LpgStore::new();
    let [a, b, c] = [(); 3].map(|()| graph.create_node(&["Person"]));

    // One edge leaving `a` (a -> b) and one entering it (c -> a).
    for (src, dst) in [(a, b), (c, a)] {
        graph.create_edge(src, dst, "KNOWS");
    }
    assert_eq!(graph.edge_count(), 2);

    graph.delete_node_edges(a);

    assert_eq!(graph.edge_count(), 0);
}
3187
/// `Direction::Both` yields neighbors reached via outgoing and incoming edges.
#[test]
fn test_neighbors_both_directions() {
    let graph = LpgStore::new();
    let [a, b, c] = [(); 3].map(|()| graph.create_node(&["Person"]));

    // a -> b is outgoing from `a`; c -> a is incoming to `a`.
    for (src, dst) in [(a, b), (c, a)] {
        graph.create_edge(src, dst, "KNOWS");
    }

    let around_a = graph.neighbors(a, Direction::Both).collect::<Vec<_>>();
    assert_eq!(around_a.len(), 2);
    for expected in [b, c] {
        assert!(around_a.contains(&expected));
    }
}
3205
/// `edges_from` reports edge ids for the requested direction.
#[test]
fn test_edges_from() {
    let graph = LpgStore::new();
    let [a, b, c] = [(); 3].map(|()| graph.create_node(&["Person"]));

    let to_b = graph.create_edge(a, b, "KNOWS");
    let to_c = graph.create_edge(a, c, "KNOWS");

    let outgoing = graph.edges_from(a, Direction::Outgoing).collect::<Vec<_>>();
    assert_eq!(outgoing.len(), 2);
    for wanted in [to_b, to_c] {
        assert!(outgoing.iter().any(|(_, e)| *e == wanted));
    }

    // From b's point of view the only incoming edge is a -> b.
    let incoming = graph.edges_from(b, Direction::Incoming).collect::<Vec<_>>();
    assert_eq!(incoming.len(), 1);
    assert_eq!(incoming[0].1, to_b);
}
3227
/// `edges_to` lists every (source, edge) pair that targets a node.
#[test]
fn test_edges_to() {
    let graph = LpgStore::new();
    let [a, b, c] = [(); 3].map(|()| graph.create_node(&["Person"]));

    let from_a = graph.create_edge(a, b, "KNOWS");
    let from_c = graph.create_edge(c, b, "KNOWS");

    let into_b = graph.edges_to(b);
    assert_eq!(into_b.len(), 2);
    for (want_src, want_edge) in [(a, from_a), (c, from_c)] {
        assert!(into_b
            .iter()
            .any(|(src, e)| *src == want_src && *e == want_edge));
    }
}
3245
/// Out-degree and in-degree are checked per node on a three-edge graph.
#[test]
fn test_out_degree_in_degree() {
    let graph = LpgStore::new();
    let [a, b, c] = [(); 3].map(|()| graph.create_node(&["Person"]));

    // a -> b, a -> c, c -> b
    for (src, dst) in [(a, b), (a, c), (c, b)] {
        graph.create_edge(src, dst, "KNOWS");
    }

    // (node, expected out-degree)
    for (node, expected) in [(a, 2), (b, 0), (c, 1)] {
        assert_eq!(graph.out_degree(node), expected);
    }

    // (node, expected in-degree)
    for (node, expected) in [(a, 0), (b, 2), (c, 1)] {
        assert_eq!(graph.in_degree(node), expected);
    }
}
3266
/// `edge_type` returns the type of an existing edge and nothing otherwise.
#[test]
fn test_edge_type() {
    let graph = LpgStore::new();
    let from = graph.create_node(&["Person"]);
    let to = graph.create_node(&["Person"]);
    let rel = graph.create_edge(from, to, "KNOWS");

    let found = graph.edge_type(rel);
    assert_eq!(found.as_deref(), Some("KNOWS"));

    // An id that was never handed out has no type.
    let missing = EdgeId::new(999);
    assert!(graph.edge_type(missing).is_none());
}
3282
/// Label, edge-type and property-key counts start at zero and grow with data.
#[test]
fn test_count_methods() {
    let graph = LpgStore::new();

    // Empty store: nothing has been registered yet.
    assert_eq!(graph.label_count(), 0);
    assert_eq!(graph.edge_type_count(), 0);
    assert_eq!(graph.property_key_count(), 0);

    let person = graph.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
    let company = graph.create_node(&["Company"]);
    graph.create_edge_with_props(
        person,
        company,
        "WORKS_AT",
        [("since", Value::from(2020i64))],
    );

    // Two labels, one edge type, and the keys "age" and "since".
    assert_eq!(graph.label_count(), 2);
    assert_eq!(graph.edge_type_count(), 1);
    assert_eq!(graph.property_key_count(), 2);
}
3299
/// `all_nodes` and `all_edges` iterate over everything in the store.
#[test]
fn test_all_nodes_and_edges() {
    let graph = LpgStore::new();
    let from = graph.create_node(&["Person"]);
    let to = graph.create_node(&["Person"]);
    graph.create_edge(from, to, "KNOWS");

    let node_total = graph.all_nodes().count();
    assert_eq!(node_total, 2);

    let edge_total = graph.all_edges().count();
    assert_eq!(edge_total, 1);
}
3314
/// `all_labels` and `all_edge_types` report each distinct name once.
#[test]
fn test_all_labels_and_edge_types() {
    let graph = LpgStore::new();

    graph.create_node(&["Person"]);
    graph.create_node(&["Company"]);
    let predator = graph.create_node(&["Animal"]);
    let prey = graph.create_node(&["Animal"]);
    graph.create_edge(predator, prey, "EATS");

    // "Animal" was used twice but must only be counted once.
    let labels = graph.all_labels();
    assert_eq!(labels.len(), 3);
    for name in ["Person", "Company", "Animal"] {
        assert!(labels.contains(&name.to_string()));
    }

    let edge_types = graph.all_edge_types();
    assert_eq!(edge_types.len(), 1);
    assert!(edge_types.contains(&"EATS".to_string()));
}
3335
/// `all_property_keys` includes keys used on nodes as well as on edges.
#[test]
fn test_all_property_keys() {
    let graph = LpgStore::new();

    let named = graph.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
    let aged = graph.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
    graph.create_edge_with_props(named, aged, "KNOWS", [("since", Value::from(2020i64))]);

    let keys = graph.all_property_keys();
    for expected in ["name", "age", "since"] {
        assert!(keys.contains(&expected.to_string()));
    }
}
3349
/// `nodes_with_label` yields one item per node carrying the label.
#[test]
fn test_nodes_with_label() {
    let graph = LpgStore::new();
    for label in ["Person", "Person", "Company"] {
        graph.create_node(&[label]);
    }

    // (label, expected number of matching nodes)
    for (label, expected) in [("Person", 2), ("Company", 1), ("NonExistent", 0)] {
        let matches = graph.nodes_with_label(label).collect::<Vec<_>>();
        assert_eq!(matches.len(), expected);
    }
}
3367
/// `edges_with_type` yields one item per edge of the requested type.
#[test]
fn test_edges_with_type() {
    let graph = LpgStore::new();
    let alice = graph.create_node(&["Person"]);
    let bob = graph.create_node(&["Person"]);
    let employer = graph.create_node(&["Company"]);

    graph.create_edge(alice, bob, "KNOWS");
    graph.create_edge(alice, employer, "WORKS_AT");

    // (edge type, expected number of edges)
    for (edge_type, expected) in [("KNOWS", 1), ("WORKS_AT", 1), ("NonExistent", 0)] {
        let matches = graph.edges_with_type(edge_type).collect::<Vec<_>>();
        assert_eq!(matches.len(), expected);
    }
}
3388
/// Looking up an unused label returns an empty result.
#[test]
fn test_nodes_by_label_nonexistent() {
    let graph = LpgStore::new();
    graph.create_node(&["Person"]);

    let unmatched = graph.nodes_by_label("NonExistent");
    assert!(unmatched.is_empty());
}
3397
/// After `compute_statistics`, totals match the data and estimates are sane.
#[test]
fn test_statistics() {
    let graph = LpgStore::new();
    let alice = graph.create_node(&["Person"]);
    let bob = graph.create_node(&["Person"]);
    let employer = graph.create_node(&["Company"]);

    graph.create_edge(alice, bob, "KNOWS");
    graph.create_edge(alice, employer, "WORKS_AT");

    graph.compute_statistics();
    let snapshot = graph.statistics();
    assert_eq!(snapshot.total_nodes, 3);
    assert_eq!(snapshot.total_edges, 2);

    // Only loose bounds are checked for the estimators.
    let person_estimate = graph.estimate_label_cardinality("Person");
    assert!(person_estimate > 0.0);

    let knows_degree = graph.estimate_avg_degree("KNOWS", true);
    assert!(knows_degree >= 0.0);
}
3422
/// Zone maps exist for properties that were written, on nodes and on edges.
#[test]
fn test_zone_maps() {
    let graph = LpgStore::new();
    for age in [25i64, 35] {
        graph.create_node_with_props(&["Person"], [("age", Value::from(age))]);
    }

    // 30 lies between the stored values 25 and 35; the test expects `true`.
    let probe = Value::from(30i64);
    let could_match = graph.node_property_might_match(&"age".into(), CompareOp::Eq, &probe);
    assert!(could_match);

    assert!(graph.node_property_zone_map(&"age".into()).is_some());
    assert!(graph.node_property_zone_map(&"nonexistent".into()).is_none());

    // Edge properties get their own zone map.
    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);
    graph.create_edge_with_props(src, dst, "REL", [("weight", Value::from(1.0))]);

    assert!(graph.edge_property_zone_map(&"weight".into()).is_some());
}
3451
/// Smoke test: `rebuild_zone_maps` runs on a store that holds a property.
#[test]
fn test_rebuild_zone_maps() {
    let graph = LpgStore::new();
    let age = ("age", Value::from(25i64));
    graph.create_node_with_props(&["Person"], [age]);

    // No assertion follows; the call only has to complete.
    graph.rebuild_zone_maps();
}
3460
/// A node can be created under a caller-chosen id; later ids come after it.
#[test]
fn test_create_node_with_id() {
    let graph = LpgStore::new();
    let chosen = NodeId::new(100);
    graph.create_node_with_id(chosen, &["Person", "Employee"]);

    let record = graph.get_node(chosen).unwrap();
    for label in ["Person", "Employee"] {
        assert!(record.has_label(label));
    }

    // The next automatically assigned id must be greater than the chosen one.
    let following = graph.create_node(&["Other"]);
    assert!(following.as_u64() > 100);
}
3476
/// An edge can be created under a caller-chosen id; later ids come after it.
#[test]
fn test_create_edge_with_id() {
    let graph = LpgStore::new();
    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);

    let chosen = EdgeId::new(500);
    graph.create_edge_with_id(chosen, src, dst, "REL");

    let record = graph.get_edge(chosen).unwrap();
    assert_eq!(record.src, src);
    assert_eq!(record.dst, dst);
    assert_eq!(record.edge_type.as_str(), "REL");

    // The next automatically assigned id must be greater than the chosen one.
    let following = graph.create_edge(src, dst, "OTHER");
    assert!(following.as_u64() > 500);
}
3496
/// `set_epoch` overrides the current epoch with the given value.
#[test]
fn test_set_epoch() {
    let graph = LpgStore::new();
    assert_eq!(graph.current_epoch().as_u64(), 0);

    let target = EpochId::new(42);
    graph.set_epoch(target);

    assert_eq!(graph.current_epoch().as_u64(), 42);
}
3506
/// Reading a node id that was never created yields nothing.
#[test]
fn test_get_node_nonexistent() {
    let graph = LpgStore::new();
    let missing = NodeId::new(999);

    let lookup = graph.get_node(missing);
    assert!(lookup.is_none());
}
3513
/// Reading an edge id that was never created yields nothing.
#[test]
fn test_get_edge_nonexistent() {
    let graph = LpgStore::new();
    let missing = EdgeId::new(999);

    let lookup = graph.get_edge(missing);
    assert!(lookup.is_none());
}
3520
/// A node created with several labels reports each of them and no others.
#[test]
fn test_multiple_labels() {
    let graph = LpgStore::new();
    let manager = graph.create_node(&["Person", "Employee", "Manager"]);
    let record = graph.get_node(manager).unwrap();

    for label in ["Person", "Employee", "Manager"] {
        assert!(record.has_label(label));
    }
    assert!(!record.has_label("Other"));
}
3533
/// A store built through `Default` starts out empty.
#[test]
fn test_default_impl() {
    let graph = LpgStore::default();

    let (nodes, edges) = (graph.node_count(), graph.edge_count());
    assert_eq!(nodes, 0);
    assert_eq!(edges, 0);
}
3540
/// `edges_from` with `Direction::Both` returns outgoing and incoming edges.
#[test]
fn test_edges_from_both_directions() {
    let graph = LpgStore::new();
    let a = graph.create_node(&["A"]);
    let b = graph.create_node(&["B"]);
    let c = graph.create_node(&["C"]);

    // `leaving` goes a -> b, `entering` goes c -> a.
    let leaving = graph.create_edge(a, b, "R1");
    let entering = graph.create_edge(c, a, "R2");

    let touching_a = graph.edges_from(a, Direction::Both).collect::<Vec<_>>();
    assert_eq!(touching_a.len(), 2);
    for wanted in [leaving, entering] {
        assert!(touching_a.iter().any(|(_, e)| *e == wanted));
    }
}
3558
/// `in_degree` is expected to stay correct when `backward_edges` is disabled.
#[test]
fn test_no_backward_adj_in_degree() {
    let graph = LpgStore::with_config(LpgStoreConfig {
        backward_edges: false,
        initial_node_capacity: 10,
        initial_edge_capacity: 10,
    });

    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);
    graph.create_edge(src, dst, "R");

    // The single edge src -> dst must still be counted for dst.
    assert_eq!(graph.in_degree(dst), 1);
}
3576
/// `edges_to` is expected to stay correct when `backward_edges` is disabled.
#[test]
fn test_no_backward_adj_edges_to() {
    let graph = LpgStore::with_config(LpgStoreConfig {
        backward_edges: false,
        initial_node_capacity: 10,
        initial_edge_capacity: 10,
    });

    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);
    let rel = graph.create_edge(src, dst, "R");

    // The single edge src -> dst must still be found for dst.
    let into_dst = graph.edges_to(dst);
    assert_eq!(into_dst.len(), 1);
    assert_eq!(into_dst[0].1, rel);
}
3595
/// A node created with an explicit epoch and transaction id is readable.
#[test]
fn test_node_versioned_creation() {
    let graph = LpgStore::new();
    let creation_epoch = graph.new_epoch();
    let txn = TxId::new(1);

    let person = graph.create_node_versioned(&["Person"], creation_epoch, txn);

    assert!(graph.get_node(person).is_some());
}
3606
/// An edge created with an explicit epoch and transaction id is readable.
#[test]
fn test_edge_versioned_creation() {
    let graph = LpgStore::new();
    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);

    let creation_epoch = graph.new_epoch();
    let txn = TxId::new(1);
    let rel = graph.create_edge_versioned(src, dst, "REL", creation_epoch, txn);

    assert!(graph.get_edge(rel).is_some());
}
3620
/// Properties passed to the versioned node constructor are readable.
#[test]
fn test_node_with_props_versioned() {
    let graph = LpgStore::new();
    let creation_epoch = graph.new_epoch();
    let txn = TxId::new(1);

    let initial_props = [("name", Value::from("Alice"))];
    let person =
        graph.create_node_with_props_versioned(&["Person"], initial_props, creation_epoch, txn);

    let record = graph.get_node(person).unwrap();
    let name = record.get_property("name").and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));
}
3641
/// Discarding a transaction's versions makes the node it created unreadable.
#[test]
fn test_discard_uncommitted_versions() {
    let graph = LpgStore::new();
    let creation_epoch = graph.new_epoch();
    let txn = TxId::new(42);

    // The node is visible right after creation.
    let person = graph.create_node_versioned(&["Person"], creation_epoch, txn);
    assert!(graph.get_node(person).is_some());

    // After discarding everything written by `txn`, it is gone.
    graph.discard_uncommitted_versions(txn);
    assert!(graph.get_node(person).is_none());
}
3659
/// Property lookup is expected to give the same answers before and after an
/// index on the property is created.
#[test]
fn test_property_index_create_and_lookup() {
    let graph = LpgStore::new();
    let [alice, bob, charlie] = [(); 3].map(|()| graph.create_node(&["Person"]));

    for (person, city) in [(alice, "NYC"), (bob, "NYC"), (charlie, "LA")] {
        graph.set_node_property(person, "city", Value::from(city));
    }

    // Lookup before any index has been created.
    let unindexed = graph.find_nodes_by_property("city", &Value::from("NYC"));
    assert_eq!(unindexed.len(), 2);

    graph.create_property_index("city");
    assert!(graph.has_property_index("city"));

    // Same lookup, now with the index in place.
    let in_nyc = graph.find_nodes_by_property("city", &Value::from("NYC"));
    assert_eq!(in_nyc.len(), 2);
    for person in [alice, bob] {
        assert!(in_nyc.contains(&person));
    }

    let in_la = graph.find_nodes_by_property("city", &Value::from("LA"));
    assert_eq!(in_la.len(), 1);
    assert!(in_la.contains(&charlie));
}
3693
/// Overwriting an indexed property moves the node to the new value's bucket.
#[test]
fn test_property_index_maintained_on_update() {
    let graph = LpgStore::new();
    // The index exists before any data is written.
    graph.create_property_index("status");
    let with_status = |status: Value| graph.find_nodes_by_property("status", &status);

    let task = graph.create_node(&["Task"]);
    graph.set_node_property(task, "status", Value::from("pending"));

    let pending = with_status(Value::from("pending"));
    assert_eq!(pending.len(), 1);
    assert!(pending.contains(&task));

    graph.set_node_property(task, "status", Value::from("done"));

    // The old value no longer matches anything ...
    assert!(with_status(Value::from("pending")).is_empty());

    // ... and the new value finds the task.
    let done = with_status(Value::from("done"));
    assert_eq!(done.len(), 1);
    assert!(done.contains(&task));
}
3721
/// Removing an indexed property also removes the node from lookup results.
#[test]
fn test_property_index_maintained_on_remove() {
    let graph = LpgStore::new();
    graph.create_property_index("tag");
    let tagged_important = || graph.find_nodes_by_property("tag", &Value::from("important"));

    let item = graph.create_node(&["Item"]);
    graph.set_node_property(item, "tag", Value::from("important"));
    assert_eq!(tagged_important().len(), 1);

    graph.remove_node_property(item, "tag");
    assert!(tagged_important().is_empty());
}
3742
/// Dropping a property index succeeds once and fails when repeated.
#[test]
fn test_property_index_drop() {
    let graph = LpgStore::new();

    graph.create_property_index("key");
    assert!(graph.has_property_index("key"));

    let dropped = graph.drop_property_index("key");
    assert!(dropped);
    assert!(!graph.has_property_index("key"));

    // Nothing is left to drop on the second attempt.
    let dropped_again = graph.drop_property_index("key");
    assert!(!dropped_again);
}
3756
/// An indexed lookup returns every node that shares the requested value.
#[test]
fn test_property_index_multiple_values() {
    let graph = LpgStore::new();
    graph.create_property_index("age");

    let [n1, n2, n3, n4] = [(); 4].map(|()| graph.create_node(&["Person"]));

    // Three nodes share age 25; one has age 30.
    for (person, age) in [(n1, 25i64), (n2, 25), (n3, 30), (n4, 25)] {
        graph.set_node_property(person, "age", Value::from(age));
    }

    let aged_25 = graph.find_nodes_by_property("age", &Value::from(25i64));
    assert_eq!(aged_25.len(), 3);

    let aged_30 = graph.find_nodes_by_property("age", &Value::from(30i64));
    assert_eq!(aged_30.len(), 1);

    // No node was given age 40.
    let aged_40 = graph.find_nodes_by_property("age", &Value::from(40i64));
    assert!(aged_40.is_empty());
}
3783
/// An index created after the data was written still finds that data.
#[test]
fn test_property_index_builds_from_existing_data() {
    let graph = LpgStore::new();
    let first = graph.create_node(&["Person"]);
    let second = graph.create_node(&["Person"]);
    graph.set_node_property(first, "email", Value::from("alice@example.com"));
    graph.set_node_property(second, "email", Value::from("bob@example.com"));

    // The index is only created once both properties already exist.
    graph.create_property_index("email");

    for (address, owner) in [("alice@example.com", first), ("bob@example.com", second)] {
        let hits = graph.find_nodes_by_property("email", &Value::from(address));
        assert_eq!(hits.len(), 1);
        assert!(hits.contains(&owner));
    }
}
3806
/// Batch property reads return one slot per requested node, in request
/// order, with `None` for nodes lacking the property.
#[test]
fn test_get_node_property_batch() {
    let graph = LpgStore::new();
    let [n1, n2, n3] = [(); 3].map(|()| graph.create_node(&["Person"]));

    // n3 deliberately gets no "age" property.
    for (person, age) in [(n1, 25i64), (n2, 30)] {
        graph.set_node_property(person, "age", Value::from(age));
    }

    let key = PropertyKey::new("age");
    let ages = graph.get_node_property_batch(&[n1, n2, n3], &key);

    assert_eq!(ages.len(), 3);
    assert_eq!(ages[0], Some(Value::from(25i64)));
    assert_eq!(ages[1], Some(Value::from(30i64)));
    assert_eq!(ages[2], None);
}
3827
/// A batch read over no nodes returns an empty result.
#[test]
fn test_get_node_property_batch_empty() {
    let graph = LpgStore::new();
    let any_key = PropertyKey::new("any");

    let result = graph.get_node_property_batch(&[], &any_key);

    assert!(result.is_empty());
}
3836
/// Batch reads of all properties return one map per requested node, in
/// request order, with an empty map for nodes that have no properties.
#[test]
fn test_get_nodes_properties_batch() {
    let store = LpgStore::new();

    let n1 = store.create_node(&["Person"]);
    let n2 = store.create_node(&["Person"]);
    let n3 = store.create_node(&["Person"]);

    // n1 gets two properties, n2 one, n3 none.
    store.set_node_property(n1, "name", Value::from("Alice"));
    store.set_node_property(n1, "age", Value::from(25i64));
    store.set_node_property(n2, "name", Value::from("Bob"));

    let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);

    assert_eq!(all_props.len(), 3);
    assert_eq!(all_props[0].len(), 2);
    assert_eq!(all_props[1].len(), 1);
    assert_eq!(all_props[2].len(), 0);

    assert_eq!(
        all_props[0].get(&PropertyKey::new("name")),
        Some(&Value::from("Alice"))
    );
    // Verify the second property of n1 by value, not only through the map size.
    assert_eq!(
        all_props[0].get(&PropertyKey::new("age")),
        Some(&Value::from(25i64))
    );
    assert_eq!(
        all_props[1].get(&PropertyKey::new("name")),
        Some(&Value::from("Bob"))
    );
}
3866
/// A batch read of all properties over no nodes returns an empty result.
#[test]
fn test_get_nodes_properties_batch_empty() {
    let graph = LpgStore::new();

    let result = graph.get_nodes_properties_batch(&[]);

    assert!(result.is_empty());
}
3874}