1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26#[cfg(any(feature = "tiered-storage", feature = "vector-index"))]
27use std::sync::Arc;
28use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
29
30#[cfg(feature = "vector-index")]
31use crate::index::vector::HnswIndex;
32
33fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
35 match (a, b) {
36 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
37 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
38 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
39 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
40 _ => None,
41 }
42}
43
44fn value_in_range(
46 value: &Value,
47 min: Option<&Value>,
48 max: Option<&Value>,
49 min_inclusive: bool,
50 max_inclusive: bool,
51) -> bool {
52 if let Some(min_val) = min {
54 match compare_values_for_range(value, min_val) {
55 Some(CmpOrdering::Less) => return false,
56 Some(CmpOrdering::Equal) if !min_inclusive => return false,
57 None => return false, _ => {}
59 }
60 }
61
62 if let Some(max_val) = max {
64 match compare_values_for_range(value, max_val) {
65 Some(CmpOrdering::Greater) => return false,
66 Some(CmpOrdering::Equal) if !max_inclusive => return false,
67 None => return false,
68 _ => {}
69 }
70 }
71
72 true
73}
74
75#[cfg(feature = "tiered-storage")]
77use crate::storage::EpochStore;
78#[cfg(feature = "tiered-storage")]
79use grafeo_common::memory::arena::ArenaAllocator;
80#[cfg(feature = "tiered-storage")]
81use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
82
83#[derive(Debug, Clone)]
89pub struct LpgStoreConfig {
90 pub backward_edges: bool,
93 pub initial_node_capacity: usize,
95 pub initial_edge_capacity: usize,
97}
98
99impl Default for LpgStoreConfig {
100 fn default() -> Self {
101 Self {
102 backward_edges: true,
103 initial_node_capacity: 1024,
104 initial_edge_capacity: 4096,
105 }
106 }
107}
108
109pub struct LpgStore {
168 #[allow(dead_code)]
170 config: LpgStoreConfig,
171
172 #[cfg(not(feature = "tiered-storage"))]
176 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
177
178 #[cfg(not(feature = "tiered-storage"))]
182 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
183
184 #[cfg(feature = "tiered-storage")]
198 arena_allocator: Arc<ArenaAllocator>,
199
200 #[cfg(feature = "tiered-storage")]
204 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
205
206 #[cfg(feature = "tiered-storage")]
210 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
211
212 #[cfg(feature = "tiered-storage")]
215 epoch_store: Arc<EpochStore>,
216
217 node_properties: PropertyStorage<NodeId>,
219
220 edge_properties: PropertyStorage<EdgeId>,
222
223 label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
226
227 id_to_label: RwLock<Vec<ArcStr>>,
230
231 edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
234
235 id_to_edge_type: RwLock<Vec<ArcStr>>,
238
239 forward_adj: ChunkedAdjacency,
241
242 backward_adj: Option<ChunkedAdjacency>,
245
246 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
249
250 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
254
255 property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
261
262 #[cfg(feature = "vector-index")]
267 vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
268
269 next_node_id: AtomicU64,
271
272 next_edge_id: AtomicU64,
274
275 current_epoch: AtomicU64,
277
278 statistics: RwLock<Statistics>,
281
282 needs_stats_recompute: AtomicBool,
284}
285
286impl LpgStore {
287 #[must_use]
289 pub fn new() -> Self {
290 Self::with_config(LpgStoreConfig::default())
291 }
292
293 #[must_use]
295 pub fn with_config(config: LpgStoreConfig) -> Self {
296 let backward_adj = if config.backward_edges {
297 Some(ChunkedAdjacency::new())
298 } else {
299 None
300 };
301
302 Self {
303 #[cfg(not(feature = "tiered-storage"))]
304 nodes: RwLock::new(FxHashMap::default()),
305 #[cfg(not(feature = "tiered-storage"))]
306 edges: RwLock::new(FxHashMap::default()),
307 #[cfg(feature = "tiered-storage")]
308 arena_allocator: Arc::new(ArenaAllocator::new()),
309 #[cfg(feature = "tiered-storage")]
310 node_versions: RwLock::new(FxHashMap::default()),
311 #[cfg(feature = "tiered-storage")]
312 edge_versions: RwLock::new(FxHashMap::default()),
313 #[cfg(feature = "tiered-storage")]
314 epoch_store: Arc::new(EpochStore::new()),
315 node_properties: PropertyStorage::new(),
316 edge_properties: PropertyStorage::new(),
317 label_to_id: RwLock::new(FxHashMap::default()),
318 id_to_label: RwLock::new(Vec::new()),
319 edge_type_to_id: RwLock::new(FxHashMap::default()),
320 id_to_edge_type: RwLock::new(Vec::new()),
321 forward_adj: ChunkedAdjacency::new(),
322 backward_adj,
323 label_index: RwLock::new(Vec::new()),
324 node_labels: RwLock::new(FxHashMap::default()),
325 property_indexes: RwLock::new(FxHashMap::default()),
326 #[cfg(feature = "vector-index")]
327 vector_indexes: RwLock::new(FxHashMap::default()),
328 next_node_id: AtomicU64::new(0),
329 next_edge_id: AtomicU64::new(0),
330 current_epoch: AtomicU64::new(0),
331 statistics: RwLock::new(Statistics::new()),
332 needs_stats_recompute: AtomicBool::new(true),
333 config,
334 }
335 }
336
337 #[must_use]
339 pub fn current_epoch(&self) -> EpochId {
340 EpochId::new(self.current_epoch.load(Ordering::Acquire))
341 }
342
343 pub fn new_epoch(&self) -> EpochId {
345 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
346 EpochId::new(id)
347 }
348
349 pub fn create_node(&self, labels: &[&str]) -> NodeId {
355 self.needs_stats_recompute.store(true, Ordering::Relaxed);
356 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
357 }
358
359 #[cfg(not(feature = "tiered-storage"))]
361 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
362 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
363
364 let mut record = NodeRecord::new(id, epoch);
365 record.set_label_count(labels.len() as u16);
366
367 let mut node_label_set = FxHashSet::default();
369 for label in labels {
370 let label_id = self.get_or_create_label_id(*label);
371 node_label_set.insert(label_id);
372
373 let mut index = self.label_index.write();
375 while index.len() <= label_id as usize {
376 index.push(FxHashMap::default());
377 }
378 index[label_id as usize].insert(id, ());
379 }
380
381 self.node_labels.write().insert(id, node_label_set);
383
384 let chain = VersionChain::with_initial(record, epoch, tx_id);
386 self.nodes.write().insert(id, chain);
387 id
388 }
389
390 #[cfg(feature = "tiered-storage")]
393 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
394 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
395
396 let mut record = NodeRecord::new(id, epoch);
397 record.set_label_count(labels.len() as u16);
398
399 let mut node_label_set = FxHashSet::default();
401 for label in labels {
402 let label_id = self.get_or_create_label_id(*label);
403 node_label_set.insert(label_id);
404
405 let mut index = self.label_index.write();
407 while index.len() <= label_id as usize {
408 index.push(FxHashMap::default());
409 }
410 index[label_id as usize].insert(id, ());
411 }
412
413 self.node_labels.write().insert(id, node_label_set);
415
416 let arena = self.arena_allocator.arena_or_create(epoch);
418 let (offset, _stored) = arena.alloc_value_with_offset(record);
419
420 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
422
423 let mut versions = self.node_versions.write();
425 if let Some(index) = versions.get_mut(&id) {
426 index.add_hot(hot_ref);
427 } else {
428 versions.insert(id, VersionIndex::with_initial(hot_ref));
429 }
430
431 id
432 }
433
434 pub fn create_node_with_props(
436 &self,
437 labels: &[&str],
438 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
439 ) -> NodeId {
440 self.create_node_with_props_versioned(
441 labels,
442 properties,
443 self.current_epoch(),
444 TxId::SYSTEM,
445 )
446 }
447
448 #[cfg(not(feature = "tiered-storage"))]
450 pub fn create_node_with_props_versioned(
451 &self,
452 labels: &[&str],
453 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
454 epoch: EpochId,
455 tx_id: TxId,
456 ) -> NodeId {
457 let id = self.create_node_versioned(labels, epoch, tx_id);
458
459 for (key, value) in properties {
460 let prop_key: PropertyKey = key.into();
461 let prop_value: Value = value.into();
462 self.update_property_index_on_set(id, &prop_key, &prop_value);
464 self.node_properties.set(id, prop_key, prop_value);
465 }
466
467 let count = self.node_properties.get_all(id).len() as u16;
469 if let Some(chain) = self.nodes.write().get_mut(&id)
470 && let Some(record) = chain.latest_mut()
471 {
472 record.props_count = count;
473 }
474
475 id
476 }
477
478 #[cfg(feature = "tiered-storage")]
481 pub fn create_node_with_props_versioned(
482 &self,
483 labels: &[&str],
484 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
485 epoch: EpochId,
486 tx_id: TxId,
487 ) -> NodeId {
488 let id = self.create_node_versioned(labels, epoch, tx_id);
489
490 for (key, value) in properties {
491 let prop_key: PropertyKey = key.into();
492 let prop_value: Value = value.into();
493 self.update_property_index_on_set(id, &prop_key, &prop_value);
495 self.node_properties.set(id, prop_key, prop_value);
496 }
497
498 id
502 }
503
504 #[must_use]
506 pub fn get_node(&self, id: NodeId) -> Option<Node> {
507 self.get_node_at_epoch(id, self.current_epoch())
508 }
509
510 #[must_use]
512 #[cfg(not(feature = "tiered-storage"))]
513 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
514 let nodes = self.nodes.read();
515 let chain = nodes.get(&id)?;
516 let record = chain.visible_at(epoch)?;
517
518 if record.is_deleted() {
519 return None;
520 }
521
522 let mut node = Node::new(id);
523
524 let id_to_label = self.id_to_label.read();
526 let node_labels = self.node_labels.read();
527 if let Some(label_ids) = node_labels.get(&id) {
528 for &label_id in label_ids {
529 if let Some(label) = id_to_label.get(label_id as usize) {
530 node.labels.push(label.clone());
531 }
532 }
533 }
534
535 node.properties = self.node_properties.get_all(id).into_iter().collect();
537
538 Some(node)
539 }
540
541 #[must_use]
544 #[cfg(feature = "tiered-storage")]
545 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
546 let versions = self.node_versions.read();
547 let index = versions.get(&id)?;
548 let version_ref = index.visible_at(epoch)?;
549
550 let record = self.read_node_record(&version_ref)?;
552
553 if record.is_deleted() {
554 return None;
555 }
556
557 let mut node = Node::new(id);
558
559 let id_to_label = self.id_to_label.read();
561 let node_labels = self.node_labels.read();
562 if let Some(label_ids) = node_labels.get(&id) {
563 for &label_id in label_ids {
564 if let Some(label) = id_to_label.get(label_id as usize) {
565 node.labels.push(label.clone());
566 }
567 }
568 }
569
570 node.properties = self.node_properties.get_all(id).into_iter().collect();
572
573 Some(node)
574 }
575
576 #[must_use]
578 #[cfg(not(feature = "tiered-storage"))]
579 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
580 let nodes = self.nodes.read();
581 let chain = nodes.get(&id)?;
582 let record = chain.visible_to(epoch, tx_id)?;
583
584 if record.is_deleted() {
585 return None;
586 }
587
588 let mut node = Node::new(id);
589
590 let id_to_label = self.id_to_label.read();
592 let node_labels = self.node_labels.read();
593 if let Some(label_ids) = node_labels.get(&id) {
594 for &label_id in label_ids {
595 if let Some(label) = id_to_label.get(label_id as usize) {
596 node.labels.push(label.clone());
597 }
598 }
599 }
600
601 node.properties = self.node_properties.get_all(id).into_iter().collect();
603
604 Some(node)
605 }
606
607 #[must_use]
610 #[cfg(feature = "tiered-storage")]
611 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
612 let versions = self.node_versions.read();
613 let index = versions.get(&id)?;
614 let version_ref = index.visible_to(epoch, tx_id)?;
615
616 let record = self.read_node_record(&version_ref)?;
618
619 if record.is_deleted() {
620 return None;
621 }
622
623 let mut node = Node::new(id);
624
625 let id_to_label = self.id_to_label.read();
627 let node_labels = self.node_labels.read();
628 if let Some(label_ids) = node_labels.get(&id) {
629 for &label_id in label_ids {
630 if let Some(label) = id_to_label.get(label_id as usize) {
631 node.labels.push(label.clone());
632 }
633 }
634 }
635
636 node.properties = self.node_properties.get_all(id).into_iter().collect();
638
639 Some(node)
640 }
641
642 #[cfg(feature = "tiered-storage")]
644 #[allow(unsafe_code)]
645 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
646 match version_ref {
647 VersionRef::Hot(hot_ref) => {
648 let arena = self.arena_allocator.arena(hot_ref.epoch);
649 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
651 Some(*record)
652 }
653 VersionRef::Cold(cold_ref) => {
654 self.epoch_store
656 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
657 }
658 }
659 }
660
661 pub fn delete_node(&self, id: NodeId) -> bool {
663 self.needs_stats_recompute.store(true, Ordering::Relaxed);
664 self.delete_node_at_epoch(id, self.current_epoch())
665 }
666
667 #[cfg(not(feature = "tiered-storage"))]
669 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
670 let mut nodes = self.nodes.write();
671 if let Some(chain) = nodes.get_mut(&id) {
672 if let Some(record) = chain.visible_at(epoch) {
674 if record.is_deleted() {
675 return false;
676 }
677 } else {
678 return false;
680 }
681
682 chain.mark_deleted(epoch);
684
685 let mut index = self.label_index.write();
687 let mut node_labels = self.node_labels.write();
688 if let Some(label_ids) = node_labels.remove(&id) {
689 for label_id in label_ids {
690 if let Some(set) = index.get_mut(label_id as usize) {
691 set.remove(&id);
692 }
693 }
694 }
695
696 drop(nodes); drop(index);
699 drop(node_labels);
700 self.node_properties.remove_all(id);
701
702 true
705 } else {
706 false
707 }
708 }
709
710 #[cfg(feature = "tiered-storage")]
713 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
714 let mut versions = self.node_versions.write();
715 if let Some(index) = versions.get_mut(&id) {
716 if let Some(version_ref) = index.visible_at(epoch) {
718 if let Some(record) = self.read_node_record(&version_ref) {
719 if record.is_deleted() {
720 return false;
721 }
722 } else {
723 return false;
724 }
725 } else {
726 return false;
727 }
728
729 index.mark_deleted(epoch);
731
732 let mut label_index = self.label_index.write();
734 let mut node_labels = self.node_labels.write();
735 if let Some(label_ids) = node_labels.remove(&id) {
736 for label_id in label_ids {
737 if let Some(set) = label_index.get_mut(label_id as usize) {
738 set.remove(&id);
739 }
740 }
741 }
742
743 drop(versions);
745 drop(label_index);
746 drop(node_labels);
747 self.node_properties.remove_all(id);
748
749 true
750 } else {
751 false
752 }
753 }
754
755 #[cfg(not(feature = "tiered-storage"))]
760 pub fn delete_node_edges(&self, node_id: NodeId) {
761 let outgoing: Vec<EdgeId> = self
763 .forward_adj
764 .edges_from(node_id)
765 .into_iter()
766 .map(|(_, edge_id)| edge_id)
767 .collect();
768
769 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
771 backward
772 .edges_from(node_id)
773 .into_iter()
774 .map(|(_, edge_id)| edge_id)
775 .collect()
776 } else {
777 let epoch = self.current_epoch();
779 self.edges
780 .read()
781 .iter()
782 .filter_map(|(id, chain)| {
783 chain.visible_at(epoch).and_then(|r| {
784 if !r.is_deleted() && r.dst == node_id {
785 Some(*id)
786 } else {
787 None
788 }
789 })
790 })
791 .collect()
792 };
793
794 for edge_id in outgoing.into_iter().chain(incoming) {
796 self.delete_edge(edge_id);
797 }
798 }
799
800 #[cfg(feature = "tiered-storage")]
803 pub fn delete_node_edges(&self, node_id: NodeId) {
804 let outgoing: Vec<EdgeId> = self
806 .forward_adj
807 .edges_from(node_id)
808 .into_iter()
809 .map(|(_, edge_id)| edge_id)
810 .collect();
811
812 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
814 backward
815 .edges_from(node_id)
816 .into_iter()
817 .map(|(_, edge_id)| edge_id)
818 .collect()
819 } else {
820 let epoch = self.current_epoch();
822 let versions = self.edge_versions.read();
823 versions
824 .iter()
825 .filter_map(|(id, index)| {
826 index.visible_at(epoch).and_then(|vref| {
827 self.read_edge_record(&vref).and_then(|r| {
828 if !r.is_deleted() && r.dst == node_id {
829 Some(*id)
830 } else {
831 None
832 }
833 })
834 })
835 })
836 .collect()
837 };
838
839 for edge_id in outgoing.into_iter().chain(incoming) {
841 self.delete_edge(edge_id);
842 }
843 }
844
845 #[cfg(not(feature = "tiered-storage"))]
847 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
848 let prop_key: PropertyKey = key.into();
849
850 self.update_property_index_on_set(id, &prop_key, &value);
852
853 self.node_properties.set(id, prop_key, value);
854
855 let count = self.node_properties.get_all(id).len() as u16;
857 if let Some(chain) = self.nodes.write().get_mut(&id)
858 && let Some(record) = chain.latest_mut()
859 {
860 record.props_count = count;
861 }
862 }
863
864 #[cfg(feature = "tiered-storage")]
867 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
868 let prop_key: PropertyKey = key.into();
869
870 self.update_property_index_on_set(id, &prop_key, &value);
872
873 self.node_properties.set(id, prop_key, value);
874 }
878
879 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
881 self.edge_properties.set(id, key.into(), value);
882 }
883
884 #[cfg(not(feature = "tiered-storage"))]
888 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
889 let prop_key: PropertyKey = key.into();
890
891 self.update_property_index_on_remove(id, &prop_key);
893
894 let result = self.node_properties.remove(id, &prop_key);
895
896 let count = self.node_properties.get_all(id).len() as u16;
898 if let Some(chain) = self.nodes.write().get_mut(&id)
899 && let Some(record) = chain.latest_mut()
900 {
901 record.props_count = count;
902 }
903
904 result
905 }
906
907 #[cfg(feature = "tiered-storage")]
910 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
911 let prop_key: PropertyKey = key.into();
912
913 self.update_property_index_on_remove(id, &prop_key);
915
916 self.node_properties.remove(id, &prop_key)
917 }
919
920 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
924 self.edge_properties.remove(id, &key.into())
925 }
926
927 #[must_use]
942 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
943 self.node_properties.get(id, key)
944 }
945
946 #[must_use]
950 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
951 self.edge_properties.get(id, key)
952 }
953
954 #[must_use]
977 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
978 self.node_properties.get_batch(ids, key)
979 }
980
981 #[must_use]
986 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
987 self.node_properties.get_all_batch(ids)
988 }
989
990 #[must_use]
1018 pub fn get_nodes_properties_selective_batch(
1019 &self,
1020 ids: &[NodeId],
1021 keys: &[PropertyKey],
1022 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1023 self.node_properties.get_selective_batch(ids, keys)
1024 }
1025
1026 #[must_use]
1030 pub fn get_edges_properties_selective_batch(
1031 &self,
1032 ids: &[EdgeId],
1033 keys: &[PropertyKey],
1034 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1035 self.edge_properties.get_selective_batch(ids, keys)
1036 }
1037
1038 #[must_use]
1074 pub fn find_nodes_in_range(
1075 &self,
1076 property: &str,
1077 min: Option<&Value>,
1078 max: Option<&Value>,
1079 min_inclusive: bool,
1080 max_inclusive: bool,
1081 ) -> Vec<NodeId> {
1082 let key = PropertyKey::new(property);
1083
1084 if !self
1086 .node_properties
1087 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1088 {
1089 return Vec::new();
1090 }
1091
1092 self.node_ids()
1094 .into_iter()
1095 .filter(|&node_id| {
1096 self.node_properties
1097 .get(node_id, &key)
1098 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1099 })
1100 .collect()
1101 }
1102
1103 #[must_use]
1128 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1129 if conditions.is_empty() {
1130 return self.node_ids();
1131 }
1132
1133 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1136 let indexes = self.property_indexes.read();
1137
1138 for (i, (prop, value)) in conditions.iter().enumerate() {
1139 let key = PropertyKey::new(*prop);
1140 let hv = HashableValue::new(value.clone());
1141
1142 if let Some(index) = indexes.get(&key) {
1143 let matches: Vec<NodeId> = index
1144 .get(&hv)
1145 .map(|nodes| nodes.iter().copied().collect())
1146 .unwrap_or_default();
1147
1148 if matches.is_empty() {
1150 return Vec::new();
1151 }
1152
1153 if best_start
1155 .as_ref()
1156 .is_none_or(|(_, best)| matches.len() < best.len())
1157 {
1158 best_start = Some((i, matches));
1159 }
1160 }
1161 }
1162 drop(indexes);
1163
1164 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1166 let (prop, value) = &conditions[0];
1168 (0, self.find_nodes_by_property(prop, value))
1169 });
1170
1171 for (i, (prop, value)) in conditions.iter().enumerate() {
1173 if i == start_idx {
1174 continue;
1175 }
1176
1177 let key = PropertyKey::new(*prop);
1178 candidates.retain(|&node_id| {
1179 self.node_properties
1180 .get(node_id, &key)
1181 .is_some_and(|v| v == *value)
1182 });
1183
1184 if candidates.is_empty() {
1186 return Vec::new();
1187 }
1188 }
1189
1190 candidates
1191 }
1192
1193 pub fn create_property_index(&self, property: &str) {
1221 let key = PropertyKey::new(property);
1222
1223 let mut indexes = self.property_indexes.write();
1224 if indexes.contains_key(&key) {
1225 return; }
1227
1228 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1230
1231 for node_id in self.node_ids() {
1233 if let Some(value) = self.node_properties.get(node_id, &key) {
1234 let hv = HashableValue::new(value);
1235 index.entry(hv).or_default().insert(node_id);
1236 }
1237 }
1238
1239 indexes.insert(key, index);
1240 }
1241
1242 pub fn drop_property_index(&self, property: &str) -> bool {
1246 let key = PropertyKey::new(property);
1247 self.property_indexes.write().remove(&key).is_some()
1248 }
1249
1250 #[must_use]
1252 pub fn has_property_index(&self, property: &str) -> bool {
1253 let key = PropertyKey::new(property);
1254 self.property_indexes.read().contains_key(&key)
1255 }
1256
1257 #[cfg(feature = "vector-index")]
1259 pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1260 let key = format!("{label}:{property}");
1261 self.vector_indexes.write().insert(key, index);
1262 }
1263
1264 #[cfg(feature = "vector-index")]
1266 #[must_use]
1267 pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1268 let key = format!("{label}:{property}");
1269 self.vector_indexes.read().get(&key).cloned()
1270 }
1271
1272 #[cfg(feature = "vector-index")]
1276 pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1277 let key = format!("{label}:{property}");
1278 self.vector_indexes.write().remove(&key).is_some()
1279 }
1280
1281 #[cfg(feature = "vector-index")]
1285 #[must_use]
1286 pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1287 self.vector_indexes
1288 .read()
1289 .iter()
1290 .map(|(k, v)| (k.clone(), v.clone()))
1291 .collect()
1292 }
1293
1294 #[must_use]
1317 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1318 let key = PropertyKey::new(property);
1319 let hv = HashableValue::new(value.clone());
1320
1321 let indexes = self.property_indexes.read();
1323 if let Some(index) = indexes.get(&key) {
1324 if let Some(nodes) = index.get(&hv) {
1325 return nodes.iter().copied().collect();
1326 }
1327 return Vec::new();
1328 }
1329 drop(indexes);
1330
1331 self.node_ids()
1333 .into_iter()
1334 .filter(|&node_id| {
1335 self.node_properties
1336 .get(node_id, &key)
1337 .is_some_and(|v| v == *value)
1338 })
1339 .collect()
1340 }
1341
1342 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1344 let indexes = self.property_indexes.read();
1345 if let Some(index) = indexes.get(key) {
1346 if let Some(old_value) = self.node_properties.get(node_id, key) {
1348 let old_hv = HashableValue::new(old_value);
1349 if let Some(mut nodes) = index.get_mut(&old_hv) {
1350 nodes.remove(&node_id);
1351 if nodes.is_empty() {
1352 drop(nodes);
1353 index.remove(&old_hv);
1354 }
1355 }
1356 }
1357
1358 let new_hv = HashableValue::new(new_value.clone());
1360 index
1361 .entry(new_hv)
1362 .or_insert_with(FxHashSet::default)
1363 .insert(node_id);
1364 }
1365 }
1366
1367 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1369 let indexes = self.property_indexes.read();
1370 if let Some(index) = indexes.get(key) {
1371 if let Some(old_value) = self.node_properties.get(node_id, key) {
1373 let old_hv = HashableValue::new(old_value);
1374 if let Some(mut nodes) = index.get_mut(&old_hv) {
1375 nodes.remove(&node_id);
1376 if nodes.is_empty() {
1377 drop(nodes);
1378 index.remove(&old_hv);
1379 }
1380 }
1381 }
1382 }
1383 }
1384
1385 #[cfg(not(feature = "tiered-storage"))]
1390 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1391 let epoch = self.current_epoch();
1392
1393 let nodes = self.nodes.read();
1395 if let Some(chain) = nodes.get(&node_id) {
1396 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1397 return false;
1398 }
1399 } else {
1400 return false;
1401 }
1402 drop(nodes);
1403
1404 let label_id = self.get_or_create_label_id(label);
1406
1407 let mut node_labels = self.node_labels.write();
1409 let label_set = node_labels.entry(node_id).or_default();
1410
1411 if label_set.contains(&label_id) {
1412 return false; }
1414
1415 label_set.insert(label_id);
1416 drop(node_labels);
1417
1418 let mut index = self.label_index.write();
1420 if (label_id as usize) >= index.len() {
1421 index.resize(label_id as usize + 1, FxHashMap::default());
1422 }
1423 index[label_id as usize].insert(node_id, ());
1424
1425 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1427 && let Some(record) = chain.latest_mut()
1428 {
1429 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1430 record.set_label_count(count as u16);
1431 }
1432
1433 true
1434 }
1435
1436 #[cfg(feature = "tiered-storage")]
1439 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1440 let epoch = self.current_epoch();
1441
1442 let versions = self.node_versions.read();
1444 if let Some(index) = versions.get(&node_id) {
1445 if let Some(vref) = index.visible_at(epoch) {
1446 if let Some(record) = self.read_node_record(&vref) {
1447 if record.is_deleted() {
1448 return false;
1449 }
1450 } else {
1451 return false;
1452 }
1453 } else {
1454 return false;
1455 }
1456 } else {
1457 return false;
1458 }
1459 drop(versions);
1460
1461 let label_id = self.get_or_create_label_id(label);
1463
1464 let mut node_labels = self.node_labels.write();
1466 let label_set = node_labels.entry(node_id).or_default();
1467
1468 if label_set.contains(&label_id) {
1469 return false; }
1471
1472 label_set.insert(label_id);
1473 drop(node_labels);
1474
1475 let mut index = self.label_index.write();
1477 if (label_id as usize) >= index.len() {
1478 index.resize(label_id as usize + 1, FxHashMap::default());
1479 }
1480 index[label_id as usize].insert(node_id, ());
1481
1482 true
1486 }
1487
1488 #[cfg(not(feature = "tiered-storage"))]
1493 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1494 let epoch = self.current_epoch();
1495
1496 let nodes = self.nodes.read();
1498 if let Some(chain) = nodes.get(&node_id) {
1499 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1500 return false;
1501 }
1502 } else {
1503 return false;
1504 }
1505 drop(nodes);
1506
1507 let label_id = {
1509 let label_ids = self.label_to_id.read();
1510 match label_ids.get(label) {
1511 Some(&id) => id,
1512 None => return false, }
1514 };
1515
1516 let mut node_labels = self.node_labels.write();
1518 if let Some(label_set) = node_labels.get_mut(&node_id) {
1519 if !label_set.remove(&label_id) {
1520 return false; }
1522 } else {
1523 return false;
1524 }
1525 drop(node_labels);
1526
1527 let mut index = self.label_index.write();
1529 if (label_id as usize) < index.len() {
1530 index[label_id as usize].remove(&node_id);
1531 }
1532
1533 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1535 && let Some(record) = chain.latest_mut()
1536 {
1537 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1538 record.set_label_count(count as u16);
1539 }
1540
1541 true
1542 }
1543
1544 #[cfg(feature = "tiered-storage")]
1547 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1548 let epoch = self.current_epoch();
1549
1550 let versions = self.node_versions.read();
1552 if let Some(index) = versions.get(&node_id) {
1553 if let Some(vref) = index.visible_at(epoch) {
1554 if let Some(record) = self.read_node_record(&vref) {
1555 if record.is_deleted() {
1556 return false;
1557 }
1558 } else {
1559 return false;
1560 }
1561 } else {
1562 return false;
1563 }
1564 } else {
1565 return false;
1566 }
1567 drop(versions);
1568
1569 let label_id = {
1571 let label_ids = self.label_to_id.read();
1572 match label_ids.get(label) {
1573 Some(&id) => id,
1574 None => return false, }
1576 };
1577
1578 let mut node_labels = self.node_labels.write();
1580 if let Some(label_set) = node_labels.get_mut(&node_id) {
1581 if !label_set.remove(&label_id) {
1582 return false; }
1584 } else {
1585 return false;
1586 }
1587 drop(node_labels);
1588
1589 let mut index = self.label_index.write();
1591 if (label_id as usize) < index.len() {
1592 index[label_id as usize].remove(&node_id);
1593 }
1594
1595 true
1598 }
1599
1600 #[must_use]
1602 #[cfg(not(feature = "tiered-storage"))]
1603 pub fn node_count(&self) -> usize {
1604 let epoch = self.current_epoch();
1605 self.nodes
1606 .read()
1607 .values()
1608 .filter_map(|chain| chain.visible_at(epoch))
1609 .filter(|r| !r.is_deleted())
1610 .count()
1611 }
1612
1613 #[must_use]
1616 #[cfg(feature = "tiered-storage")]
1617 pub fn node_count(&self) -> usize {
1618 let epoch = self.current_epoch();
1619 let versions = self.node_versions.read();
1620 versions
1621 .iter()
1622 .filter(|(_, index)| {
1623 index.visible_at(epoch).map_or(false, |vref| {
1624 self.read_node_record(&vref)
1625 .map_or(false, |r| !r.is_deleted())
1626 })
1627 })
1628 .count()
1629 }
1630
1631 #[must_use]
1637 #[cfg(not(feature = "tiered-storage"))]
1638 pub fn node_ids(&self) -> Vec<NodeId> {
1639 let epoch = self.current_epoch();
1640 let mut ids: Vec<NodeId> = self
1641 .nodes
1642 .read()
1643 .iter()
1644 .filter_map(|(id, chain)| {
1645 chain
1646 .visible_at(epoch)
1647 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1648 })
1649 .collect();
1650 ids.sort_unstable();
1651 ids
1652 }
1653
1654 #[must_use]
1657 #[cfg(feature = "tiered-storage")]
1658 pub fn node_ids(&self) -> Vec<NodeId> {
1659 let epoch = self.current_epoch();
1660 let versions = self.node_versions.read();
1661 let mut ids: Vec<NodeId> = versions
1662 .iter()
1663 .filter_map(|(id, index)| {
1664 index.visible_at(epoch).and_then(|vref| {
1665 self.read_node_record(&vref)
1666 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1667 })
1668 })
1669 .collect();
1670 ids.sort_unstable();
1671 ids
1672 }
1673
1674 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1678 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1679 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1680 }
1681
1682 #[cfg(not(feature = "tiered-storage"))]
1684 pub fn create_edge_versioned(
1685 &self,
1686 src: NodeId,
1687 dst: NodeId,
1688 edge_type: &str,
1689 epoch: EpochId,
1690 tx_id: TxId,
1691 ) -> EdgeId {
1692 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1693 let type_id = self.get_or_create_edge_type_id(edge_type);
1694
1695 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1696 let chain = VersionChain::with_initial(record, epoch, tx_id);
1697 self.edges.write().insert(id, chain);
1698
1699 self.forward_adj.add_edge(src, dst, id);
1701 if let Some(ref backward) = self.backward_adj {
1702 backward.add_edge(dst, src, id);
1703 }
1704
1705 id
1706 }
1707
1708 #[cfg(feature = "tiered-storage")]
1711 pub fn create_edge_versioned(
1712 &self,
1713 src: NodeId,
1714 dst: NodeId,
1715 edge_type: &str,
1716 epoch: EpochId,
1717 tx_id: TxId,
1718 ) -> EdgeId {
1719 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1720 let type_id = self.get_or_create_edge_type_id(edge_type);
1721
1722 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1723
1724 let arena = self.arena_allocator.arena_or_create(epoch);
1726 let (offset, _stored) = arena.alloc_value_with_offset(record);
1727
1728 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1730
1731 let mut versions = self.edge_versions.write();
1733 if let Some(index) = versions.get_mut(&id) {
1734 index.add_hot(hot_ref);
1735 } else {
1736 versions.insert(id, VersionIndex::with_initial(hot_ref));
1737 }
1738
1739 self.forward_adj.add_edge(src, dst, id);
1741 if let Some(ref backward) = self.backward_adj {
1742 backward.add_edge(dst, src, id);
1743 }
1744
1745 id
1746 }
1747
1748 pub fn create_edge_with_props(
1750 &self,
1751 src: NodeId,
1752 dst: NodeId,
1753 edge_type: &str,
1754 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1755 ) -> EdgeId {
1756 let id = self.create_edge(src, dst, edge_type);
1757
1758 for (key, value) in properties {
1759 self.edge_properties.set(id, key.into(), value.into());
1760 }
1761
1762 id
1763 }
1764
1765 #[must_use]
1767 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1768 self.get_edge_at_epoch(id, self.current_epoch())
1769 }
1770
1771 #[must_use]
1773 #[cfg(not(feature = "tiered-storage"))]
1774 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1775 let edges = self.edges.read();
1776 let chain = edges.get(&id)?;
1777 let record = chain.visible_at(epoch)?;
1778
1779 if record.is_deleted() {
1780 return None;
1781 }
1782
1783 let edge_type = {
1784 let id_to_type = self.id_to_edge_type.read();
1785 id_to_type.get(record.type_id as usize)?.clone()
1786 };
1787
1788 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1789
1790 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1792
1793 Some(edge)
1794 }
1795
1796 #[must_use]
1799 #[cfg(feature = "tiered-storage")]
1800 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1801 let versions = self.edge_versions.read();
1802 let index = versions.get(&id)?;
1803 let version_ref = index.visible_at(epoch)?;
1804
1805 let record = self.read_edge_record(&version_ref)?;
1806
1807 if record.is_deleted() {
1808 return None;
1809 }
1810
1811 let edge_type = {
1812 let id_to_type = self.id_to_edge_type.read();
1813 id_to_type.get(record.type_id as usize)?.clone()
1814 };
1815
1816 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1817
1818 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1820
1821 Some(edge)
1822 }
1823
1824 #[must_use]
1826 #[cfg(not(feature = "tiered-storage"))]
1827 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1828 let edges = self.edges.read();
1829 let chain = edges.get(&id)?;
1830 let record = chain.visible_to(epoch, tx_id)?;
1831
1832 if record.is_deleted() {
1833 return None;
1834 }
1835
1836 let edge_type = {
1837 let id_to_type = self.id_to_edge_type.read();
1838 id_to_type.get(record.type_id as usize)?.clone()
1839 };
1840
1841 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1842
1843 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1845
1846 Some(edge)
1847 }
1848
1849 #[must_use]
1852 #[cfg(feature = "tiered-storage")]
1853 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1854 let versions = self.edge_versions.read();
1855 let index = versions.get(&id)?;
1856 let version_ref = index.visible_to(epoch, tx_id)?;
1857
1858 let record = self.read_edge_record(&version_ref)?;
1859
1860 if record.is_deleted() {
1861 return None;
1862 }
1863
1864 let edge_type = {
1865 let id_to_type = self.id_to_edge_type.read();
1866 id_to_type.get(record.type_id as usize)?.clone()
1867 };
1868
1869 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1870
1871 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1873
1874 Some(edge)
1875 }
1876
1877 #[cfg(feature = "tiered-storage")]
1879 #[allow(unsafe_code)]
1880 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1881 match version_ref {
1882 VersionRef::Hot(hot_ref) => {
1883 let arena = self.arena_allocator.arena(hot_ref.epoch);
1884 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1886 Some(*record)
1887 }
1888 VersionRef::Cold(cold_ref) => {
1889 self.epoch_store
1891 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1892 }
1893 }
1894 }
1895
1896 pub fn delete_edge(&self, id: EdgeId) -> bool {
1898 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1899 self.delete_edge_at_epoch(id, self.current_epoch())
1900 }
1901
1902 #[cfg(not(feature = "tiered-storage"))]
1904 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1905 let mut edges = self.edges.write();
1906 if let Some(chain) = edges.get_mut(&id) {
1907 let (src, dst) = {
1909 match chain.visible_at(epoch) {
1910 Some(record) => {
1911 if record.is_deleted() {
1912 return false;
1913 }
1914 (record.src, record.dst)
1915 }
1916 None => return false, }
1918 };
1919
1920 chain.mark_deleted(epoch);
1922
1923 drop(edges); self.forward_adj.mark_deleted(src, id);
1927 if let Some(ref backward) = self.backward_adj {
1928 backward.mark_deleted(dst, id);
1929 }
1930
1931 self.edge_properties.remove_all(id);
1933
1934 true
1935 } else {
1936 false
1937 }
1938 }
1939
1940 #[cfg(feature = "tiered-storage")]
1943 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1944 let mut versions = self.edge_versions.write();
1945 if let Some(index) = versions.get_mut(&id) {
1946 let (src, dst) = {
1948 match index.visible_at(epoch) {
1949 Some(version_ref) => {
1950 if let Some(record) = self.read_edge_record(&version_ref) {
1951 if record.is_deleted() {
1952 return false;
1953 }
1954 (record.src, record.dst)
1955 } else {
1956 return false;
1957 }
1958 }
1959 None => return false,
1960 }
1961 };
1962
1963 index.mark_deleted(epoch);
1965
1966 drop(versions); self.forward_adj.mark_deleted(src, id);
1970 if let Some(ref backward) = self.backward_adj {
1971 backward.mark_deleted(dst, id);
1972 }
1973
1974 self.edge_properties.remove_all(id);
1976
1977 true
1978 } else {
1979 false
1980 }
1981 }
1982
1983 #[must_use]
1985 #[cfg(not(feature = "tiered-storage"))]
1986 pub fn edge_count(&self) -> usize {
1987 let epoch = self.current_epoch();
1988 self.edges
1989 .read()
1990 .values()
1991 .filter_map(|chain| chain.visible_at(epoch))
1992 .filter(|r| !r.is_deleted())
1993 .count()
1994 }
1995
1996 #[must_use]
1999 #[cfg(feature = "tiered-storage")]
2000 pub fn edge_count(&self) -> usize {
2001 let epoch = self.current_epoch();
2002 let versions = self.edge_versions.read();
2003 versions
2004 .iter()
2005 .filter(|(_, index)| {
2006 index.visible_at(epoch).map_or(false, |vref| {
2007 self.read_edge_record(&vref)
2008 .map_or(false, |r| !r.is_deleted())
2009 })
2010 })
2011 .count()
2012 }
2013
2014 #[cfg(not(feature = "tiered-storage"))]
2019 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2020 {
2022 let mut nodes = self.nodes.write();
2023 for chain in nodes.values_mut() {
2024 chain.remove_versions_by(tx_id);
2025 }
2026 nodes.retain(|_, chain| !chain.is_empty());
2028 }
2029
2030 {
2032 let mut edges = self.edges.write();
2033 for chain in edges.values_mut() {
2034 chain.remove_versions_by(tx_id);
2035 }
2036 edges.retain(|_, chain| !chain.is_empty());
2038 }
2039 }
2040
2041 #[cfg(feature = "tiered-storage")]
2044 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2045 {
2047 let mut versions = self.node_versions.write();
2048 for index in versions.values_mut() {
2049 index.remove_versions_by(tx_id);
2050 }
2051 versions.retain(|_, index| !index.is_empty());
2053 }
2054
2055 {
2057 let mut versions = self.edge_versions.write();
2058 for index in versions.values_mut() {
2059 index.remove_versions_by(tx_id);
2060 }
2061 versions.retain(|_, index| !index.is_empty());
2063 }
2064 }
2065
2066 #[cfg(not(feature = "tiered-storage"))]
2071 pub fn gc_versions(&self, min_epoch: EpochId) {
2072 {
2073 let mut nodes = self.nodes.write();
2074 for chain in nodes.values_mut() {
2075 chain.gc(min_epoch);
2076 }
2077 nodes.retain(|_, chain| !chain.is_empty());
2078 }
2079 {
2080 let mut edges = self.edges.write();
2081 for chain in edges.values_mut() {
2082 chain.gc(min_epoch);
2083 }
2084 edges.retain(|_, chain| !chain.is_empty());
2085 }
2086 }
2087
2088 #[cfg(feature = "tiered-storage")]
2090 pub fn gc_versions(&self, min_epoch: EpochId) {
2091 {
2092 let mut versions = self.node_versions.write();
2093 for index in versions.values_mut() {
2094 index.gc(min_epoch);
2095 }
2096 versions.retain(|_, index| !index.is_empty());
2097 }
2098 {
2099 let mut versions = self.edge_versions.write();
2100 for index in versions.values_mut() {
2101 index.gc(min_epoch);
2102 }
2103 versions.retain(|_, index| !index.is_empty());
2104 }
2105 }
2106
2107 #[cfg(feature = "tiered-storage")]
2126 #[allow(unsafe_code)]
2127 pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2128 let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2130 let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2131
2132 {
2133 let versions = self.node_versions.read();
2134 for (node_id, index) in versions.iter() {
2135 for hot_ref in index.hot_refs_for_epoch(epoch) {
2136 let arena = self.arena_allocator.arena(hot_ref.epoch);
2137 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2139 node_records.push((node_id.as_u64(), *record));
2140 node_hot_refs.push((*node_id, *hot_ref));
2141 }
2142 }
2143 }
2144
2145 let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2147 let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2148
2149 {
2150 let versions = self.edge_versions.read();
2151 for (edge_id, index) in versions.iter() {
2152 for hot_ref in index.hot_refs_for_epoch(epoch) {
2153 let arena = self.arena_allocator.arena(hot_ref.epoch);
2154 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2156 edge_records.push((edge_id.as_u64(), *record));
2157 edge_hot_refs.push((*edge_id, *hot_ref));
2158 }
2159 }
2160 }
2161
2162 let total_frozen = node_records.len() + edge_records.len();
2163
2164 if total_frozen == 0 {
2165 return 0;
2166 }
2167
2168 let (node_entries, edge_entries) =
2170 self.epoch_store
2171 .freeze_epoch(epoch, node_records, edge_records);
2172
2173 let node_entry_map: FxHashMap<u64, _> = node_entries
2175 .iter()
2176 .map(|e| (e.entity_id, (e.offset, e.length)))
2177 .collect();
2178 let edge_entry_map: FxHashMap<u64, _> = edge_entries
2179 .iter()
2180 .map(|e| (e.entity_id, (e.offset, e.length)))
2181 .collect();
2182
2183 {
2185 let mut versions = self.node_versions.write();
2186 for (node_id, hot_ref) in &node_hot_refs {
2187 if let Some(index) = versions.get_mut(node_id)
2188 && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2189 {
2190 let cold_ref = ColdVersionRef {
2191 epoch,
2192 block_offset: offset,
2193 length,
2194 created_by: hot_ref.created_by,
2195 deleted_epoch: hot_ref.deleted_epoch,
2196 };
2197 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2198 }
2199 }
2200 }
2201
2202 {
2203 let mut versions = self.edge_versions.write();
2204 for (edge_id, hot_ref) in &edge_hot_refs {
2205 if let Some(index) = versions.get_mut(edge_id)
2206 && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2207 {
2208 let cold_ref = ColdVersionRef {
2209 epoch,
2210 block_offset: offset,
2211 length,
2212 created_by: hot_ref.created_by,
2213 deleted_epoch: hot_ref.deleted_epoch,
2214 };
2215 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2216 }
2217 }
2218 }
2219
2220 total_frozen
2221 }
2222
2223 #[cfg(feature = "tiered-storage")]
2225 #[must_use]
2226 pub fn epoch_store(&self) -> &EpochStore {
2227 &self.epoch_store
2228 }
2229
2230 #[must_use]
2232 pub fn label_count(&self) -> usize {
2233 self.id_to_label.read().len()
2234 }
2235
2236 #[must_use]
2240 pub fn property_key_count(&self) -> usize {
2241 let node_keys = self.node_properties.column_count();
2242 let edge_keys = self.edge_properties.column_count();
2243 node_keys + edge_keys
2247 }
2248
2249 #[must_use]
2251 pub fn edge_type_count(&self) -> usize {
2252 self.id_to_edge_type.read().len()
2253 }
2254
2255 pub fn neighbors(
2262 &self,
2263 node: NodeId,
2264 direction: Direction,
2265 ) -> impl Iterator<Item = NodeId> + '_ {
2266 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2267 Direction::Outgoing | Direction::Both => {
2268 Box::new(self.forward_adj.neighbors(node).into_iter())
2269 }
2270 Direction::Incoming => Box::new(std::iter::empty()),
2271 };
2272
2273 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2274 Direction::Incoming | Direction::Both => {
2275 if let Some(ref adj) = self.backward_adj {
2276 Box::new(adj.neighbors(node).into_iter())
2277 } else {
2278 Box::new(std::iter::empty())
2279 }
2280 }
2281 Direction::Outgoing => Box::new(std::iter::empty()),
2282 };
2283
2284 forward.chain(backward)
2285 }
2286
2287 pub fn edges_from(
2291 &self,
2292 node: NodeId,
2293 direction: Direction,
2294 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2295 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2296 Direction::Outgoing | Direction::Both => {
2297 Box::new(self.forward_adj.edges_from(node).into_iter())
2298 }
2299 Direction::Incoming => Box::new(std::iter::empty()),
2300 };
2301
2302 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2303 Direction::Incoming | Direction::Both => {
2304 if let Some(ref adj) = self.backward_adj {
2305 Box::new(adj.edges_from(node).into_iter())
2306 } else {
2307 Box::new(std::iter::empty())
2308 }
2309 }
2310 Direction::Outgoing => Box::new(std::iter::empty()),
2311 };
2312
2313 forward.chain(backward)
2314 }
2315
2316 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2329 if let Some(ref backward) = self.backward_adj {
2330 backward.edges_from(node)
2331 } else {
2332 self.all_edges()
2334 .filter_map(|edge| {
2335 if edge.dst == node {
2336 Some((edge.src, edge.id))
2337 } else {
2338 None
2339 }
2340 })
2341 .collect()
2342 }
2343 }
2344
2345 #[must_use]
2349 pub fn out_degree(&self, node: NodeId) -> usize {
2350 self.forward_adj.out_degree(node)
2351 }
2352
2353 #[must_use]
2358 pub fn in_degree(&self, node: NodeId) -> usize {
2359 if let Some(ref backward) = self.backward_adj {
2360 backward.in_degree(node)
2361 } else {
2362 self.all_edges().filter(|edge| edge.dst == node).count()
2364 }
2365 }
2366
2367 #[must_use]
2369 #[cfg(not(feature = "tiered-storage"))]
2370 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2371 let edges = self.edges.read();
2372 let chain = edges.get(&id)?;
2373 let epoch = self.current_epoch();
2374 let record = chain.visible_at(epoch)?;
2375 let id_to_type = self.id_to_edge_type.read();
2376 id_to_type.get(record.type_id as usize).cloned()
2377 }
2378
2379 #[must_use]
2382 #[cfg(feature = "tiered-storage")]
2383 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2384 let versions = self.edge_versions.read();
2385 let index = versions.get(&id)?;
2386 let epoch = self.current_epoch();
2387 let vref = index.visible_at(epoch)?;
2388 let record = self.read_edge_record(&vref)?;
2389 let id_to_type = self.id_to_edge_type.read();
2390 id_to_type.get(record.type_id as usize).cloned()
2391 }
2392
2393 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2399 let label_to_id = self.label_to_id.read();
2400 if let Some(&label_id) = label_to_id.get(label) {
2401 let index = self.label_index.read();
2402 if let Some(set) = index.get(label_id as usize) {
2403 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2404 ids.sort_unstable();
2405 return ids;
2406 }
2407 }
2408 Vec::new()
2409 }
2410
2411 #[cfg(not(feature = "tiered-storage"))]
2418 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2419 let epoch = self.current_epoch();
2420 let node_ids: Vec<NodeId> = self
2421 .nodes
2422 .read()
2423 .iter()
2424 .filter_map(|(id, chain)| {
2425 chain
2426 .visible_at(epoch)
2427 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2428 })
2429 .collect();
2430
2431 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2432 }
2433
2434 #[cfg(feature = "tiered-storage")]
2437 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2438 let node_ids = self.node_ids();
2439 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2440 }
2441
2442 #[cfg(not(feature = "tiered-storage"))]
2447 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2448 let epoch = self.current_epoch();
2449 let edge_ids: Vec<EdgeId> = self
2450 .edges
2451 .read()
2452 .iter()
2453 .filter_map(|(id, chain)| {
2454 chain
2455 .visible_at(epoch)
2456 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2457 })
2458 .collect();
2459
2460 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2461 }
2462
2463 #[cfg(feature = "tiered-storage")]
2466 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2467 let epoch = self.current_epoch();
2468 let versions = self.edge_versions.read();
2469 let edge_ids: Vec<EdgeId> = versions
2470 .iter()
2471 .filter_map(|(id, index)| {
2472 index.visible_at(epoch).and_then(|vref| {
2473 self.read_edge_record(&vref)
2474 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2475 })
2476 })
2477 .collect();
2478
2479 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2480 }
2481
2482 pub fn all_labels(&self) -> Vec<String> {
2484 self.id_to_label
2485 .read()
2486 .iter()
2487 .map(|s| s.to_string())
2488 .collect()
2489 }
2490
2491 pub fn all_edge_types(&self) -> Vec<String> {
2493 self.id_to_edge_type
2494 .read()
2495 .iter()
2496 .map(|s| s.to_string())
2497 .collect()
2498 }
2499
2500 pub fn all_property_keys(&self) -> Vec<String> {
2502 let mut keys = std::collections::HashSet::new();
2503 for key in self.node_properties.keys() {
2504 keys.insert(key.to_string());
2505 }
2506 for key in self.edge_properties.keys() {
2507 keys.insert(key.to_string());
2508 }
2509 keys.into_iter().collect()
2510 }
2511
2512 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2514 let node_ids = self.nodes_by_label(label);
2515 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2516 }
2517
2518 #[cfg(not(feature = "tiered-storage"))]
2520 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2521 let epoch = self.current_epoch();
2522 let type_to_id = self.edge_type_to_id.read();
2523
2524 if let Some(&type_id) = type_to_id.get(edge_type) {
2525 let edge_ids: Vec<EdgeId> = self
2526 .edges
2527 .read()
2528 .iter()
2529 .filter_map(|(id, chain)| {
2530 chain.visible_at(epoch).and_then(|r| {
2531 if !r.is_deleted() && r.type_id == type_id {
2532 Some(*id)
2533 } else {
2534 None
2535 }
2536 })
2537 })
2538 .collect();
2539
2540 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2542 as Box<dyn Iterator<Item = Edge> + 'a>
2543 } else {
2544 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2546 }
2547 }
2548
2549 #[cfg(feature = "tiered-storage")]
2552 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2553 let epoch = self.current_epoch();
2554 let type_to_id = self.edge_type_to_id.read();
2555
2556 if let Some(&type_id) = type_to_id.get(edge_type) {
2557 let versions = self.edge_versions.read();
2558 let edge_ids: Vec<EdgeId> = versions
2559 .iter()
2560 .filter_map(|(id, index)| {
2561 index.visible_at(epoch).and_then(|vref| {
2562 self.read_edge_record(&vref).and_then(|r| {
2563 if !r.is_deleted() && r.type_id == type_id {
2564 Some(*id)
2565 } else {
2566 None
2567 }
2568 })
2569 })
2570 })
2571 .collect();
2572
2573 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2574 as Box<dyn Iterator<Item = Edge> + 'a>
2575 } else {
2576 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2577 }
2578 }
2579
2580 #[must_use]
2587 pub fn node_property_might_match(
2588 &self,
2589 property: &PropertyKey,
2590 op: CompareOp,
2591 value: &Value,
2592 ) -> bool {
2593 self.node_properties.might_match(property, op, value)
2594 }
2595
2596 #[must_use]
2598 pub fn edge_property_might_match(
2599 &self,
2600 property: &PropertyKey,
2601 op: CompareOp,
2602 value: &Value,
2603 ) -> bool {
2604 self.edge_properties.might_match(property, op, value)
2605 }
2606
2607 #[must_use]
2609 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2610 self.node_properties.zone_map(property)
2611 }
2612
2613 #[must_use]
2615 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2616 self.edge_properties.zone_map(property)
2617 }
2618
2619 pub fn rebuild_zone_maps(&self) {
2621 self.node_properties.rebuild_zone_maps();
2622 self.edge_properties.rebuild_zone_maps();
2623 }
2624
2625 #[must_use]
2629 pub fn statistics(&self) -> Statistics {
2630 self.statistics.read().clone()
2631 }
2632
2633 pub fn ensure_statistics_fresh(&self) {
2638 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2639 self.compute_statistics();
2640 }
2641 }
2642
2643 #[cfg(not(feature = "tiered-storage"))]
2648 pub fn compute_statistics(&self) {
2649 let mut stats = Statistics::new();
2650
2651 stats.total_nodes = self.node_count() as u64;
2653 stats.total_edges = self.edge_count() as u64;
2654
2655 let id_to_label = self.id_to_label.read();
2657 let label_index = self.label_index.read();
2658
2659 for (label_id, label_name) in id_to_label.iter().enumerate() {
2660 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2661
2662 if node_count > 0 {
2663 let avg_out_degree = if stats.total_nodes > 0 {
2665 stats.total_edges as f64 / stats.total_nodes as f64
2666 } else {
2667 0.0
2668 };
2669
2670 let label_stats =
2671 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2672
2673 stats.update_label(label_name.as_ref(), label_stats);
2674 }
2675 }
2676
2677 let id_to_edge_type = self.id_to_edge_type.read();
2679 let edges = self.edges.read();
2680 let epoch = self.current_epoch();
2681
2682 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2683 for chain in edges.values() {
2684 if let Some(record) = chain.visible_at(epoch)
2685 && !record.is_deleted()
2686 {
2687 *edge_type_counts.entry(record.type_id).or_default() += 1;
2688 }
2689 }
2690
2691 for (type_id, count) in edge_type_counts {
2692 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2693 let avg_degree = if stats.total_nodes > 0 {
2694 count as f64 / stats.total_nodes as f64
2695 } else {
2696 0.0
2697 };
2698
2699 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2700 stats.update_edge_type(type_name.as_ref(), edge_stats);
2701 }
2702 }
2703
2704 *self.statistics.write() = stats;
2705 }
2706
2707 #[cfg(feature = "tiered-storage")]
2710 pub fn compute_statistics(&self) {
2711 let mut stats = Statistics::new();
2712
2713 stats.total_nodes = self.node_count() as u64;
2715 stats.total_edges = self.edge_count() as u64;
2716
2717 let id_to_label = self.id_to_label.read();
2719 let label_index = self.label_index.read();
2720
2721 for (label_id, label_name) in id_to_label.iter().enumerate() {
2722 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2723
2724 if node_count > 0 {
2725 let avg_out_degree = if stats.total_nodes > 0 {
2726 stats.total_edges as f64 / stats.total_nodes as f64
2727 } else {
2728 0.0
2729 };
2730
2731 let label_stats =
2732 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2733
2734 stats.update_label(label_name.as_ref(), label_stats);
2735 }
2736 }
2737
2738 let id_to_edge_type = self.id_to_edge_type.read();
2740 let versions = self.edge_versions.read();
2741 let epoch = self.current_epoch();
2742
2743 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2744 for index in versions.values() {
2745 if let Some(vref) = index.visible_at(epoch)
2746 && let Some(record) = self.read_edge_record(&vref)
2747 && !record.is_deleted()
2748 {
2749 *edge_type_counts.entry(record.type_id).or_default() += 1;
2750 }
2751 }
2752
2753 for (type_id, count) in edge_type_counts {
2754 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2755 let avg_degree = if stats.total_nodes > 0 {
2756 count as f64 / stats.total_nodes as f64
2757 } else {
2758 0.0
2759 };
2760
2761 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2762 stats.update_edge_type(type_name.as_ref(), edge_stats);
2763 }
2764 }
2765
2766 *self.statistics.write() = stats;
2767 }
2768
2769 #[must_use]
2771 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2772 self.statistics.read().estimate_label_cardinality(label)
2773 }
2774
2775 #[must_use]
2777 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2778 self.statistics
2779 .read()
2780 .estimate_avg_degree(edge_type, outgoing)
2781 }
2782
2783 fn get_or_create_label_id(&self, label: &str) -> u32 {
2786 {
2787 let label_to_id = self.label_to_id.read();
2788 if let Some(&id) = label_to_id.get(label) {
2789 return id;
2790 }
2791 }
2792
2793 let mut label_to_id = self.label_to_id.write();
2794 let mut id_to_label = self.id_to_label.write();
2795
2796 if let Some(&id) = label_to_id.get(label) {
2798 return id;
2799 }
2800
2801 let id = id_to_label.len() as u32;
2802
2803 let label: ArcStr = label.into();
2804 label_to_id.insert(label.clone(), id);
2805 id_to_label.push(label);
2806
2807 id
2808 }
2809
2810 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2811 {
2812 let type_to_id = self.edge_type_to_id.read();
2813 if let Some(&id) = type_to_id.get(edge_type) {
2814 return id;
2815 }
2816 }
2817
2818 let mut type_to_id = self.edge_type_to_id.write();
2819 let mut id_to_type = self.id_to_edge_type.write();
2820
2821 if let Some(&id) = type_to_id.get(edge_type) {
2823 return id;
2824 }
2825
2826 let id = id_to_type.len() as u32;
2827 let edge_type: ArcStr = edge_type.into();
2828 type_to_id.insert(edge_type.clone(), id);
2829 id_to_type.push(edge_type);
2830
2831 id
2832 }
2833
2834 #[cfg(not(feature = "tiered-storage"))]
2841 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2842 let epoch = self.current_epoch();
2843 let mut record = NodeRecord::new(id, epoch);
2844 record.set_label_count(labels.len() as u16);
2845
2846 let mut node_label_set = FxHashSet::default();
2848 for label in labels {
2849 let label_id = self.get_or_create_label_id(*label);
2850 node_label_set.insert(label_id);
2851
2852 let mut index = self.label_index.write();
2854 while index.len() <= label_id as usize {
2855 index.push(FxHashMap::default());
2856 }
2857 index[label_id as usize].insert(id, ());
2858 }
2859
2860 self.node_labels.write().insert(id, node_label_set);
2862
2863 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2865 self.nodes.write().insert(id, chain);
2866
2867 let id_val = id.as_u64();
2869 let _ = self
2870 .next_node_id
2871 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2872 if id_val >= current {
2873 Some(id_val + 1)
2874 } else {
2875 None
2876 }
2877 });
2878 }
2879
2880 #[cfg(feature = "tiered-storage")]
2883 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2884 let epoch = self.current_epoch();
2885 let mut record = NodeRecord::new(id, epoch);
2886 record.set_label_count(labels.len() as u16);
2887
2888 let mut node_label_set = FxHashSet::default();
2890 for label in labels {
2891 let label_id = self.get_or_create_label_id(*label);
2892 node_label_set.insert(label_id);
2893
2894 let mut index = self.label_index.write();
2896 while index.len() <= label_id as usize {
2897 index.push(FxHashMap::default());
2898 }
2899 index[label_id as usize].insert(id, ());
2900 }
2901
2902 self.node_labels.write().insert(id, node_label_set);
2904
2905 let arena = self.arena_allocator.arena_or_create(epoch);
2907 let (offset, _stored) = arena.alloc_value_with_offset(record);
2908
2909 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2911 let mut versions = self.node_versions.write();
2912 versions.insert(id, VersionIndex::with_initial(hot_ref));
2913
2914 let id_val = id.as_u64();
2916 let _ = self
2917 .next_node_id
2918 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2919 if id_val >= current {
2920 Some(id_val + 1)
2921 } else {
2922 None
2923 }
2924 });
2925 }
2926
2927 #[cfg(not(feature = "tiered-storage"))]
2931 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2932 let epoch = self.current_epoch();
2933 let type_id = self.get_or_create_edge_type_id(edge_type);
2934
2935 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2936 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2937 self.edges.write().insert(id, chain);
2938
2939 self.forward_adj.add_edge(src, dst, id);
2941 if let Some(ref backward) = self.backward_adj {
2942 backward.add_edge(dst, src, id);
2943 }
2944
2945 let id_val = id.as_u64();
2947 let _ = self
2948 .next_edge_id
2949 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2950 if id_val >= current {
2951 Some(id_val + 1)
2952 } else {
2953 None
2954 }
2955 });
2956 }
2957
2958 #[cfg(feature = "tiered-storage")]
2961 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2962 let epoch = self.current_epoch();
2963 let type_id = self.get_or_create_edge_type_id(edge_type);
2964
2965 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2966
2967 let arena = self.arena_allocator.arena_or_create(epoch);
2969 let (offset, _stored) = arena.alloc_value_with_offset(record);
2970
2971 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2973 let mut versions = self.edge_versions.write();
2974 versions.insert(id, VersionIndex::with_initial(hot_ref));
2975
2976 self.forward_adj.add_edge(src, dst, id);
2978 if let Some(ref backward) = self.backward_adj {
2979 backward.add_edge(dst, src, id);
2980 }
2981
2982 let id_val = id.as_u64();
2984 let _ = self
2985 .next_edge_id
2986 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2987 if id_val >= current {
2988 Some(id_val + 1)
2989 } else {
2990 None
2991 }
2992 });
2993 }
2994
2995 pub fn set_epoch(&self, epoch: EpochId) {
2997 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2998 }
2999}
3000
3001impl Default for LpgStore {
3002 fn default() -> Self {
3003 Self::new()
3004 }
3005}
3006
3007#[cfg(test)]
3008mod tests {
3009 use super::*;
3010
3011 #[test]
3012 fn test_create_node() {
3013 let store = LpgStore::new();
3014
3015 let id = store.create_node(&["Person"]);
3016 assert!(id.is_valid());
3017
3018 let node = store.get_node(id).unwrap();
3019 assert!(node.has_label("Person"));
3020 assert!(!node.has_label("Animal"));
3021 }
3022
3023 #[test]
3024 fn test_create_node_with_props() {
3025 let store = LpgStore::new();
3026
3027 let id = store.create_node_with_props(
3028 &["Person"],
3029 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3030 );
3031
3032 let node = store.get_node(id).unwrap();
3033 assert_eq!(
3034 node.get_property("name").and_then(|v| v.as_str()),
3035 Some("Alice")
3036 );
3037 assert_eq!(
3038 node.get_property("age").and_then(|v| v.as_int64()),
3039 Some(30)
3040 );
3041 }
3042
3043 #[test]
3044 fn test_delete_node() {
3045 let store = LpgStore::new();
3046
3047 let id = store.create_node(&["Person"]);
3048 assert_eq!(store.node_count(), 1);
3049
3050 assert!(store.delete_node(id));
3051 assert_eq!(store.node_count(), 0);
3052 assert!(store.get_node(id).is_none());
3053
3054 assert!(!store.delete_node(id));
3056 }
3057
3058 #[test]
3059 fn test_create_edge() {
3060 let store = LpgStore::new();
3061
3062 let alice = store.create_node(&["Person"]);
3063 let bob = store.create_node(&["Person"]);
3064
3065 let edge_id = store.create_edge(alice, bob, "KNOWS");
3066 assert!(edge_id.is_valid());
3067
3068 let edge = store.get_edge(edge_id).unwrap();
3069 assert_eq!(edge.src, alice);
3070 assert_eq!(edge.dst, bob);
3071 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3072 }
3073
3074 #[test]
3075 fn test_neighbors() {
3076 let store = LpgStore::new();
3077
3078 let a = store.create_node(&["Person"]);
3079 let b = store.create_node(&["Person"]);
3080 let c = store.create_node(&["Person"]);
3081
3082 store.create_edge(a, b, "KNOWS");
3083 store.create_edge(a, c, "KNOWS");
3084
3085 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3086 assert_eq!(outgoing.len(), 2);
3087 assert!(outgoing.contains(&b));
3088 assert!(outgoing.contains(&c));
3089
3090 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3091 assert_eq!(incoming.len(), 1);
3092 assert!(incoming.contains(&a));
3093 }
3094
3095 #[test]
3096 fn test_nodes_by_label() {
3097 let store = LpgStore::new();
3098
3099 let p1 = store.create_node(&["Person"]);
3100 let p2 = store.create_node(&["Person"]);
3101 let _a = store.create_node(&["Animal"]);
3102
3103 let persons = store.nodes_by_label("Person");
3104 assert_eq!(persons.len(), 2);
3105 assert!(persons.contains(&p1));
3106 assert!(persons.contains(&p2));
3107
3108 let animals = store.nodes_by_label("Animal");
3109 assert_eq!(animals.len(), 1);
3110 }
3111
3112 #[test]
3113 fn test_delete_edge() {
3114 let store = LpgStore::new();
3115
3116 let a = store.create_node(&["Person"]);
3117 let b = store.create_node(&["Person"]);
3118 let edge_id = store.create_edge(a, b, "KNOWS");
3119
3120 assert_eq!(store.edge_count(), 1);
3121
3122 assert!(store.delete_edge(edge_id));
3123 assert_eq!(store.edge_count(), 0);
3124 assert!(store.get_edge(edge_id).is_none());
3125 }
3126
3127 #[test]
3130 fn test_lpg_store_config() {
3131 let config = LpgStoreConfig {
3133 backward_edges: false,
3134 initial_node_capacity: 100,
3135 initial_edge_capacity: 200,
3136 };
3137 let store = LpgStore::with_config(config);
3138
3139 let a = store.create_node(&["Person"]);
3141 let b = store.create_node(&["Person"]);
3142 store.create_edge(a, b, "KNOWS");
3143
3144 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3146 assert_eq!(outgoing.len(), 1);
3147
3148 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3150 assert_eq!(incoming.len(), 0);
3151 }
3152
3153 #[test]
3154 fn test_epoch_management() {
3155 let store = LpgStore::new();
3156
3157 let epoch0 = store.current_epoch();
3158 assert_eq!(epoch0.as_u64(), 0);
3159
3160 let epoch1 = store.new_epoch();
3161 assert_eq!(epoch1.as_u64(), 1);
3162
3163 let current = store.current_epoch();
3164 assert_eq!(current.as_u64(), 1);
3165 }
3166
3167 #[test]
3168 fn test_node_properties() {
3169 let store = LpgStore::new();
3170 let id = store.create_node(&["Person"]);
3171
3172 store.set_node_property(id, "name", Value::from("Alice"));
3174 let name = store.get_node_property(id, &"name".into());
3175 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3176
3177 store.set_node_property(id, "name", Value::from("Bob"));
3179 let name = store.get_node_property(id, &"name".into());
3180 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3181
3182 let old = store.remove_node_property(id, "name");
3184 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3185
3186 let name = store.get_node_property(id, &"name".into());
3188 assert!(name.is_none());
3189
3190 let none = store.remove_node_property(id, "nonexistent");
3192 assert!(none.is_none());
3193 }
3194
3195 #[test]
3196 fn test_edge_properties() {
3197 let store = LpgStore::new();
3198 let a = store.create_node(&["Person"]);
3199 let b = store.create_node(&["Person"]);
3200 let edge_id = store.create_edge(a, b, "KNOWS");
3201
3202 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3204 let since = store.get_edge_property(edge_id, &"since".into());
3205 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3206
3207 let old = store.remove_edge_property(edge_id, "since");
3209 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3210
3211 let since = store.get_edge_property(edge_id, &"since".into());
3212 assert!(since.is_none());
3213 }
3214
3215 #[test]
3216 fn test_add_remove_label() {
3217 let store = LpgStore::new();
3218 let id = store.create_node(&["Person"]);
3219
3220 assert!(store.add_label(id, "Employee"));
3222
3223 let node = store.get_node(id).unwrap();
3224 assert!(node.has_label("Person"));
3225 assert!(node.has_label("Employee"));
3226
3227 assert!(!store.add_label(id, "Employee"));
3229
3230 assert!(store.remove_label(id, "Employee"));
3232
3233 let node = store.get_node(id).unwrap();
3234 assert!(node.has_label("Person"));
3235 assert!(!node.has_label("Employee"));
3236
3237 assert!(!store.remove_label(id, "Employee"));
3239 assert!(!store.remove_label(id, "NonExistent"));
3240 }
3241
3242 #[test]
3243 fn test_add_label_to_nonexistent_node() {
3244 let store = LpgStore::new();
3245 let fake_id = NodeId::new(999);
3246 assert!(!store.add_label(fake_id, "Label"));
3247 }
3248
3249 #[test]
3250 fn test_remove_label_from_nonexistent_node() {
3251 let store = LpgStore::new();
3252 let fake_id = NodeId::new(999);
3253 assert!(!store.remove_label(fake_id, "Label"));
3254 }
3255
3256 #[test]
3257 fn test_node_ids() {
3258 let store = LpgStore::new();
3259
3260 let n1 = store.create_node(&["Person"]);
3261 let n2 = store.create_node(&["Person"]);
3262 let n3 = store.create_node(&["Person"]);
3263
3264 let ids = store.node_ids();
3265 assert_eq!(ids.len(), 3);
3266 assert!(ids.contains(&n1));
3267 assert!(ids.contains(&n2));
3268 assert!(ids.contains(&n3));
3269
3270 store.delete_node(n2);
3272 let ids = store.node_ids();
3273 assert_eq!(ids.len(), 2);
3274 assert!(!ids.contains(&n2));
3275 }
3276
3277 #[test]
3278 fn test_delete_node_nonexistent() {
3279 let store = LpgStore::new();
3280 let fake_id = NodeId::new(999);
3281 assert!(!store.delete_node(fake_id));
3282 }
3283
3284 #[test]
3285 fn test_delete_edge_nonexistent() {
3286 let store = LpgStore::new();
3287 let fake_id = EdgeId::new(999);
3288 assert!(!store.delete_edge(fake_id));
3289 }
3290
3291 #[test]
3292 fn test_delete_edge_double() {
3293 let store = LpgStore::new();
3294 let a = store.create_node(&["Person"]);
3295 let b = store.create_node(&["Person"]);
3296 let edge_id = store.create_edge(a, b, "KNOWS");
3297
3298 assert!(store.delete_edge(edge_id));
3299 assert!(!store.delete_edge(edge_id)); }
3301
3302 #[test]
3303 fn test_create_edge_with_props() {
3304 let store = LpgStore::new();
3305 let a = store.create_node(&["Person"]);
3306 let b = store.create_node(&["Person"]);
3307
3308 let edge_id = store.create_edge_with_props(
3309 a,
3310 b,
3311 "KNOWS",
3312 [
3313 ("since", Value::from(2020i64)),
3314 ("weight", Value::from(1.0)),
3315 ],
3316 );
3317
3318 let edge = store.get_edge(edge_id).unwrap();
3319 assert_eq!(
3320 edge.get_property("since").and_then(|v| v.as_int64()),
3321 Some(2020)
3322 );
3323 assert_eq!(
3324 edge.get_property("weight").and_then(|v| v.as_float64()),
3325 Some(1.0)
3326 );
3327 }
3328
3329 #[test]
3330 fn test_delete_node_edges() {
3331 let store = LpgStore::new();
3332
3333 let a = store.create_node(&["Person"]);
3334 let b = store.create_node(&["Person"]);
3335 let c = store.create_node(&["Person"]);
3336
3337 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3341
3342 store.delete_node_edges(a);
3344
3345 assert_eq!(store.edge_count(), 0);
3346 }
3347
3348 #[test]
3349 fn test_neighbors_both_directions() {
3350 let store = LpgStore::new();
3351
3352 let a = store.create_node(&["Person"]);
3353 let b = store.create_node(&["Person"]);
3354 let c = store.create_node(&["Person"]);
3355
3356 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3361 assert_eq!(neighbors.len(), 2);
3362 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3365
3366 #[test]
3367 fn test_edges_from() {
3368 let store = LpgStore::new();
3369
3370 let a = store.create_node(&["Person"]);
3371 let b = store.create_node(&["Person"]);
3372 let c = store.create_node(&["Person"]);
3373
3374 let e1 = store.create_edge(a, b, "KNOWS");
3375 let e2 = store.create_edge(a, c, "KNOWS");
3376
3377 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3378 assert_eq!(edges.len(), 2);
3379 assert!(edges.iter().any(|(_, e)| *e == e1));
3380 assert!(edges.iter().any(|(_, e)| *e == e2));
3381
3382 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3384 assert_eq!(incoming.len(), 1);
3385 assert_eq!(incoming[0].1, e1);
3386 }
3387
3388 #[test]
3389 fn test_edges_to() {
3390 let store = LpgStore::new();
3391
3392 let a = store.create_node(&["Person"]);
3393 let b = store.create_node(&["Person"]);
3394 let c = store.create_node(&["Person"]);
3395
3396 let e1 = store.create_edge(a, b, "KNOWS");
3397 let e2 = store.create_edge(c, b, "KNOWS");
3398
3399 let to_b = store.edges_to(b);
3401 assert_eq!(to_b.len(), 2);
3402 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3403 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3404 }
3405
3406 #[test]
3407 fn test_out_degree_in_degree() {
3408 let store = LpgStore::new();
3409
3410 let a = store.create_node(&["Person"]);
3411 let b = store.create_node(&["Person"]);
3412 let c = store.create_node(&["Person"]);
3413
3414 store.create_edge(a, b, "KNOWS");
3415 store.create_edge(a, c, "KNOWS");
3416 store.create_edge(c, b, "KNOWS");
3417
3418 assert_eq!(store.out_degree(a), 2);
3419 assert_eq!(store.out_degree(b), 0);
3420 assert_eq!(store.out_degree(c), 1);
3421
3422 assert_eq!(store.in_degree(a), 0);
3423 assert_eq!(store.in_degree(b), 2);
3424 assert_eq!(store.in_degree(c), 1);
3425 }
3426
3427 #[test]
3428 fn test_edge_type() {
3429 let store = LpgStore::new();
3430
3431 let a = store.create_node(&["Person"]);
3432 let b = store.create_node(&["Person"]);
3433 let edge_id = store.create_edge(a, b, "KNOWS");
3434
3435 let edge_type = store.edge_type(edge_id);
3436 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3437
3438 let fake_id = EdgeId::new(999);
3440 assert!(store.edge_type(fake_id).is_none());
3441 }
3442
3443 #[test]
3444 fn test_count_methods() {
3445 let store = LpgStore::new();
3446
3447 assert_eq!(store.label_count(), 0);
3448 assert_eq!(store.edge_type_count(), 0);
3449 assert_eq!(store.property_key_count(), 0);
3450
3451 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3452 let b = store.create_node(&["Company"]);
3453 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3454
3455 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3459
3460 #[test]
3461 fn test_all_nodes_and_edges() {
3462 let store = LpgStore::new();
3463
3464 let a = store.create_node(&["Person"]);
3465 let b = store.create_node(&["Person"]);
3466 store.create_edge(a, b, "KNOWS");
3467
3468 let nodes: Vec<_> = store.all_nodes().collect();
3469 assert_eq!(nodes.len(), 2);
3470
3471 let edges: Vec<_> = store.all_edges().collect();
3472 assert_eq!(edges.len(), 1);
3473 }
3474
3475 #[test]
3476 fn test_all_labels_and_edge_types() {
3477 let store = LpgStore::new();
3478
3479 store.create_node(&["Person"]);
3480 store.create_node(&["Company"]);
3481 let a = store.create_node(&["Animal"]);
3482 let b = store.create_node(&["Animal"]);
3483 store.create_edge(a, b, "EATS");
3484
3485 let labels = store.all_labels();
3486 assert_eq!(labels.len(), 3);
3487 assert!(labels.contains(&"Person".to_string()));
3488 assert!(labels.contains(&"Company".to_string()));
3489 assert!(labels.contains(&"Animal".to_string()));
3490
3491 let edge_types = store.all_edge_types();
3492 assert_eq!(edge_types.len(), 1);
3493 assert!(edge_types.contains(&"EATS".to_string()));
3494 }
3495
3496 #[test]
3497 fn test_all_property_keys() {
3498 let store = LpgStore::new();
3499
3500 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3501 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3502 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3503
3504 let keys = store.all_property_keys();
3505 assert!(keys.contains(&"name".to_string()));
3506 assert!(keys.contains(&"age".to_string()));
3507 assert!(keys.contains(&"since".to_string()));
3508 }
3509
3510 #[test]
3511 fn test_nodes_with_label() {
3512 let store = LpgStore::new();
3513
3514 store.create_node(&["Person"]);
3515 store.create_node(&["Person"]);
3516 store.create_node(&["Company"]);
3517
3518 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3519 assert_eq!(persons.len(), 2);
3520
3521 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3522 assert_eq!(companies.len(), 1);
3523
3524 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3525 assert_eq!(none.len(), 0);
3526 }
3527
3528 #[test]
3529 fn test_edges_with_type() {
3530 let store = LpgStore::new();
3531
3532 let a = store.create_node(&["Person"]);
3533 let b = store.create_node(&["Person"]);
3534 let c = store.create_node(&["Company"]);
3535
3536 store.create_edge(a, b, "KNOWS");
3537 store.create_edge(a, c, "WORKS_AT");
3538
3539 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3540 assert_eq!(knows.len(), 1);
3541
3542 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3543 assert_eq!(works_at.len(), 1);
3544
3545 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3546 assert_eq!(none.len(), 0);
3547 }
3548
3549 #[test]
3550 fn test_nodes_by_label_nonexistent() {
3551 let store = LpgStore::new();
3552 store.create_node(&["Person"]);
3553
3554 let empty = store.nodes_by_label("NonExistent");
3555 assert!(empty.is_empty());
3556 }
3557
3558 #[test]
3559 fn test_statistics() {
3560 let store = LpgStore::new();
3561
3562 let a = store.create_node(&["Person"]);
3563 let b = store.create_node(&["Person"]);
3564 let c = store.create_node(&["Company"]);
3565
3566 store.create_edge(a, b, "KNOWS");
3567 store.create_edge(a, c, "WORKS_AT");
3568
3569 store.compute_statistics();
3570 let stats = store.statistics();
3571
3572 assert_eq!(stats.total_nodes, 3);
3573 assert_eq!(stats.total_edges, 2);
3574
3575 let person_card = store.estimate_label_cardinality("Person");
3577 assert!(person_card > 0.0);
3578
3579 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3580 assert!(avg_degree >= 0.0);
3581 }
3582
3583 #[test]
3584 fn test_zone_maps() {
3585 let store = LpgStore::new();
3586
3587 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3588 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3589
3590 let might_match =
3592 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3593 assert!(might_match);
3595
3596 let zone = store.node_property_zone_map(&"age".into());
3597 assert!(zone.is_some());
3598
3599 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3601 assert!(no_zone.is_none());
3602
3603 let a = store.create_node(&["A"]);
3605 let b = store.create_node(&["B"]);
3606 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3607
3608 let edge_zone = store.edge_property_zone_map(&"weight".into());
3609 assert!(edge_zone.is_some());
3610 }
3611
3612 #[test]
3613 fn test_rebuild_zone_maps() {
3614 let store = LpgStore::new();
3615 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3616
3617 store.rebuild_zone_maps();
3619 }
3620
3621 #[test]
3622 fn test_create_node_with_id() {
3623 let store = LpgStore::new();
3624
3625 let specific_id = NodeId::new(100);
3626 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3627
3628 let node = store.get_node(specific_id).unwrap();
3629 assert!(node.has_label("Person"));
3630 assert!(node.has_label("Employee"));
3631
3632 let next = store.create_node(&["Other"]);
3634 assert!(next.as_u64() > 100);
3635 }
3636
3637 #[test]
3638 fn test_create_edge_with_id() {
3639 let store = LpgStore::new();
3640
3641 let a = store.create_node(&["A"]);
3642 let b = store.create_node(&["B"]);
3643
3644 let specific_id = EdgeId::new(500);
3645 store.create_edge_with_id(specific_id, a, b, "REL");
3646
3647 let edge = store.get_edge(specific_id).unwrap();
3648 assert_eq!(edge.src, a);
3649 assert_eq!(edge.dst, b);
3650 assert_eq!(edge.edge_type.as_str(), "REL");
3651
3652 let next = store.create_edge(a, b, "OTHER");
3654 assert!(next.as_u64() > 500);
3655 }
3656
3657 #[test]
3658 fn test_set_epoch() {
3659 let store = LpgStore::new();
3660
3661 assert_eq!(store.current_epoch().as_u64(), 0);
3662
3663 store.set_epoch(EpochId::new(42));
3664 assert_eq!(store.current_epoch().as_u64(), 42);
3665 }
3666
3667 #[test]
3668 fn test_get_node_nonexistent() {
3669 let store = LpgStore::new();
3670 let fake_id = NodeId::new(999);
3671 assert!(store.get_node(fake_id).is_none());
3672 }
3673
3674 #[test]
3675 fn test_get_edge_nonexistent() {
3676 let store = LpgStore::new();
3677 let fake_id = EdgeId::new(999);
3678 assert!(store.get_edge(fake_id).is_none());
3679 }
3680
3681 #[test]
3682 fn test_multiple_labels() {
3683 let store = LpgStore::new();
3684
3685 let id = store.create_node(&["Person", "Employee", "Manager"]);
3686 let node = store.get_node(id).unwrap();
3687
3688 assert!(node.has_label("Person"));
3689 assert!(node.has_label("Employee"));
3690 assert!(node.has_label("Manager"));
3691 assert!(!node.has_label("Other"));
3692 }
3693
3694 #[test]
3695 fn test_default_impl() {
3696 let store: LpgStore = Default::default();
3697 assert_eq!(store.node_count(), 0);
3698 assert_eq!(store.edge_count(), 0);
3699 }
3700
3701 #[test]
3702 fn test_edges_from_both_directions() {
3703 let store = LpgStore::new();
3704
3705 let a = store.create_node(&["A"]);
3706 let b = store.create_node(&["B"]);
3707 let c = store.create_node(&["C"]);
3708
3709 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3714 assert_eq!(edges.len(), 2);
3715 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3718
3719 #[test]
3720 fn test_no_backward_adj_in_degree() {
3721 let config = LpgStoreConfig {
3722 backward_edges: false,
3723 initial_node_capacity: 10,
3724 initial_edge_capacity: 10,
3725 };
3726 let store = LpgStore::with_config(config);
3727
3728 let a = store.create_node(&["A"]);
3729 let b = store.create_node(&["B"]);
3730 store.create_edge(a, b, "R");
3731
3732 let degree = store.in_degree(b);
3734 assert_eq!(degree, 1);
3735 }
3736
3737 #[test]
3738 fn test_no_backward_adj_edges_to() {
3739 let config = LpgStoreConfig {
3740 backward_edges: false,
3741 initial_node_capacity: 10,
3742 initial_edge_capacity: 10,
3743 };
3744 let store = LpgStore::with_config(config);
3745
3746 let a = store.create_node(&["A"]);
3747 let b = store.create_node(&["B"]);
3748 let e = store.create_edge(a, b, "R");
3749
3750 let edges = store.edges_to(b);
3752 assert_eq!(edges.len(), 1);
3753 assert_eq!(edges[0].1, e);
3754 }
3755
3756 #[test]
3757 fn test_node_versioned_creation() {
3758 let store = LpgStore::new();
3759
3760 let epoch = store.new_epoch();
3761 let tx_id = TxId::new(1);
3762
3763 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3764 assert!(store.get_node(id).is_some());
3765 }
3766
3767 #[test]
3768 fn test_edge_versioned_creation() {
3769 let store = LpgStore::new();
3770
3771 let a = store.create_node(&["A"]);
3772 let b = store.create_node(&["B"]);
3773
3774 let epoch = store.new_epoch();
3775 let tx_id = TxId::new(1);
3776
3777 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3778 assert!(store.get_edge(edge_id).is_some());
3779 }
3780
3781 #[test]
3782 fn test_node_with_props_versioned() {
3783 let store = LpgStore::new();
3784
3785 let epoch = store.new_epoch();
3786 let tx_id = TxId::new(1);
3787
3788 let id = store.create_node_with_props_versioned(
3789 &["Person"],
3790 [("name", Value::from("Alice"))],
3791 epoch,
3792 tx_id,
3793 );
3794
3795 let node = store.get_node(id).unwrap();
3796 assert_eq!(
3797 node.get_property("name").and_then(|v| v.as_str()),
3798 Some("Alice")
3799 );
3800 }
3801
3802 #[test]
3803 fn test_discard_uncommitted_versions() {
3804 let store = LpgStore::new();
3805
3806 let epoch = store.new_epoch();
3807 let tx_id = TxId::new(42);
3808
3809 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3811 assert!(store.get_node(node_id).is_some());
3812
3813 store.discard_uncommitted_versions(tx_id);
3815
3816 assert!(store.get_node(node_id).is_none());
3818 }
3819
3820 #[test]
3823 fn test_property_index_create_and_lookup() {
3824 let store = LpgStore::new();
3825
3826 let alice = store.create_node(&["Person"]);
3828 let bob = store.create_node(&["Person"]);
3829 let charlie = store.create_node(&["Person"]);
3830
3831 store.set_node_property(alice, "city", Value::from("NYC"));
3832 store.set_node_property(bob, "city", Value::from("NYC"));
3833 store.set_node_property(charlie, "city", Value::from("LA"));
3834
3835 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3837 assert_eq!(nyc_people.len(), 2);
3838
3839 store.create_property_index("city");
3841 assert!(store.has_property_index("city"));
3842
3843 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3845 assert_eq!(nyc_people.len(), 2);
3846 assert!(nyc_people.contains(&alice));
3847 assert!(nyc_people.contains(&bob));
3848
3849 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3850 assert_eq!(la_people.len(), 1);
3851 assert!(la_people.contains(&charlie));
3852 }
3853
3854 #[test]
3855 fn test_property_index_maintained_on_update() {
3856 let store = LpgStore::new();
3857
3858 store.create_property_index("status");
3860
3861 let node = store.create_node(&["Task"]);
3862 store.set_node_property(node, "status", Value::from("pending"));
3863
3864 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3866 assert_eq!(pending.len(), 1);
3867 assert!(pending.contains(&node));
3868
3869 store.set_node_property(node, "status", Value::from("done"));
3871
3872 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3874 assert!(pending.is_empty());
3875
3876 let done = store.find_nodes_by_property("status", &Value::from("done"));
3878 assert_eq!(done.len(), 1);
3879 assert!(done.contains(&node));
3880 }
3881
3882 #[test]
3883 fn test_property_index_maintained_on_remove() {
3884 let store = LpgStore::new();
3885
3886 store.create_property_index("tag");
3887
3888 let node = store.create_node(&["Item"]);
3889 store.set_node_property(node, "tag", Value::from("important"));
3890
3891 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3893 assert_eq!(found.len(), 1);
3894
3895 store.remove_node_property(node, "tag");
3897
3898 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3900 assert!(found.is_empty());
3901 }
3902
3903 #[test]
3904 fn test_property_index_drop() {
3905 let store = LpgStore::new();
3906
3907 store.create_property_index("key");
3908 assert!(store.has_property_index("key"));
3909
3910 assert!(store.drop_property_index("key"));
3911 assert!(!store.has_property_index("key"));
3912
3913 assert!(!store.drop_property_index("key"));
3915 }
3916
3917 #[test]
3918 fn test_property_index_multiple_values() {
3919 let store = LpgStore::new();
3920
3921 store.create_property_index("age");
3922
3923 let n1 = store.create_node(&["Person"]);
3925 let n2 = store.create_node(&["Person"]);
3926 let n3 = store.create_node(&["Person"]);
3927 let n4 = store.create_node(&["Person"]);
3928
3929 store.set_node_property(n1, "age", Value::from(25i64));
3930 store.set_node_property(n2, "age", Value::from(25i64));
3931 store.set_node_property(n3, "age", Value::from(30i64));
3932 store.set_node_property(n4, "age", Value::from(25i64));
3933
3934 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3935 assert_eq!(age_25.len(), 3);
3936
3937 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3938 assert_eq!(age_30.len(), 1);
3939
3940 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3941 assert!(age_40.is_empty());
3942 }
3943
3944 #[test]
3945 fn test_property_index_builds_from_existing_data() {
3946 let store = LpgStore::new();
3947
3948 let n1 = store.create_node(&["Person"]);
3950 let n2 = store.create_node(&["Person"]);
3951 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3952 store.set_node_property(n2, "email", Value::from("bob@example.com"));
3953
3954 store.create_property_index("email");
3956
3957 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3959 assert_eq!(alice.len(), 1);
3960 assert!(alice.contains(&n1));
3961
3962 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3963 assert_eq!(bob.len(), 1);
3964 assert!(bob.contains(&n2));
3965 }
3966
3967 #[test]
3968 fn test_get_node_property_batch() {
3969 let store = LpgStore::new();
3970
3971 let n1 = store.create_node(&["Person"]);
3972 let n2 = store.create_node(&["Person"]);
3973 let n3 = store.create_node(&["Person"]);
3974
3975 store.set_node_property(n1, "age", Value::from(25i64));
3976 store.set_node_property(n2, "age", Value::from(30i64));
3977 let age_key = PropertyKey::new("age");
3980 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3981
3982 assert_eq!(values.len(), 3);
3983 assert_eq!(values[0], Some(Value::from(25i64)));
3984 assert_eq!(values[1], Some(Value::from(30i64)));
3985 assert_eq!(values[2], None);
3986 }
3987
3988 #[test]
3989 fn test_get_node_property_batch_empty() {
3990 let store = LpgStore::new();
3991 let key = PropertyKey::new("any");
3992
3993 let values = store.get_node_property_batch(&[], &key);
3994 assert!(values.is_empty());
3995 }
3996
3997 #[test]
3998 fn test_get_nodes_properties_batch() {
3999 let store = LpgStore::new();
4000
4001 let n1 = store.create_node(&["Person"]);
4002 let n2 = store.create_node(&["Person"]);
4003 let n3 = store.create_node(&["Person"]);
4004
4005 store.set_node_property(n1, "name", Value::from("Alice"));
4006 store.set_node_property(n1, "age", Value::from(25i64));
4007 store.set_node_property(n2, "name", Value::from("Bob"));
4008 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4011
4012 assert_eq!(all_props.len(), 3);
4013 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
4018 all_props[0].get(&PropertyKey::new("name")),
4019 Some(&Value::from("Alice"))
4020 );
4021 assert_eq!(
4022 all_props[1].get(&PropertyKey::new("name")),
4023 Some(&Value::from("Bob"))
4024 );
4025 }
4026
4027 #[test]
4028 fn test_get_nodes_properties_batch_empty() {
4029 let store = LpgStore::new();
4030
4031 let all_props = store.get_nodes_properties_batch(&[]);
4032 assert!(all_props.is_empty());
4033 }
4034
4035 #[test]
4036 fn test_get_nodes_properties_selective_batch() {
4037 let store = LpgStore::new();
4038
4039 let n1 = store.create_node(&["Person"]);
4040 let n2 = store.create_node(&["Person"]);
4041
4042 store.set_node_property(n1, "name", Value::from("Alice"));
4044 store.set_node_property(n1, "age", Value::from(25i64));
4045 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4046 store.set_node_property(n2, "name", Value::from("Bob"));
4047 store.set_node_property(n2, "age", Value::from(30i64));
4048 store.set_node_property(n2, "city", Value::from("NYC"));
4049
4050 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4052 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4053
4054 assert_eq!(props.len(), 2);
4055
4056 assert_eq!(props[0].len(), 2);
4058 assert_eq!(
4059 props[0].get(&PropertyKey::new("name")),
4060 Some(&Value::from("Alice"))
4061 );
4062 assert_eq!(
4063 props[0].get(&PropertyKey::new("age")),
4064 Some(&Value::from(25i64))
4065 );
4066 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4067
4068 assert_eq!(props[1].len(), 2);
4070 assert_eq!(
4071 props[1].get(&PropertyKey::new("name")),
4072 Some(&Value::from("Bob"))
4073 );
4074 assert_eq!(
4075 props[1].get(&PropertyKey::new("age")),
4076 Some(&Value::from(30i64))
4077 );
4078 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4079 }
4080
4081 #[test]
4082 fn test_get_nodes_properties_selective_batch_empty_keys() {
4083 let store = LpgStore::new();
4084
4085 let n1 = store.create_node(&["Person"]);
4086 store.set_node_property(n1, "name", Value::from("Alice"));
4087
4088 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4090
4091 assert_eq!(props.len(), 1);
4092 assert!(props[0].is_empty()); }
4094
4095 #[test]
4096 fn test_get_nodes_properties_selective_batch_missing_keys() {
4097 let store = LpgStore::new();
4098
4099 let n1 = store.create_node(&["Person"]);
4100 store.set_node_property(n1, "name", Value::from("Alice"));
4101
4102 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4104 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4105
4106 assert_eq!(props.len(), 1);
4107 assert_eq!(props[0].len(), 1); assert_eq!(
4109 props[0].get(&PropertyKey::new("name")),
4110 Some(&Value::from("Alice"))
4111 );
4112 }
4113
4114 #[test]
4117 fn test_find_nodes_in_range_inclusive() {
4118 let store = LpgStore::new();
4119
4120 let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4121 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4122 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4123 let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4124
4125 let result = store.find_nodes_in_range(
4127 "age",
4128 Some(&Value::from(20i64)),
4129 Some(&Value::from(40i64)),
4130 true,
4131 true,
4132 );
4133 assert_eq!(result.len(), 3);
4134 assert!(result.contains(&n1));
4135 assert!(result.contains(&n2));
4136 assert!(result.contains(&n3));
4137 }
4138
4139 #[test]
4140 fn test_find_nodes_in_range_exclusive() {
4141 let store = LpgStore::new();
4142
4143 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4144 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4145 store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4146
4147 let result = store.find_nodes_in_range(
4149 "age",
4150 Some(&Value::from(20i64)),
4151 Some(&Value::from(40i64)),
4152 false,
4153 false,
4154 );
4155 assert_eq!(result.len(), 1);
4156 assert!(result.contains(&n2));
4157 }
4158
4159 #[test]
4160 fn test_find_nodes_in_range_open_ended() {
4161 let store = LpgStore::new();
4162
4163 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4164 store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4165 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4166 let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4167
4168 let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4170 assert_eq!(result.len(), 2);
4171 assert!(result.contains(&n3));
4172 assert!(result.contains(&n4));
4173
4174 let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4176 assert_eq!(result.len(), 1);
4177 }
4178
4179 #[test]
4180 fn test_find_nodes_in_range_empty_result() {
4181 let store = LpgStore::new();
4182
4183 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4184
4185 let result = store.find_nodes_in_range(
4187 "age",
4188 Some(&Value::from(100i64)),
4189 Some(&Value::from(200i64)),
4190 true,
4191 true,
4192 );
4193 assert!(result.is_empty());
4194 }
4195
4196 #[test]
4197 fn test_find_nodes_in_range_nonexistent_property() {
4198 let store = LpgStore::new();
4199
4200 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4201
4202 let result = store.find_nodes_in_range(
4203 "weight",
4204 Some(&Value::from(50i64)),
4205 Some(&Value::from(100i64)),
4206 true,
4207 true,
4208 );
4209 assert!(result.is_empty());
4210 }
4211
4212 #[test]
4215 fn test_find_nodes_by_properties_multiple_conditions() {
4216 let store = LpgStore::new();
4217
4218 let alice = store.create_node_with_props(
4219 &["Person"],
4220 [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4221 );
4222 store.create_node_with_props(
4223 &["Person"],
4224 [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4225 );
4226 store.create_node_with_props(
4227 &["Person"],
4228 [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4229 );
4230
4231 let result = store.find_nodes_by_properties(&[
4233 ("name", Value::from("Alice")),
4234 ("city", Value::from("NYC")),
4235 ]);
4236 assert_eq!(result.len(), 1);
4237 assert!(result.contains(&alice));
4238 }
4239
4240 #[test]
4241 fn test_find_nodes_by_properties_empty_conditions() {
4242 let store = LpgStore::new();
4243
4244 store.create_node(&["Person"]);
4245 store.create_node(&["Person"]);
4246
4247 let result = store.find_nodes_by_properties(&[]);
4249 assert_eq!(result.len(), 2);
4250 }
4251
4252 #[test]
4253 fn test_find_nodes_by_properties_no_match() {
4254 let store = LpgStore::new();
4255
4256 store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4257
4258 let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4259 assert!(result.is_empty());
4260 }
4261
4262 #[test]
4263 fn test_find_nodes_by_properties_with_index() {
4264 let store = LpgStore::new();
4265
4266 store.create_property_index("name");
4268
4269 let alice = store.create_node_with_props(
4270 &["Person"],
4271 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4272 );
4273 store.create_node_with_props(
4274 &["Person"],
4275 [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4276 );
4277
4278 let result = store.find_nodes_by_properties(&[
4280 ("name", Value::from("Alice")),
4281 ("age", Value::from(30i64)),
4282 ]);
4283 assert_eq!(result.len(), 1);
4284 assert!(result.contains(&alice));
4285 }
4286
4287 #[test]
4290 fn test_estimate_label_cardinality() {
4291 let store = LpgStore::new();
4292
4293 store.create_node(&["Person"]);
4294 store.create_node(&["Person"]);
4295 store.create_node(&["Animal"]);
4296
4297 store.ensure_statistics_fresh();
4298
4299 let person_est = store.estimate_label_cardinality("Person");
4300 let animal_est = store.estimate_label_cardinality("Animal");
4301 let unknown_est = store.estimate_label_cardinality("Unknown");
4302
4303 assert!(
4304 person_est >= 1.0,
4305 "Person should have cardinality >= 1, got {person_est}"
4306 );
4307 assert!(
4308 animal_est >= 1.0,
4309 "Animal should have cardinality >= 1, got {animal_est}"
4310 );
4311 assert!(unknown_est >= 0.0);
4313 }
4314
4315 #[test]
4316 fn test_estimate_avg_degree() {
4317 let store = LpgStore::new();
4318
4319 let a = store.create_node(&["Person"]);
4320 let b = store.create_node(&["Person"]);
4321 let c = store.create_node(&["Person"]);
4322
4323 store.create_edge(a, b, "KNOWS");
4324 store.create_edge(a, c, "KNOWS");
4325 store.create_edge(b, c, "KNOWS");
4326
4327 store.ensure_statistics_fresh();
4328
4329 let outgoing = store.estimate_avg_degree("KNOWS", true);
4330 let incoming = store.estimate_avg_degree("KNOWS", false);
4331
4332 assert!(
4333 outgoing > 0.0,
4334 "Outgoing degree should be > 0, got {outgoing}"
4335 );
4336 assert!(
4337 incoming > 0.0,
4338 "Incoming degree should be > 0, got {incoming}"
4339 );
4340 }
4341
4342 #[test]
4345 fn test_delete_node_does_not_cascade() {
4346 let store = LpgStore::new();
4347
4348 let a = store.create_node(&["A"]);
4349 let b = store.create_node(&["B"]);
4350 let e = store.create_edge(a, b, "KNOWS");
4351
4352 assert!(store.delete_node(a));
4353 assert!(store.get_node(a).is_none());
4354
4355 assert!(
4357 store.get_edge(e).is_some(),
4358 "Edge should survive non-detach node delete"
4359 );
4360 }
4361
4362 #[test]
4363 fn test_delete_already_deleted_node() {
4364 let store = LpgStore::new();
4365 let a = store.create_node(&["A"]);
4366
4367 assert!(store.delete_node(a));
4368 assert!(!store.delete_node(a));
4370 }
4371
4372 #[test]
4373 fn test_delete_nonexistent_node() {
4374 let store = LpgStore::new();
4375 assert!(!store.delete_node(NodeId::new(999)));
4376 }
4377}