1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use arcstr::ArcStr;
19use dashmap::DashMap;
20#[cfg(not(feature = "tiered-storage"))]
21use grafeo_common::mvcc::VersionChain;
22use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
23use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
24use parking_lot::RwLock;
25use std::cmp::Ordering as CmpOrdering;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28
29#[cfg(feature = "vector-index")]
30use crate::index::vector::HnswIndex;
31
32fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
34 match (a, b) {
35 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
36 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
37 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
38 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
39 _ => None,
40 }
41}
42
43fn value_in_range(
45 value: &Value,
46 min: Option<&Value>,
47 max: Option<&Value>,
48 min_inclusive: bool,
49 max_inclusive: bool,
50) -> bool {
51 if let Some(min_val) = min {
53 match compare_values_for_range(value, min_val) {
54 Some(CmpOrdering::Less) => return false,
55 Some(CmpOrdering::Equal) if !min_inclusive => return false,
56 None => return false, _ => {}
58 }
59 }
60
61 if let Some(max_val) = max {
63 match compare_values_for_range(value, max_val) {
64 Some(CmpOrdering::Greater) => return false,
65 Some(CmpOrdering::Equal) if !max_inclusive => return false,
66 None => return false,
67 _ => {}
68 }
69 }
70
71 true
72}
73
74#[cfg(feature = "tiered-storage")]
76use crate::storage::EpochStore;
77#[cfg(feature = "tiered-storage")]
78use grafeo_common::memory::arena::ArenaAllocator;
79#[cfg(feature = "tiered-storage")]
80use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
81
82#[derive(Debug, Clone)]
88pub struct LpgStoreConfig {
89 pub backward_edges: bool,
92 pub initial_node_capacity: usize,
94 pub initial_edge_capacity: usize,
96}
97
98impl Default for LpgStoreConfig {
99 fn default() -> Self {
100 Self {
101 backward_edges: true,
102 initial_node_capacity: 1024,
103 initial_edge_capacity: 4096,
104 }
105 }
106}
107
108pub struct LpgStore {
167 #[allow(dead_code)]
169 config: LpgStoreConfig,
170
171 #[cfg(not(feature = "tiered-storage"))]
175 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
176
177 #[cfg(not(feature = "tiered-storage"))]
181 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
182
183 #[cfg(feature = "tiered-storage")]
197 arena_allocator: Arc<ArenaAllocator>,
198
199 #[cfg(feature = "tiered-storage")]
203 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
204
205 #[cfg(feature = "tiered-storage")]
209 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
210
211 #[cfg(feature = "tiered-storage")]
214 epoch_store: Arc<EpochStore>,
215
216 node_properties: PropertyStorage<NodeId>,
218
219 edge_properties: PropertyStorage<EdgeId>,
221
222 label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
225
226 id_to_label: RwLock<Vec<ArcStr>>,
229
230 edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
233
234 id_to_edge_type: RwLock<Vec<ArcStr>>,
237
238 forward_adj: ChunkedAdjacency,
240
241 backward_adj: Option<ChunkedAdjacency>,
244
245 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
248
249 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
253
254 property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
260
261 #[cfg(feature = "vector-index")]
266 vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
267
268 #[cfg(feature = "text-index")]
273 text_indexes: RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,
274
275 next_node_id: AtomicU64,
277
278 next_edge_id: AtomicU64,
280
281 current_epoch: AtomicU64,
283
284 statistics: RwLock<Arc<Statistics>>,
287
288 needs_stats_recompute: AtomicBool,
290}
291
292impl LpgStore {
293 #[must_use]
295 pub fn new() -> Self {
296 Self::with_config(LpgStoreConfig::default())
297 }
298
299 #[must_use]
301 pub fn with_config(config: LpgStoreConfig) -> Self {
302 let backward_adj = if config.backward_edges {
303 Some(ChunkedAdjacency::new())
304 } else {
305 None
306 };
307
308 Self {
309 #[cfg(not(feature = "tiered-storage"))]
310 nodes: RwLock::new(FxHashMap::default()),
311 #[cfg(not(feature = "tiered-storage"))]
312 edges: RwLock::new(FxHashMap::default()),
313 #[cfg(feature = "tiered-storage")]
314 arena_allocator: Arc::new(ArenaAllocator::new()),
315 #[cfg(feature = "tiered-storage")]
316 node_versions: RwLock::new(FxHashMap::default()),
317 #[cfg(feature = "tiered-storage")]
318 edge_versions: RwLock::new(FxHashMap::default()),
319 #[cfg(feature = "tiered-storage")]
320 epoch_store: Arc::new(EpochStore::new()),
321 node_properties: PropertyStorage::new(),
322 edge_properties: PropertyStorage::new(),
323 label_to_id: RwLock::new(FxHashMap::default()),
324 id_to_label: RwLock::new(Vec::new()),
325 edge_type_to_id: RwLock::new(FxHashMap::default()),
326 id_to_edge_type: RwLock::new(Vec::new()),
327 forward_adj: ChunkedAdjacency::new(),
328 backward_adj,
329 label_index: RwLock::new(Vec::new()),
330 node_labels: RwLock::new(FxHashMap::default()),
331 property_indexes: RwLock::new(FxHashMap::default()),
332 #[cfg(feature = "vector-index")]
333 vector_indexes: RwLock::new(FxHashMap::default()),
334 #[cfg(feature = "text-index")]
335 text_indexes: RwLock::new(FxHashMap::default()),
336 next_node_id: AtomicU64::new(0),
337 next_edge_id: AtomicU64::new(0),
338 current_epoch: AtomicU64::new(0),
339 statistics: RwLock::new(Arc::new(Statistics::new())),
340 needs_stats_recompute: AtomicBool::new(true),
341 config,
342 }
343 }
344
345 #[must_use]
347 pub fn current_epoch(&self) -> EpochId {
348 EpochId::new(self.current_epoch.load(Ordering::Acquire))
349 }
350
351 pub fn new_epoch(&self) -> EpochId {
353 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
354 EpochId::new(id)
355 }
356
357 pub fn sync_epoch(&self, epoch: EpochId) {
362 self.current_epoch
363 .fetch_max(epoch.as_u64(), Ordering::AcqRel);
364 }
365
366 pub fn create_node(&self, labels: &[&str]) -> NodeId {
372 self.needs_stats_recompute.store(true, Ordering::Relaxed);
373 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
374 }
375
376 #[cfg(not(feature = "tiered-storage"))]
378 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
379 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
380
381 let mut record = NodeRecord::new(id, epoch);
382 record.set_label_count(labels.len() as u16);
383
384 let mut node_label_set = FxHashSet::default();
386 for label in labels {
387 let label_id = self.get_or_create_label_id(*label);
388 node_label_set.insert(label_id);
389
390 let mut index = self.label_index.write();
392 while index.len() <= label_id as usize {
393 index.push(FxHashMap::default());
394 }
395 index[label_id as usize].insert(id, ());
396 }
397
398 self.node_labels.write().insert(id, node_label_set);
400
401 let chain = VersionChain::with_initial(record, epoch, tx_id);
403 self.nodes.write().insert(id, chain);
404 id
405 }
406
407 #[cfg(feature = "tiered-storage")]
410 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
411 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
412
413 let mut record = NodeRecord::new(id, epoch);
414 record.set_label_count(labels.len() as u16);
415
416 let mut node_label_set = FxHashSet::default();
418 for label in labels {
419 let label_id = self.get_or_create_label_id(*label);
420 node_label_set.insert(label_id);
421
422 let mut index = self.label_index.write();
424 while index.len() <= label_id as usize {
425 index.push(FxHashMap::default());
426 }
427 index[label_id as usize].insert(id, ());
428 }
429
430 self.node_labels.write().insert(id, node_label_set);
432
433 let arena = self.arena_allocator.arena_or_create(epoch);
435 let (offset, _stored) = arena.alloc_value_with_offset(record);
436
437 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
439
440 let mut versions = self.node_versions.write();
442 if let Some(index) = versions.get_mut(&id) {
443 index.add_hot(hot_ref);
444 } else {
445 versions.insert(id, VersionIndex::with_initial(hot_ref));
446 }
447
448 id
449 }
450
451 pub fn create_node_with_props(
453 &self,
454 labels: &[&str],
455 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
456 ) -> NodeId {
457 self.create_node_with_props_versioned(
458 labels,
459 properties,
460 self.current_epoch(),
461 TxId::SYSTEM,
462 )
463 }
464
465 #[cfg(not(feature = "tiered-storage"))]
467 pub fn create_node_with_props_versioned(
468 &self,
469 labels: &[&str],
470 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
471 epoch: EpochId,
472 tx_id: TxId,
473 ) -> NodeId {
474 let id = self.create_node_versioned(labels, epoch, tx_id);
475
476 for (key, value) in properties {
477 let prop_key: PropertyKey = key.into();
478 let prop_value: Value = value.into();
479 self.update_property_index_on_set(id, &prop_key, &prop_value);
481 self.node_properties.set(id, prop_key, prop_value);
482 }
483
484 let count = self.node_properties.get_all(id).len() as u16;
486 if let Some(chain) = self.nodes.write().get_mut(&id)
487 && let Some(record) = chain.latest_mut()
488 {
489 record.props_count = count;
490 }
491
492 id
493 }
494
495 #[cfg(feature = "tiered-storage")]
498 pub fn create_node_with_props_versioned(
499 &self,
500 labels: &[&str],
501 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
502 epoch: EpochId,
503 tx_id: TxId,
504 ) -> NodeId {
505 let id = self.create_node_versioned(labels, epoch, tx_id);
506
507 for (key, value) in properties {
508 let prop_key: PropertyKey = key.into();
509 let prop_value: Value = value.into();
510 self.update_property_index_on_set(id, &prop_key, &prop_value);
512 self.node_properties.set(id, prop_key, prop_value);
513 }
514
515 id
519 }
520
521 #[must_use]
523 pub fn get_node(&self, id: NodeId) -> Option<Node> {
524 self.get_node_at_epoch(id, self.current_epoch())
525 }
526
527 #[must_use]
529 #[cfg(not(feature = "tiered-storage"))]
530 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
531 let nodes = self.nodes.read();
532 let chain = nodes.get(&id)?;
533 let record = chain.visible_at(epoch)?;
534
535 if record.is_deleted() {
536 return None;
537 }
538
539 let mut node = Node::new(id);
540
541 let id_to_label = self.id_to_label.read();
543 let node_labels = self.node_labels.read();
544 if let Some(label_ids) = node_labels.get(&id) {
545 for &label_id in label_ids {
546 if let Some(label) = id_to_label.get(label_id as usize) {
547 node.labels.push(label.clone());
548 }
549 }
550 }
551
552 node.properties = self.node_properties.get_all(id).into_iter().collect();
554
555 Some(node)
556 }
557
558 #[must_use]
561 #[cfg(feature = "tiered-storage")]
562 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
563 let versions = self.node_versions.read();
564 let index = versions.get(&id)?;
565 let version_ref = index.visible_at(epoch)?;
566
567 let record = self.read_node_record(&version_ref)?;
569
570 if record.is_deleted() {
571 return None;
572 }
573
574 let mut node = Node::new(id);
575
576 let id_to_label = self.id_to_label.read();
578 let node_labels = self.node_labels.read();
579 if let Some(label_ids) = node_labels.get(&id) {
580 for &label_id in label_ids {
581 if let Some(label) = id_to_label.get(label_id as usize) {
582 node.labels.push(label.clone());
583 }
584 }
585 }
586
587 node.properties = self.node_properties.get_all(id).into_iter().collect();
589
590 Some(node)
591 }
592
593 #[must_use]
595 #[cfg(not(feature = "tiered-storage"))]
596 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
597 let nodes = self.nodes.read();
598 let chain = nodes.get(&id)?;
599 let record = chain.visible_to(epoch, tx_id)?;
600
601 if record.is_deleted() {
602 return None;
603 }
604
605 let mut node = Node::new(id);
606
607 let id_to_label = self.id_to_label.read();
609 let node_labels = self.node_labels.read();
610 if let Some(label_ids) = node_labels.get(&id) {
611 for &label_id in label_ids {
612 if let Some(label) = id_to_label.get(label_id as usize) {
613 node.labels.push(label.clone());
614 }
615 }
616 }
617
618 node.properties = self.node_properties.get_all(id).into_iter().collect();
620
621 Some(node)
622 }
623
624 #[must_use]
627 #[cfg(feature = "tiered-storage")]
628 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
629 let versions = self.node_versions.read();
630 let index = versions.get(&id)?;
631 let version_ref = index.visible_to(epoch, tx_id)?;
632
633 let record = self.read_node_record(&version_ref)?;
635
636 if record.is_deleted() {
637 return None;
638 }
639
640 let mut node = Node::new(id);
641
642 let id_to_label = self.id_to_label.read();
644 let node_labels = self.node_labels.read();
645 if let Some(label_ids) = node_labels.get(&id) {
646 for &label_id in label_ids {
647 if let Some(label) = id_to_label.get(label_id as usize) {
648 node.labels.push(label.clone());
649 }
650 }
651 }
652
653 node.properties = self.node_properties.get_all(id).into_iter().collect();
655
656 Some(node)
657 }
658
659 #[cfg(feature = "tiered-storage")]
661 #[allow(unsafe_code)]
662 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
663 match version_ref {
664 VersionRef::Hot(hot_ref) => {
665 let arena = self.arena_allocator.arena(hot_ref.epoch);
666 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
668 Some(*record)
669 }
670 VersionRef::Cold(cold_ref) => {
671 self.epoch_store
673 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
674 }
675 }
676 }
677
678 pub fn delete_node(&self, id: NodeId) -> bool {
680 self.needs_stats_recompute.store(true, Ordering::Relaxed);
681 self.delete_node_at_epoch(id, self.current_epoch())
682 }
683
684 #[cfg(not(feature = "tiered-storage"))]
686 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
687 let mut nodes = self.nodes.write();
688 if let Some(chain) = nodes.get_mut(&id) {
689 if let Some(record) = chain.visible_at(epoch) {
691 if record.is_deleted() {
692 return false;
693 }
694 } else {
695 return false;
697 }
698
699 chain.mark_deleted(epoch);
701
702 let mut index = self.label_index.write();
704 let mut node_labels = self.node_labels.write();
705 if let Some(label_ids) = node_labels.remove(&id) {
706 for label_id in label_ids {
707 if let Some(set) = index.get_mut(label_id as usize) {
708 set.remove(&id);
709 }
710 }
711 }
712
713 #[cfg(feature = "text-index")]
715 self.remove_from_all_text_indexes(id);
716
717 drop(nodes); drop(index);
720 drop(node_labels);
721 self.node_properties.remove_all(id);
722
723 true
726 } else {
727 false
728 }
729 }
730
731 #[cfg(feature = "tiered-storage")]
734 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
735 let mut versions = self.node_versions.write();
736 if let Some(index) = versions.get_mut(&id) {
737 if let Some(version_ref) = index.visible_at(epoch) {
739 if let Some(record) = self.read_node_record(&version_ref) {
740 if record.is_deleted() {
741 return false;
742 }
743 } else {
744 return false;
745 }
746 } else {
747 return false;
748 }
749
750 index.mark_deleted(epoch);
752
753 let mut label_index = self.label_index.write();
755 let mut node_labels = self.node_labels.write();
756 if let Some(label_ids) = node_labels.remove(&id) {
757 for label_id in label_ids {
758 if let Some(set) = label_index.get_mut(label_id as usize) {
759 set.remove(&id);
760 }
761 }
762 }
763
764 #[cfg(feature = "text-index")]
766 self.remove_from_all_text_indexes(id);
767
768 drop(versions);
770 drop(label_index);
771 drop(node_labels);
772 self.node_properties.remove_all(id);
773
774 true
775 } else {
776 false
777 }
778 }
779
780 #[cfg(not(feature = "tiered-storage"))]
785 pub fn delete_node_edges(&self, node_id: NodeId) {
786 let outgoing: Vec<EdgeId> = self
788 .forward_adj
789 .edges_from(node_id)
790 .into_iter()
791 .map(|(_, edge_id)| edge_id)
792 .collect();
793
794 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
796 backward
797 .edges_from(node_id)
798 .into_iter()
799 .map(|(_, edge_id)| edge_id)
800 .collect()
801 } else {
802 let epoch = self.current_epoch();
804 self.edges
805 .read()
806 .iter()
807 .filter_map(|(id, chain)| {
808 chain.visible_at(epoch).and_then(|r| {
809 if !r.is_deleted() && r.dst == node_id {
810 Some(*id)
811 } else {
812 None
813 }
814 })
815 })
816 .collect()
817 };
818
819 for edge_id in outgoing.into_iter().chain(incoming) {
821 self.delete_edge(edge_id);
822 }
823 }
824
825 #[cfg(feature = "tiered-storage")]
828 pub fn delete_node_edges(&self, node_id: NodeId) {
829 let outgoing: Vec<EdgeId> = self
831 .forward_adj
832 .edges_from(node_id)
833 .into_iter()
834 .map(|(_, edge_id)| edge_id)
835 .collect();
836
837 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
839 backward
840 .edges_from(node_id)
841 .into_iter()
842 .map(|(_, edge_id)| edge_id)
843 .collect()
844 } else {
845 let epoch = self.current_epoch();
847 let versions = self.edge_versions.read();
848 versions
849 .iter()
850 .filter_map(|(id, index)| {
851 index.visible_at(epoch).and_then(|vref| {
852 self.read_edge_record(&vref).and_then(|r| {
853 if !r.is_deleted() && r.dst == node_id {
854 Some(*id)
855 } else {
856 None
857 }
858 })
859 })
860 })
861 .collect()
862 };
863
864 for edge_id in outgoing.into_iter().chain(incoming) {
866 self.delete_edge(edge_id);
867 }
868 }
869
870 #[cfg(not(feature = "tiered-storage"))]
872 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
873 let prop_key: PropertyKey = key.into();
874
875 self.update_property_index_on_set(id, &prop_key, &value);
877
878 #[cfg(feature = "text-index")]
880 self.update_text_index_on_set(id, key, &value);
881
882 self.node_properties.set(id, prop_key, value);
883
884 let count = self.node_properties.get_all(id).len() as u16;
886 if let Some(chain) = self.nodes.write().get_mut(&id)
887 && let Some(record) = chain.latest_mut()
888 {
889 record.props_count = count;
890 }
891 }
892
893 #[cfg(feature = "tiered-storage")]
896 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
897 let prop_key: PropertyKey = key.into();
898
899 self.update_property_index_on_set(id, &prop_key, &value);
901
902 #[cfg(feature = "text-index")]
904 self.update_text_index_on_set(id, key, &value);
905
906 self.node_properties.set(id, prop_key, value);
907 }
911
912 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
914 self.edge_properties.set(id, key.into(), value);
915 }
916
917 #[cfg(not(feature = "tiered-storage"))]
921 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
922 let prop_key: PropertyKey = key.into();
923
924 self.update_property_index_on_remove(id, &prop_key);
926
927 #[cfg(feature = "text-index")]
929 self.update_text_index_on_remove(id, key);
930
931 let result = self.node_properties.remove(id, &prop_key);
932
933 let count = self.node_properties.get_all(id).len() as u16;
935 if let Some(chain) = self.nodes.write().get_mut(&id)
936 && let Some(record) = chain.latest_mut()
937 {
938 record.props_count = count;
939 }
940
941 result
942 }
943
944 #[cfg(feature = "tiered-storage")]
947 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
948 let prop_key: PropertyKey = key.into();
949
950 self.update_property_index_on_remove(id, &prop_key);
952
953 #[cfg(feature = "text-index")]
955 self.update_text_index_on_remove(id, key);
956
957 self.node_properties.remove(id, &prop_key)
958 }
960
961 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
965 self.edge_properties.remove(id, &key.into())
966 }
967
968 #[must_use]
983 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
984 self.node_properties.get(id, key)
985 }
986
987 #[must_use]
991 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
992 self.edge_properties.get(id, key)
993 }
994
995 #[must_use]
1018 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
1019 self.node_properties.get_batch(ids, key)
1020 }
1021
1022 #[must_use]
1027 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
1028 self.node_properties.get_all_batch(ids)
1029 }
1030
1031 #[must_use]
1059 pub fn get_nodes_properties_selective_batch(
1060 &self,
1061 ids: &[NodeId],
1062 keys: &[PropertyKey],
1063 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1064 self.node_properties.get_selective_batch(ids, keys)
1065 }
1066
1067 #[must_use]
1071 pub fn get_edges_properties_selective_batch(
1072 &self,
1073 ids: &[EdgeId],
1074 keys: &[PropertyKey],
1075 ) -> Vec<FxHashMap<PropertyKey, Value>> {
1076 self.edge_properties.get_selective_batch(ids, keys)
1077 }
1078
1079 #[must_use]
1115 pub fn find_nodes_in_range(
1116 &self,
1117 property: &str,
1118 min: Option<&Value>,
1119 max: Option<&Value>,
1120 min_inclusive: bool,
1121 max_inclusive: bool,
1122 ) -> Vec<NodeId> {
1123 let key = PropertyKey::new(property);
1124
1125 if !self
1127 .node_properties
1128 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
1129 {
1130 return Vec::new();
1131 }
1132
1133 self.node_ids()
1135 .into_iter()
1136 .filter(|&node_id| {
1137 self.node_properties
1138 .get(node_id, &key)
1139 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
1140 })
1141 .collect()
1142 }
1143
1144 #[must_use]
1169 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1170 if conditions.is_empty() {
1171 return self.node_ids();
1172 }
1173
1174 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1177 let indexes = self.property_indexes.read();
1178
1179 for (i, (prop, value)) in conditions.iter().enumerate() {
1180 let key = PropertyKey::new(*prop);
1181 let hv = HashableValue::new(value.clone());
1182
1183 if let Some(index) = indexes.get(&key) {
1184 let matches: Vec<NodeId> = index
1185 .get(&hv)
1186 .map(|nodes| nodes.iter().copied().collect())
1187 .unwrap_or_default();
1188
1189 if matches.is_empty() {
1191 return Vec::new();
1192 }
1193
1194 if best_start
1196 .as_ref()
1197 .is_none_or(|(_, best)| matches.len() < best.len())
1198 {
1199 best_start = Some((i, matches));
1200 }
1201 }
1202 }
1203 drop(indexes);
1204
1205 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1207 let (prop, value) = &conditions[0];
1209 (0, self.find_nodes_by_property(prop, value))
1210 });
1211
1212 for (i, (prop, value)) in conditions.iter().enumerate() {
1214 if i == start_idx {
1215 continue;
1216 }
1217
1218 let key = PropertyKey::new(*prop);
1219 candidates.retain(|&node_id| {
1220 self.node_properties
1221 .get(node_id, &key)
1222 .is_some_and(|v| v == *value)
1223 });
1224
1225 if candidates.is_empty() {
1227 return Vec::new();
1228 }
1229 }
1230
1231 candidates
1232 }
1233
1234 pub fn create_property_index(&self, property: &str) {
1262 let key = PropertyKey::new(property);
1263
1264 let mut indexes = self.property_indexes.write();
1265 if indexes.contains_key(&key) {
1266 return; }
1268
1269 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1271
1272 for node_id in self.node_ids() {
1274 if let Some(value) = self.node_properties.get(node_id, &key) {
1275 let hv = HashableValue::new(value);
1276 index.entry(hv).or_default().insert(node_id);
1277 }
1278 }
1279
1280 indexes.insert(key, index);
1281 }
1282
1283 pub fn drop_property_index(&self, property: &str) -> bool {
1287 let key = PropertyKey::new(property);
1288 self.property_indexes.write().remove(&key).is_some()
1289 }
1290
1291 #[must_use]
1293 pub fn has_property_index(&self, property: &str) -> bool {
1294 let key = PropertyKey::new(property);
1295 self.property_indexes.read().contains_key(&key)
1296 }
1297
1298 #[cfg(feature = "vector-index")]
1300 pub fn add_vector_index(&self, label: &str, property: &str, index: Arc<HnswIndex>) {
1301 let key = format!("{label}:{property}");
1302 self.vector_indexes.write().insert(key, index);
1303 }
1304
1305 #[cfg(feature = "vector-index")]
1307 #[must_use]
1308 pub fn get_vector_index(&self, label: &str, property: &str) -> Option<Arc<HnswIndex>> {
1309 let key = format!("{label}:{property}");
1310 self.vector_indexes.read().get(&key).cloned()
1311 }
1312
1313 #[cfg(feature = "vector-index")]
1317 pub fn remove_vector_index(&self, label: &str, property: &str) -> bool {
1318 let key = format!("{label}:{property}");
1319 self.vector_indexes.write().remove(&key).is_some()
1320 }
1321
1322 #[cfg(feature = "vector-index")]
1326 #[must_use]
1327 pub fn vector_index_entries(&self) -> Vec<(String, Arc<HnswIndex>)> {
1328 self.vector_indexes
1329 .read()
1330 .iter()
1331 .map(|(k, v)| (k.clone(), v.clone()))
1332 .collect()
1333 }
1334
1335 #[cfg(feature = "text-index")]
1337 pub fn add_text_index(
1338 &self,
1339 label: &str,
1340 property: &str,
1341 index: Arc<RwLock<crate::index::text::InvertedIndex>>,
1342 ) {
1343 let key = format!("{label}:{property}");
1344 self.text_indexes.write().insert(key, index);
1345 }
1346
1347 #[cfg(feature = "text-index")]
1349 #[must_use]
1350 pub fn get_text_index(
1351 &self,
1352 label: &str,
1353 property: &str,
1354 ) -> Option<Arc<RwLock<crate::index::text::InvertedIndex>>> {
1355 let key = format!("{label}:{property}");
1356 self.text_indexes.read().get(&key).cloned()
1357 }
1358
1359 #[cfg(feature = "text-index")]
1363 pub fn remove_text_index(&self, label: &str, property: &str) -> bool {
1364 let key = format!("{label}:{property}");
1365 self.text_indexes.write().remove(&key).is_some()
1366 }
1367
1368 #[cfg(feature = "text-index")]
1372 pub fn text_index_entries(
1373 &self,
1374 ) -> Vec<(String, Arc<RwLock<crate::index::text::InvertedIndex>>)> {
1375 self.text_indexes
1376 .read()
1377 .iter()
1378 .map(|(k, v)| (k.clone(), v.clone()))
1379 .collect()
1380 }
1381
1382 #[cfg(feature = "text-index")]
1387 fn update_text_index_on_set(&self, id: NodeId, key: &str, value: &Value) {
1388 let text_indexes = self.text_indexes.read();
1389 if text_indexes.is_empty() {
1390 return;
1391 }
1392 let id_to_label = self.id_to_label.read();
1393 let node_labels = self.node_labels.read();
1394 if let Some(label_ids) = node_labels.get(&id) {
1395 for &label_id in label_ids {
1396 if let Some(label_name) = id_to_label.get(label_id as usize) {
1397 let index_key = format!("{label_name}:{key}");
1398 if let Some(index) = text_indexes.get(&index_key) {
1399 let mut idx = index.write();
1400 idx.remove(id);
1402 if let Value::String(text) = value {
1403 idx.insert(id, text);
1404 }
1405 }
1406 }
1407 }
1408 }
1409 }
1410
1411 #[cfg(feature = "text-index")]
1413 fn update_text_index_on_remove(&self, id: NodeId, key: &str) {
1414 let text_indexes = self.text_indexes.read();
1415 if text_indexes.is_empty() {
1416 return;
1417 }
1418 let id_to_label = self.id_to_label.read();
1419 let node_labels = self.node_labels.read();
1420 if let Some(label_ids) = node_labels.get(&id) {
1421 for &label_id in label_ids {
1422 if let Some(label_name) = id_to_label.get(label_id as usize) {
1423 let index_key = format!("{label_name}:{key}");
1424 if let Some(index) = text_indexes.get(&index_key) {
1425 index.write().remove(id);
1426 }
1427 }
1428 }
1429 }
1430 }
1431
1432 #[cfg(feature = "text-index")]
1434 fn remove_from_all_text_indexes(&self, id: NodeId) {
1435 let text_indexes = self.text_indexes.read();
1436 if text_indexes.is_empty() {
1437 return;
1438 }
1439 for (_, index) in text_indexes.iter() {
1440 index.write().remove(id);
1441 }
1442 }
1443
1444 #[must_use]
1467 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1468 let key = PropertyKey::new(property);
1469 let hv = HashableValue::new(value.clone());
1470
1471 let indexes = self.property_indexes.read();
1473 if let Some(index) = indexes.get(&key) {
1474 if let Some(nodes) = index.get(&hv) {
1475 return nodes.iter().copied().collect();
1476 }
1477 return Vec::new();
1478 }
1479 drop(indexes);
1480
1481 self.node_ids()
1483 .into_iter()
1484 .filter(|&node_id| {
1485 self.node_properties
1486 .get(node_id, &key)
1487 .is_some_and(|v| v == *value)
1488 })
1489 .collect()
1490 }
1491
1492 pub fn find_nodes_matching_filter(&self, property: &str, filter_value: &Value) -> Vec<NodeId> {
1498 let key = PropertyKey::new(property);
1499 self.node_ids()
1500 .into_iter()
1501 .filter(|&node_id| {
1502 self.node_properties
1503 .get(node_id, &key)
1504 .is_some_and(|v| Self::matches_filter(&v, filter_value))
1505 })
1506 .collect()
1507 }
1508
1509 fn matches_filter(node_value: &Value, filter_value: &Value) -> bool {
1514 match filter_value {
1515 Value::Map(ops) if ops.keys().any(|k| k.as_str().starts_with('$')) => {
1516 ops.iter().all(|(op_key, op_val)| {
1517 match op_key.as_str() {
1518 "$gt" => {
1519 Self::compare_values(node_value, op_val)
1520 == Some(std::cmp::Ordering::Greater)
1521 }
1522 "$gte" => matches!(
1523 Self::compare_values(node_value, op_val),
1524 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
1525 ),
1526 "$lt" => {
1527 Self::compare_values(node_value, op_val)
1528 == Some(std::cmp::Ordering::Less)
1529 }
1530 "$lte" => matches!(
1531 Self::compare_values(node_value, op_val),
1532 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
1533 ),
1534 "$ne" => node_value != op_val,
1535 "$in" => match op_val {
1536 Value::List(items) => items.iter().any(|v| v == node_value),
1537 _ => false,
1538 },
1539 "$nin" => match op_val {
1540 Value::List(items) => !items.iter().any(|v| v == node_value),
1541 _ => true,
1542 },
1543 "$contains" => match (node_value, op_val) {
1544 (Value::String(a), Value::String(b)) => a.contains(b.as_str()),
1545 _ => false,
1546 },
1547 _ => false, }
1549 })
1550 }
1551 _ => node_value == filter_value, }
1553 }
1554
1555 fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1557 match (a, b) {
1558 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
1559 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
1560 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
1561 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
1562 (Value::Int64(a), Value::Float64(b)) => (*a as f64).partial_cmp(b),
1563 (Value::Float64(a), Value::Int64(b)) => a.partial_cmp(&(*b as f64)),
1564 _ => None,
1565 }
1566 }
1567
1568 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1570 let indexes = self.property_indexes.read();
1571 if let Some(index) = indexes.get(key) {
1572 if let Some(old_value) = self.node_properties.get(node_id, key) {
1574 let old_hv = HashableValue::new(old_value);
1575 if let Some(mut nodes) = index.get_mut(&old_hv) {
1576 nodes.remove(&node_id);
1577 if nodes.is_empty() {
1578 drop(nodes);
1579 index.remove(&old_hv);
1580 }
1581 }
1582 }
1583
1584 let new_hv = HashableValue::new(new_value.clone());
1586 index
1587 .entry(new_hv)
1588 .or_insert_with(FxHashSet::default)
1589 .insert(node_id);
1590 }
1591 }
1592
1593 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1595 let indexes = self.property_indexes.read();
1596 if let Some(index) = indexes.get(key) {
1597 if let Some(old_value) = self.node_properties.get(node_id, key) {
1599 let old_hv = HashableValue::new(old_value);
1600 if let Some(mut nodes) = index.get_mut(&old_hv) {
1601 nodes.remove(&node_id);
1602 if nodes.is_empty() {
1603 drop(nodes);
1604 index.remove(&old_hv);
1605 }
1606 }
1607 }
1608 }
1609 }
1610
1611 #[cfg(not(feature = "tiered-storage"))]
1616 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1617 let epoch = self.current_epoch();
1618
1619 let nodes = self.nodes.read();
1621 if let Some(chain) = nodes.get(&node_id) {
1622 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1623 return false;
1624 }
1625 } else {
1626 return false;
1627 }
1628 drop(nodes);
1629
1630 let label_id = self.get_or_create_label_id(label);
1632
1633 let mut node_labels = self.node_labels.write();
1635 let label_set = node_labels.entry(node_id).or_default();
1636
1637 if label_set.contains(&label_id) {
1638 return false; }
1640
1641 label_set.insert(label_id);
1642 drop(node_labels);
1643
1644 let mut index = self.label_index.write();
1646 if (label_id as usize) >= index.len() {
1647 index.resize(label_id as usize + 1, FxHashMap::default());
1648 }
1649 index[label_id as usize].insert(node_id, ());
1650
1651 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1653 && let Some(record) = chain.latest_mut()
1654 {
1655 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1656 record.set_label_count(count as u16);
1657 }
1658
1659 true
1660 }
1661
1662 #[cfg(feature = "tiered-storage")]
1665 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1666 let epoch = self.current_epoch();
1667
1668 let versions = self.node_versions.read();
1670 if let Some(index) = versions.get(&node_id) {
1671 if let Some(vref) = index.visible_at(epoch) {
1672 if let Some(record) = self.read_node_record(&vref) {
1673 if record.is_deleted() {
1674 return false;
1675 }
1676 } else {
1677 return false;
1678 }
1679 } else {
1680 return false;
1681 }
1682 } else {
1683 return false;
1684 }
1685 drop(versions);
1686
1687 let label_id = self.get_or_create_label_id(label);
1689
1690 let mut node_labels = self.node_labels.write();
1692 let label_set = node_labels.entry(node_id).or_default();
1693
1694 if label_set.contains(&label_id) {
1695 return false; }
1697
1698 label_set.insert(label_id);
1699 drop(node_labels);
1700
1701 let mut index = self.label_index.write();
1703 if (label_id as usize) >= index.len() {
1704 index.resize(label_id as usize + 1, FxHashMap::default());
1705 }
1706 index[label_id as usize].insert(node_id, ());
1707
1708 true
1712 }
1713
1714 #[cfg(not(feature = "tiered-storage"))]
1719 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1720 let epoch = self.current_epoch();
1721
1722 let nodes = self.nodes.read();
1724 if let Some(chain) = nodes.get(&node_id) {
1725 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1726 return false;
1727 }
1728 } else {
1729 return false;
1730 }
1731 drop(nodes);
1732
1733 let label_id = {
1735 let label_ids = self.label_to_id.read();
1736 match label_ids.get(label) {
1737 Some(&id) => id,
1738 None => return false, }
1740 };
1741
1742 let mut node_labels = self.node_labels.write();
1744 if let Some(label_set) = node_labels.get_mut(&node_id) {
1745 if !label_set.remove(&label_id) {
1746 return false; }
1748 } else {
1749 return false;
1750 }
1751 drop(node_labels);
1752
1753 let mut index = self.label_index.write();
1755 if (label_id as usize) < index.len() {
1756 index[label_id as usize].remove(&node_id);
1757 }
1758
1759 if let Some(chain) = self.nodes.write().get_mut(&node_id)
1761 && let Some(record) = chain.latest_mut()
1762 {
1763 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1764 record.set_label_count(count as u16);
1765 }
1766
1767 true
1768 }
1769
1770 #[cfg(feature = "tiered-storage")]
1773 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1774 let epoch = self.current_epoch();
1775
1776 let versions = self.node_versions.read();
1778 if let Some(index) = versions.get(&node_id) {
1779 if let Some(vref) = index.visible_at(epoch) {
1780 if let Some(record) = self.read_node_record(&vref) {
1781 if record.is_deleted() {
1782 return false;
1783 }
1784 } else {
1785 return false;
1786 }
1787 } else {
1788 return false;
1789 }
1790 } else {
1791 return false;
1792 }
1793 drop(versions);
1794
1795 let label_id = {
1797 let label_ids = self.label_to_id.read();
1798 match label_ids.get(label) {
1799 Some(&id) => id,
1800 None => return false, }
1802 };
1803
1804 let mut node_labels = self.node_labels.write();
1806 if let Some(label_set) = node_labels.get_mut(&node_id) {
1807 if !label_set.remove(&label_id) {
1808 return false; }
1810 } else {
1811 return false;
1812 }
1813 drop(node_labels);
1814
1815 let mut index = self.label_index.write();
1817 if (label_id as usize) < index.len() {
1818 index[label_id as usize].remove(&node_id);
1819 }
1820
1821 true
1824 }
1825
1826 #[must_use]
1828 #[cfg(not(feature = "tiered-storage"))]
1829 pub fn node_count(&self) -> usize {
1830 let epoch = self.current_epoch();
1831 self.nodes
1832 .read()
1833 .values()
1834 .filter_map(|chain| chain.visible_at(epoch))
1835 .filter(|r| !r.is_deleted())
1836 .count()
1837 }
1838
1839 #[must_use]
1842 #[cfg(feature = "tiered-storage")]
1843 pub fn node_count(&self) -> usize {
1844 let epoch = self.current_epoch();
1845 let versions = self.node_versions.read();
1846 versions
1847 .iter()
1848 .filter(|(_, index)| {
1849 index.visible_at(epoch).map_or(false, |vref| {
1850 self.read_node_record(&vref)
1851 .map_or(false, |r| !r.is_deleted())
1852 })
1853 })
1854 .count()
1855 }
1856
1857 #[must_use]
1863 #[cfg(not(feature = "tiered-storage"))]
1864 pub fn node_ids(&self) -> Vec<NodeId> {
1865 let epoch = self.current_epoch();
1866 let mut ids: Vec<NodeId> = self
1867 .nodes
1868 .read()
1869 .iter()
1870 .filter_map(|(id, chain)| {
1871 chain
1872 .visible_at(epoch)
1873 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1874 })
1875 .collect();
1876 ids.sort_unstable();
1877 ids
1878 }
1879
1880 #[must_use]
1883 #[cfg(feature = "tiered-storage")]
1884 pub fn node_ids(&self) -> Vec<NodeId> {
1885 let epoch = self.current_epoch();
1886 let versions = self.node_versions.read();
1887 let mut ids: Vec<NodeId> = versions
1888 .iter()
1889 .filter_map(|(id, index)| {
1890 index.visible_at(epoch).and_then(|vref| {
1891 self.read_node_record(&vref)
1892 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1893 })
1894 })
1895 .collect();
1896 ids.sort_unstable();
1897 ids
1898 }
1899
1900 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1904 self.needs_stats_recompute.store(true, Ordering::Relaxed);
1905 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1906 }
1907
1908 #[cfg(not(feature = "tiered-storage"))]
1910 pub fn create_edge_versioned(
1911 &self,
1912 src: NodeId,
1913 dst: NodeId,
1914 edge_type: &str,
1915 epoch: EpochId,
1916 tx_id: TxId,
1917 ) -> EdgeId {
1918 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1919 let type_id = self.get_or_create_edge_type_id(edge_type);
1920
1921 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1922 let chain = VersionChain::with_initial(record, epoch, tx_id);
1923 self.edges.write().insert(id, chain);
1924
1925 self.forward_adj.add_edge(src, dst, id);
1927 if let Some(ref backward) = self.backward_adj {
1928 backward.add_edge(dst, src, id);
1929 }
1930
1931 id
1932 }
1933
1934 #[cfg(feature = "tiered-storage")]
1937 pub fn create_edge_versioned(
1938 &self,
1939 src: NodeId,
1940 dst: NodeId,
1941 edge_type: &str,
1942 epoch: EpochId,
1943 tx_id: TxId,
1944 ) -> EdgeId {
1945 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1946 let type_id = self.get_or_create_edge_type_id(edge_type);
1947
1948 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1949
1950 let arena = self.arena_allocator.arena_or_create(epoch);
1952 let (offset, _stored) = arena.alloc_value_with_offset(record);
1953
1954 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1956
1957 let mut versions = self.edge_versions.write();
1959 if let Some(index) = versions.get_mut(&id) {
1960 index.add_hot(hot_ref);
1961 } else {
1962 versions.insert(id, VersionIndex::with_initial(hot_ref));
1963 }
1964
1965 self.forward_adj.add_edge(src, dst, id);
1967 if let Some(ref backward) = self.backward_adj {
1968 backward.add_edge(dst, src, id);
1969 }
1970
1971 id
1972 }
1973
1974 pub fn create_edge_with_props(
1976 &self,
1977 src: NodeId,
1978 dst: NodeId,
1979 edge_type: &str,
1980 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1981 ) -> EdgeId {
1982 let id = self.create_edge(src, dst, edge_type);
1983
1984 for (key, value) in properties {
1985 self.edge_properties.set(id, key.into(), value.into());
1986 }
1987
1988 id
1989 }
1990
1991 #[must_use]
1993 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1994 self.get_edge_at_epoch(id, self.current_epoch())
1995 }
1996
1997 #[must_use]
1999 #[cfg(not(feature = "tiered-storage"))]
2000 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
2001 let edges = self.edges.read();
2002 let chain = edges.get(&id)?;
2003 let record = chain.visible_at(epoch)?;
2004
2005 if record.is_deleted() {
2006 return None;
2007 }
2008
2009 let edge_type = {
2010 let id_to_type = self.id_to_edge_type.read();
2011 id_to_type.get(record.type_id as usize)?.clone()
2012 };
2013
2014 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2015
2016 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2018
2019 Some(edge)
2020 }
2021
2022 #[must_use]
2025 #[cfg(feature = "tiered-storage")]
2026 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
2027 let versions = self.edge_versions.read();
2028 let index = versions.get(&id)?;
2029 let version_ref = index.visible_at(epoch)?;
2030
2031 let record = self.read_edge_record(&version_ref)?;
2032
2033 if record.is_deleted() {
2034 return None;
2035 }
2036
2037 let edge_type = {
2038 let id_to_type = self.id_to_edge_type.read();
2039 id_to_type.get(record.type_id as usize)?.clone()
2040 };
2041
2042 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2043
2044 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2046
2047 Some(edge)
2048 }
2049
2050 #[must_use]
2052 #[cfg(not(feature = "tiered-storage"))]
2053 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
2054 let edges = self.edges.read();
2055 let chain = edges.get(&id)?;
2056 let record = chain.visible_to(epoch, tx_id)?;
2057
2058 if record.is_deleted() {
2059 return None;
2060 }
2061
2062 let edge_type = {
2063 let id_to_type = self.id_to_edge_type.read();
2064 id_to_type.get(record.type_id as usize)?.clone()
2065 };
2066
2067 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2068
2069 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2071
2072 Some(edge)
2073 }
2074
2075 #[must_use]
2078 #[cfg(feature = "tiered-storage")]
2079 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
2080 let versions = self.edge_versions.read();
2081 let index = versions.get(&id)?;
2082 let version_ref = index.visible_to(epoch, tx_id)?;
2083
2084 let record = self.read_edge_record(&version_ref)?;
2085
2086 if record.is_deleted() {
2087 return None;
2088 }
2089
2090 let edge_type = {
2091 let id_to_type = self.id_to_edge_type.read();
2092 id_to_type.get(record.type_id as usize)?.clone()
2093 };
2094
2095 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
2096
2097 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
2099
2100 Some(edge)
2101 }
2102
2103 #[cfg(feature = "tiered-storage")]
2105 #[allow(unsafe_code)]
2106 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
2107 match version_ref {
2108 VersionRef::Hot(hot_ref) => {
2109 let arena = self.arena_allocator.arena(hot_ref.epoch);
2110 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2112 Some(*record)
2113 }
2114 VersionRef::Cold(cold_ref) => {
2115 self.epoch_store
2117 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
2118 }
2119 }
2120 }
2121
2122 pub fn delete_edge(&self, id: EdgeId) -> bool {
2124 self.needs_stats_recompute.store(true, Ordering::Relaxed);
2125 self.delete_edge_at_epoch(id, self.current_epoch())
2126 }
2127
2128 #[cfg(not(feature = "tiered-storage"))]
2130 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2131 let mut edges = self.edges.write();
2132 if let Some(chain) = edges.get_mut(&id) {
2133 let (src, dst) = {
2135 match chain.visible_at(epoch) {
2136 Some(record) => {
2137 if record.is_deleted() {
2138 return false;
2139 }
2140 (record.src, record.dst)
2141 }
2142 None => return false, }
2144 };
2145
2146 chain.mark_deleted(epoch);
2148
2149 drop(edges); self.forward_adj.mark_deleted(src, id);
2153 if let Some(ref backward) = self.backward_adj {
2154 backward.mark_deleted(dst, id);
2155 }
2156
2157 self.edge_properties.remove_all(id);
2159
2160 true
2161 } else {
2162 false
2163 }
2164 }
2165
2166 #[cfg(feature = "tiered-storage")]
2169 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
2170 let mut versions = self.edge_versions.write();
2171 if let Some(index) = versions.get_mut(&id) {
2172 let (src, dst) = {
2174 match index.visible_at(epoch) {
2175 Some(version_ref) => {
2176 if let Some(record) = self.read_edge_record(&version_ref) {
2177 if record.is_deleted() {
2178 return false;
2179 }
2180 (record.src, record.dst)
2181 } else {
2182 return false;
2183 }
2184 }
2185 None => return false,
2186 }
2187 };
2188
2189 index.mark_deleted(epoch);
2191
2192 drop(versions); self.forward_adj.mark_deleted(src, id);
2196 if let Some(ref backward) = self.backward_adj {
2197 backward.mark_deleted(dst, id);
2198 }
2199
2200 self.edge_properties.remove_all(id);
2202
2203 true
2204 } else {
2205 false
2206 }
2207 }
2208
2209 #[must_use]
2211 #[cfg(not(feature = "tiered-storage"))]
2212 pub fn edge_count(&self) -> usize {
2213 let epoch = self.current_epoch();
2214 self.edges
2215 .read()
2216 .values()
2217 .filter_map(|chain| chain.visible_at(epoch))
2218 .filter(|r| !r.is_deleted())
2219 .count()
2220 }
2221
2222 #[must_use]
2225 #[cfg(feature = "tiered-storage")]
2226 pub fn edge_count(&self) -> usize {
2227 let epoch = self.current_epoch();
2228 let versions = self.edge_versions.read();
2229 versions
2230 .iter()
2231 .filter(|(_, index)| {
2232 index.visible_at(epoch).map_or(false, |vref| {
2233 self.read_edge_record(&vref)
2234 .map_or(false, |r| !r.is_deleted())
2235 })
2236 })
2237 .count()
2238 }
2239
2240 #[cfg(not(feature = "tiered-storage"))]
2245 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2246 {
2248 let mut nodes = self.nodes.write();
2249 for chain in nodes.values_mut() {
2250 chain.remove_versions_by(tx_id);
2251 }
2252 nodes.retain(|_, chain| !chain.is_empty());
2254 }
2255
2256 {
2258 let mut edges = self.edges.write();
2259 for chain in edges.values_mut() {
2260 chain.remove_versions_by(tx_id);
2261 }
2262 edges.retain(|_, chain| !chain.is_empty());
2264 }
2265 }
2266
2267 #[cfg(feature = "tiered-storage")]
2270 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
2271 {
2273 let mut versions = self.node_versions.write();
2274 for index in versions.values_mut() {
2275 index.remove_versions_by(tx_id);
2276 }
2277 versions.retain(|_, index| !index.is_empty());
2279 }
2280
2281 {
2283 let mut versions = self.edge_versions.write();
2284 for index in versions.values_mut() {
2285 index.remove_versions_by(tx_id);
2286 }
2287 versions.retain(|_, index| !index.is_empty());
2289 }
2290 }
2291
2292 #[cfg(not(feature = "tiered-storage"))]
2297 pub fn gc_versions(&self, min_epoch: EpochId) {
2298 {
2299 let mut nodes = self.nodes.write();
2300 for chain in nodes.values_mut() {
2301 chain.gc(min_epoch);
2302 }
2303 nodes.retain(|_, chain| !chain.is_empty());
2304 }
2305 {
2306 let mut edges = self.edges.write();
2307 for chain in edges.values_mut() {
2308 chain.gc(min_epoch);
2309 }
2310 edges.retain(|_, chain| !chain.is_empty());
2311 }
2312 }
2313
2314 #[cfg(feature = "tiered-storage")]
2316 pub fn gc_versions(&self, min_epoch: EpochId) {
2317 {
2318 let mut versions = self.node_versions.write();
2319 for index in versions.values_mut() {
2320 index.gc(min_epoch);
2321 }
2322 versions.retain(|_, index| !index.is_empty());
2323 }
2324 {
2325 let mut versions = self.edge_versions.write();
2326 for index in versions.values_mut() {
2327 index.gc(min_epoch);
2328 }
2329 versions.retain(|_, index| !index.is_empty());
2330 }
2331 }
2332
2333 #[cfg(feature = "tiered-storage")]
2352 #[allow(unsafe_code)]
2353 pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
2354 let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
2356 let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
2357
2358 {
2359 let versions = self.node_versions.read();
2360 for (node_id, index) in versions.iter() {
2361 for hot_ref in index.hot_refs_for_epoch(epoch) {
2362 let arena = self.arena_allocator.arena(hot_ref.epoch);
2363 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2365 node_records.push((node_id.as_u64(), *record));
2366 node_hot_refs.push((*node_id, *hot_ref));
2367 }
2368 }
2369 }
2370
2371 let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
2373 let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
2374
2375 {
2376 let versions = self.edge_versions.read();
2377 for (edge_id, index) in versions.iter() {
2378 for hot_ref in index.hot_refs_for_epoch(epoch) {
2379 let arena = self.arena_allocator.arena(hot_ref.epoch);
2380 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
2382 edge_records.push((edge_id.as_u64(), *record));
2383 edge_hot_refs.push((*edge_id, *hot_ref));
2384 }
2385 }
2386 }
2387
2388 let total_frozen = node_records.len() + edge_records.len();
2389
2390 if total_frozen == 0 {
2391 return 0;
2392 }
2393
2394 let (node_entries, edge_entries) =
2396 self.epoch_store
2397 .freeze_epoch(epoch, node_records, edge_records);
2398
2399 let node_entry_map: FxHashMap<u64, _> = node_entries
2401 .iter()
2402 .map(|e| (e.entity_id, (e.offset, e.length)))
2403 .collect();
2404 let edge_entry_map: FxHashMap<u64, _> = edge_entries
2405 .iter()
2406 .map(|e| (e.entity_id, (e.offset, e.length)))
2407 .collect();
2408
2409 {
2411 let mut versions = self.node_versions.write();
2412 for (node_id, hot_ref) in &node_hot_refs {
2413 if let Some(index) = versions.get_mut(node_id)
2414 && let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64())
2415 {
2416 let cold_ref = ColdVersionRef {
2417 epoch,
2418 block_offset: offset,
2419 length,
2420 created_by: hot_ref.created_by,
2421 deleted_epoch: hot_ref.deleted_epoch,
2422 };
2423 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2424 }
2425 }
2426 }
2427
2428 {
2429 let mut versions = self.edge_versions.write();
2430 for (edge_id, hot_ref) in &edge_hot_refs {
2431 if let Some(index) = versions.get_mut(edge_id)
2432 && let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64())
2433 {
2434 let cold_ref = ColdVersionRef {
2435 epoch,
2436 block_offset: offset,
2437 length,
2438 created_by: hot_ref.created_by,
2439 deleted_epoch: hot_ref.deleted_epoch,
2440 };
2441 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2442 }
2443 }
2444 }
2445
2446 total_frozen
2447 }
2448
2449 #[cfg(feature = "tiered-storage")]
2451 #[must_use]
2452 pub fn epoch_store(&self) -> &EpochStore {
2453 &self.epoch_store
2454 }
2455
2456 #[must_use]
2458 pub fn label_count(&self) -> usize {
2459 self.id_to_label.read().len()
2460 }
2461
2462 #[must_use]
2466 pub fn property_key_count(&self) -> usize {
2467 let node_keys = self.node_properties.column_count();
2468 let edge_keys = self.edge_properties.column_count();
2469 node_keys + edge_keys
2473 }
2474
2475 #[must_use]
2477 pub fn edge_type_count(&self) -> usize {
2478 self.id_to_edge_type.read().len()
2479 }
2480
2481 pub fn neighbors(
2488 &self,
2489 node: NodeId,
2490 direction: Direction,
2491 ) -> impl Iterator<Item = NodeId> + '_ {
2492 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2493 Direction::Outgoing | Direction::Both => {
2494 Box::new(self.forward_adj.neighbors(node).into_iter())
2495 }
2496 Direction::Incoming => Box::new(std::iter::empty()),
2497 };
2498
2499 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2500 Direction::Incoming | Direction::Both => {
2501 if let Some(ref adj) = self.backward_adj {
2502 Box::new(adj.neighbors(node).into_iter())
2503 } else {
2504 Box::new(std::iter::empty())
2505 }
2506 }
2507 Direction::Outgoing => Box::new(std::iter::empty()),
2508 };
2509
2510 forward.chain(backward)
2511 }
2512
2513 pub fn edges_from(
2517 &self,
2518 node: NodeId,
2519 direction: Direction,
2520 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2521 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2522 Direction::Outgoing | Direction::Both => {
2523 Box::new(self.forward_adj.edges_from(node).into_iter())
2524 }
2525 Direction::Incoming => Box::new(std::iter::empty()),
2526 };
2527
2528 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2529 Direction::Incoming | Direction::Both => {
2530 if let Some(ref adj) = self.backward_adj {
2531 Box::new(adj.edges_from(node).into_iter())
2532 } else {
2533 Box::new(std::iter::empty())
2534 }
2535 }
2536 Direction::Outgoing => Box::new(std::iter::empty()),
2537 };
2538
2539 forward.chain(backward)
2540 }
2541
2542 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2555 if let Some(ref backward) = self.backward_adj {
2556 backward.edges_from(node)
2557 } else {
2558 self.all_edges()
2560 .filter_map(|edge| {
2561 if edge.dst == node {
2562 Some((edge.src, edge.id))
2563 } else {
2564 None
2565 }
2566 })
2567 .collect()
2568 }
2569 }
2570
2571 #[must_use]
2575 pub fn out_degree(&self, node: NodeId) -> usize {
2576 self.forward_adj.out_degree(node)
2577 }
2578
2579 #[must_use]
2584 pub fn in_degree(&self, node: NodeId) -> usize {
2585 if let Some(ref backward) = self.backward_adj {
2586 backward.in_degree(node)
2587 } else {
2588 self.all_edges().filter(|edge| edge.dst == node).count()
2590 }
2591 }
2592
2593 #[must_use]
2595 #[cfg(not(feature = "tiered-storage"))]
2596 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2597 let edges = self.edges.read();
2598 let chain = edges.get(&id)?;
2599 let epoch = self.current_epoch();
2600 let record = chain.visible_at(epoch)?;
2601 let id_to_type = self.id_to_edge_type.read();
2602 id_to_type.get(record.type_id as usize).cloned()
2603 }
2604
2605 #[must_use]
2608 #[cfg(feature = "tiered-storage")]
2609 pub fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
2610 let versions = self.edge_versions.read();
2611 let index = versions.get(&id)?;
2612 let epoch = self.current_epoch();
2613 let vref = index.visible_at(epoch)?;
2614 let record = self.read_edge_record(&vref)?;
2615 let id_to_type = self.id_to_edge_type.read();
2616 id_to_type.get(record.type_id as usize).cloned()
2617 }
2618
2619 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2625 let label_to_id = self.label_to_id.read();
2626 if let Some(&label_id) = label_to_id.get(label) {
2627 let index = self.label_index.read();
2628 if let Some(set) = index.get(label_id as usize) {
2629 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2630 ids.sort_unstable();
2631 return ids;
2632 }
2633 }
2634 Vec::new()
2635 }
2636
2637 #[cfg(not(feature = "tiered-storage"))]
2644 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2645 let epoch = self.current_epoch();
2646 let node_ids: Vec<NodeId> = self
2647 .nodes
2648 .read()
2649 .iter()
2650 .filter_map(|(id, chain)| {
2651 chain
2652 .visible_at(epoch)
2653 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2654 })
2655 .collect();
2656
2657 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2658 }
2659
2660 #[cfg(feature = "tiered-storage")]
2663 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2664 let node_ids = self.node_ids();
2665 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2666 }
2667
2668 #[cfg(not(feature = "tiered-storage"))]
2673 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2674 let epoch = self.current_epoch();
2675 let edge_ids: Vec<EdgeId> = self
2676 .edges
2677 .read()
2678 .iter()
2679 .filter_map(|(id, chain)| {
2680 chain
2681 .visible_at(epoch)
2682 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2683 })
2684 .collect();
2685
2686 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2687 }
2688
2689 #[cfg(feature = "tiered-storage")]
2692 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2693 let epoch = self.current_epoch();
2694 let versions = self.edge_versions.read();
2695 let edge_ids: Vec<EdgeId> = versions
2696 .iter()
2697 .filter_map(|(id, index)| {
2698 index.visible_at(epoch).and_then(|vref| {
2699 self.read_edge_record(&vref)
2700 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2701 })
2702 })
2703 .collect();
2704
2705 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2706 }
2707
2708 pub fn all_labels(&self) -> Vec<String> {
2710 self.id_to_label
2711 .read()
2712 .iter()
2713 .map(|s| s.to_string())
2714 .collect()
2715 }
2716
2717 pub fn all_edge_types(&self) -> Vec<String> {
2719 self.id_to_edge_type
2720 .read()
2721 .iter()
2722 .map(|s| s.to_string())
2723 .collect()
2724 }
2725
2726 pub fn all_property_keys(&self) -> Vec<String> {
2728 let mut keys = std::collections::HashSet::new();
2729 for key in self.node_properties.keys() {
2730 keys.insert(key.to_string());
2731 }
2732 for key in self.edge_properties.keys() {
2733 keys.insert(key.to_string());
2734 }
2735 keys.into_iter().collect()
2736 }
2737
2738 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2740 let node_ids = self.nodes_by_label(label);
2741 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2742 }
2743
2744 #[cfg(not(feature = "tiered-storage"))]
2746 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2747 let epoch = self.current_epoch();
2748 let type_to_id = self.edge_type_to_id.read();
2749
2750 if let Some(&type_id) = type_to_id.get(edge_type) {
2751 let edge_ids: Vec<EdgeId> = self
2752 .edges
2753 .read()
2754 .iter()
2755 .filter_map(|(id, chain)| {
2756 chain.visible_at(epoch).and_then(|r| {
2757 if !r.is_deleted() && r.type_id == type_id {
2758 Some(*id)
2759 } else {
2760 None
2761 }
2762 })
2763 })
2764 .collect();
2765
2766 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2768 as Box<dyn Iterator<Item = Edge> + 'a>
2769 } else {
2770 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2772 }
2773 }
2774
2775 #[cfg(feature = "tiered-storage")]
2778 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2779 let epoch = self.current_epoch();
2780 let type_to_id = self.edge_type_to_id.read();
2781
2782 if let Some(&type_id) = type_to_id.get(edge_type) {
2783 let versions = self.edge_versions.read();
2784 let edge_ids: Vec<EdgeId> = versions
2785 .iter()
2786 .filter_map(|(id, index)| {
2787 index.visible_at(epoch).and_then(|vref| {
2788 self.read_edge_record(&vref).and_then(|r| {
2789 if !r.is_deleted() && r.type_id == type_id {
2790 Some(*id)
2791 } else {
2792 None
2793 }
2794 })
2795 })
2796 })
2797 .collect();
2798
2799 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2800 as Box<dyn Iterator<Item = Edge> + 'a>
2801 } else {
2802 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2803 }
2804 }
2805
2806 #[must_use]
2813 pub fn node_property_might_match(
2814 &self,
2815 property: &PropertyKey,
2816 op: CompareOp,
2817 value: &Value,
2818 ) -> bool {
2819 self.node_properties.might_match(property, op, value)
2820 }
2821
2822 #[must_use]
2824 pub fn edge_property_might_match(
2825 &self,
2826 property: &PropertyKey,
2827 op: CompareOp,
2828 value: &Value,
2829 ) -> bool {
2830 self.edge_properties.might_match(property, op, value)
2831 }
2832
2833 #[must_use]
2835 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2836 self.node_properties.zone_map(property)
2837 }
2838
2839 #[must_use]
2841 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2842 self.edge_properties.zone_map(property)
2843 }
2844
2845 pub fn rebuild_zone_maps(&self) {
2847 self.node_properties.rebuild_zone_maps();
2848 self.edge_properties.rebuild_zone_maps();
2849 }
2850
2851 #[must_use]
2855 pub fn statistics(&self) -> Arc<Statistics> {
2856 Arc::clone(&self.statistics.read())
2857 }
2858
2859 pub fn ensure_statistics_fresh(&self) {
2864 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
2865 self.compute_statistics();
2866 }
2867 }
2868
2869 #[cfg(not(feature = "tiered-storage"))]
2874 pub fn compute_statistics(&self) {
2875 let mut stats = Statistics::new();
2876
2877 stats.total_nodes = self.node_count() as u64;
2879 stats.total_edges = self.edge_count() as u64;
2880
2881 let id_to_label = self.id_to_label.read();
2883 let label_index = self.label_index.read();
2884
2885 for (label_id, label_name) in id_to_label.iter().enumerate() {
2886 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2887
2888 if node_count > 0 {
2889 let avg_out_degree = if stats.total_nodes > 0 {
2891 stats.total_edges as f64 / stats.total_nodes as f64
2892 } else {
2893 0.0
2894 };
2895
2896 let label_stats =
2897 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2898
2899 stats.update_label(label_name.as_ref(), label_stats);
2900 }
2901 }
2902
2903 let id_to_edge_type = self.id_to_edge_type.read();
2905 let edges = self.edges.read();
2906 let epoch = self.current_epoch();
2907
2908 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2909 for chain in edges.values() {
2910 if let Some(record) = chain.visible_at(epoch)
2911 && !record.is_deleted()
2912 {
2913 *edge_type_counts.entry(record.type_id).or_default() += 1;
2914 }
2915 }
2916
2917 for (type_id, count) in edge_type_counts {
2918 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2919 let avg_degree = if stats.total_nodes > 0 {
2920 count as f64 / stats.total_nodes as f64
2921 } else {
2922 0.0
2923 };
2924
2925 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2926 stats.update_edge_type(type_name.as_ref(), edge_stats);
2927 }
2928 }
2929
2930 *self.statistics.write() = Arc::new(stats);
2931 }
2932
2933 #[cfg(feature = "tiered-storage")]
2936 pub fn compute_statistics(&self) {
2937 let mut stats = Statistics::new();
2938
2939 stats.total_nodes = self.node_count() as u64;
2941 stats.total_edges = self.edge_count() as u64;
2942
2943 let id_to_label = self.id_to_label.read();
2945 let label_index = self.label_index.read();
2946
2947 for (label_id, label_name) in id_to_label.iter().enumerate() {
2948 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
2949
2950 if node_count > 0 {
2951 let avg_out_degree = if stats.total_nodes > 0 {
2952 stats.total_edges as f64 / stats.total_nodes as f64
2953 } else {
2954 0.0
2955 };
2956
2957 let label_stats =
2958 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2959
2960 stats.update_label(label_name.as_ref(), label_stats);
2961 }
2962 }
2963
2964 let id_to_edge_type = self.id_to_edge_type.read();
2966 let versions = self.edge_versions.read();
2967 let epoch = self.current_epoch();
2968
2969 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2970 for index in versions.values() {
2971 if let Some(vref) = index.visible_at(epoch)
2972 && let Some(record) = self.read_edge_record(&vref)
2973 && !record.is_deleted()
2974 {
2975 *edge_type_counts.entry(record.type_id).or_default() += 1;
2976 }
2977 }
2978
2979 for (type_id, count) in edge_type_counts {
2980 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2981 let avg_degree = if stats.total_nodes > 0 {
2982 count as f64 / stats.total_nodes as f64
2983 } else {
2984 0.0
2985 };
2986
2987 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2988 stats.update_edge_type(type_name.as_ref(), edge_stats);
2989 }
2990 }
2991
2992 *self.statistics.write() = Arc::new(stats);
2993 }
2994
2995 #[must_use]
2997 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2998 self.statistics.read().estimate_label_cardinality(label)
2999 }
3000
3001 #[must_use]
3003 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
3004 self.statistics
3005 .read()
3006 .estimate_avg_degree(edge_type, outgoing)
3007 }
3008
3009 fn get_or_create_label_id(&self, label: &str) -> u32 {
3012 {
3013 let label_to_id = self.label_to_id.read();
3014 if let Some(&id) = label_to_id.get(label) {
3015 return id;
3016 }
3017 }
3018
3019 let mut label_to_id = self.label_to_id.write();
3020 let mut id_to_label = self.id_to_label.write();
3021
3022 if let Some(&id) = label_to_id.get(label) {
3024 return id;
3025 }
3026
3027 let id = id_to_label.len() as u32;
3028
3029 let label: ArcStr = label.into();
3030 label_to_id.insert(label.clone(), id);
3031 id_to_label.push(label);
3032
3033 id
3034 }
3035
3036 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
3037 {
3038 let type_to_id = self.edge_type_to_id.read();
3039 if let Some(&id) = type_to_id.get(edge_type) {
3040 return id;
3041 }
3042 }
3043
3044 let mut type_to_id = self.edge_type_to_id.write();
3045 let mut id_to_type = self.id_to_edge_type.write();
3046
3047 if let Some(&id) = type_to_id.get(edge_type) {
3049 return id;
3050 }
3051
3052 let id = id_to_type.len() as u32;
3053 let edge_type: ArcStr = edge_type.into();
3054 type_to_id.insert(edge_type.clone(), id);
3055 id_to_type.push(edge_type);
3056
3057 id
3058 }
3059
3060 #[cfg(not(feature = "tiered-storage"))]
3067 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3068 let epoch = self.current_epoch();
3069 let mut record = NodeRecord::new(id, epoch);
3070 record.set_label_count(labels.len() as u16);
3071
3072 let mut node_label_set = FxHashSet::default();
3074 for label in labels {
3075 let label_id = self.get_or_create_label_id(*label);
3076 node_label_set.insert(label_id);
3077
3078 let mut index = self.label_index.write();
3080 while index.len() <= label_id as usize {
3081 index.push(FxHashMap::default());
3082 }
3083 index[label_id as usize].insert(id, ());
3084 }
3085
3086 self.node_labels.write().insert(id, node_label_set);
3088
3089 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3091 self.nodes.write().insert(id, chain);
3092
3093 let id_val = id.as_u64();
3095 let _ = self
3096 .next_node_id
3097 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3098 if id_val >= current {
3099 Some(id_val + 1)
3100 } else {
3101 None
3102 }
3103 });
3104 }
3105
3106 #[cfg(feature = "tiered-storage")]
3109 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
3110 let epoch = self.current_epoch();
3111 let mut record = NodeRecord::new(id, epoch);
3112 record.set_label_count(labels.len() as u16);
3113
3114 let mut node_label_set = FxHashSet::default();
3116 for label in labels {
3117 let label_id = self.get_or_create_label_id(*label);
3118 node_label_set.insert(label_id);
3119
3120 let mut index = self.label_index.write();
3122 while index.len() <= label_id as usize {
3123 index.push(FxHashMap::default());
3124 }
3125 index[label_id as usize].insert(id, ());
3126 }
3127
3128 self.node_labels.write().insert(id, node_label_set);
3130
3131 let arena = self.arena_allocator.arena_or_create(epoch);
3133 let (offset, _stored) = arena.alloc_value_with_offset(record);
3134
3135 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3137 let mut versions = self.node_versions.write();
3138 versions.insert(id, VersionIndex::with_initial(hot_ref));
3139
3140 let id_val = id.as_u64();
3142 let _ = self
3143 .next_node_id
3144 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3145 if id_val >= current {
3146 Some(id_val + 1)
3147 } else {
3148 None
3149 }
3150 });
3151 }
3152
3153 #[cfg(not(feature = "tiered-storage"))]
3157 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3158 let epoch = self.current_epoch();
3159 let type_id = self.get_or_create_edge_type_id(edge_type);
3160
3161 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3162 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
3163 self.edges.write().insert(id, chain);
3164
3165 self.forward_adj.add_edge(src, dst, id);
3167 if let Some(ref backward) = self.backward_adj {
3168 backward.add_edge(dst, src, id);
3169 }
3170
3171 let id_val = id.as_u64();
3173 let _ = self
3174 .next_edge_id
3175 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3176 if id_val >= current {
3177 Some(id_val + 1)
3178 } else {
3179 None
3180 }
3181 });
3182 }
3183
3184 #[cfg(feature = "tiered-storage")]
3187 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
3188 let epoch = self.current_epoch();
3189 let type_id = self.get_or_create_edge_type_id(edge_type);
3190
3191 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
3192
3193 let arena = self.arena_allocator.arena_or_create(epoch);
3195 let (offset, _stored) = arena.alloc_value_with_offset(record);
3196
3197 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
3199 let mut versions = self.edge_versions.write();
3200 versions.insert(id, VersionIndex::with_initial(hot_ref));
3201
3202 self.forward_adj.add_edge(src, dst, id);
3204 if let Some(ref backward) = self.backward_adj {
3205 backward.add_edge(dst, src, id);
3206 }
3207
3208 let id_val = id.as_u64();
3210 let _ = self
3211 .next_edge_id
3212 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
3213 if id_val >= current {
3214 Some(id_val + 1)
3215 } else {
3216 None
3217 }
3218 });
3219 }
3220
3221 pub fn set_epoch(&self, epoch: EpochId) {
3223 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
3224 }
3225}
3226
3227impl Default for LpgStore {
3228 fn default() -> Self {
3229 Self::new()
3230 }
3231}
3232
3233#[cfg(test)]
3234mod tests {
3235 use super::*;
3236
3237 #[test]
3238 fn test_create_node() {
3239 let store = LpgStore::new();
3240
3241 let id = store.create_node(&["Person"]);
3242 assert!(id.is_valid());
3243
3244 let node = store.get_node(id).unwrap();
3245 assert!(node.has_label("Person"));
3246 assert!(!node.has_label("Animal"));
3247 }
3248
3249 #[test]
3250 fn test_create_node_with_props() {
3251 let store = LpgStore::new();
3252
3253 let id = store.create_node_with_props(
3254 &["Person"],
3255 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
3256 );
3257
3258 let node = store.get_node(id).unwrap();
3259 assert_eq!(
3260 node.get_property("name").and_then(|v| v.as_str()),
3261 Some("Alice")
3262 );
3263 assert_eq!(
3264 node.get_property("age").and_then(|v| v.as_int64()),
3265 Some(30)
3266 );
3267 }
3268
3269 #[test]
3270 fn test_delete_node() {
3271 let store = LpgStore::new();
3272
3273 let id = store.create_node(&["Person"]);
3274 assert_eq!(store.node_count(), 1);
3275
3276 assert!(store.delete_node(id));
3277 assert_eq!(store.node_count(), 0);
3278 assert!(store.get_node(id).is_none());
3279
3280 assert!(!store.delete_node(id));
3282 }
3283
3284 #[test]
3285 fn test_create_edge() {
3286 let store = LpgStore::new();
3287
3288 let alice = store.create_node(&["Person"]);
3289 let bob = store.create_node(&["Person"]);
3290
3291 let edge_id = store.create_edge(alice, bob, "KNOWS");
3292 assert!(edge_id.is_valid());
3293
3294 let edge = store.get_edge(edge_id).unwrap();
3295 assert_eq!(edge.src, alice);
3296 assert_eq!(edge.dst, bob);
3297 assert_eq!(edge.edge_type.as_str(), "KNOWS");
3298 }
3299
3300 #[test]
3301 fn test_neighbors() {
3302 let store = LpgStore::new();
3303
3304 let a = store.create_node(&["Person"]);
3305 let b = store.create_node(&["Person"]);
3306 let c = store.create_node(&["Person"]);
3307
3308 store.create_edge(a, b, "KNOWS");
3309 store.create_edge(a, c, "KNOWS");
3310
3311 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3312 assert_eq!(outgoing.len(), 2);
3313 assert!(outgoing.contains(&b));
3314 assert!(outgoing.contains(&c));
3315
3316 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3317 assert_eq!(incoming.len(), 1);
3318 assert!(incoming.contains(&a));
3319 }
3320
3321 #[test]
3322 fn test_nodes_by_label() {
3323 let store = LpgStore::new();
3324
3325 let p1 = store.create_node(&["Person"]);
3326 let p2 = store.create_node(&["Person"]);
3327 let _a = store.create_node(&["Animal"]);
3328
3329 let persons = store.nodes_by_label("Person");
3330 assert_eq!(persons.len(), 2);
3331 assert!(persons.contains(&p1));
3332 assert!(persons.contains(&p2));
3333
3334 let animals = store.nodes_by_label("Animal");
3335 assert_eq!(animals.len(), 1);
3336 }
3337
3338 #[test]
3339 fn test_delete_edge() {
3340 let store = LpgStore::new();
3341
3342 let a = store.create_node(&["Person"]);
3343 let b = store.create_node(&["Person"]);
3344 let edge_id = store.create_edge(a, b, "KNOWS");
3345
3346 assert_eq!(store.edge_count(), 1);
3347
3348 assert!(store.delete_edge(edge_id));
3349 assert_eq!(store.edge_count(), 0);
3350 assert!(store.get_edge(edge_id).is_none());
3351 }
3352
3353 #[test]
3356 fn test_lpg_store_config() {
3357 let config = LpgStoreConfig {
3359 backward_edges: false,
3360 initial_node_capacity: 100,
3361 initial_edge_capacity: 200,
3362 };
3363 let store = LpgStore::with_config(config);
3364
3365 let a = store.create_node(&["Person"]);
3367 let b = store.create_node(&["Person"]);
3368 store.create_edge(a, b, "KNOWS");
3369
3370 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
3372 assert_eq!(outgoing.len(), 1);
3373
3374 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
3376 assert_eq!(incoming.len(), 0);
3377 }
3378
3379 #[test]
3380 fn test_epoch_management() {
3381 let store = LpgStore::new();
3382
3383 let epoch0 = store.current_epoch();
3384 assert_eq!(epoch0.as_u64(), 0);
3385
3386 let epoch1 = store.new_epoch();
3387 assert_eq!(epoch1.as_u64(), 1);
3388
3389 let current = store.current_epoch();
3390 assert_eq!(current.as_u64(), 1);
3391 }
3392
3393 #[test]
3394 fn test_node_properties() {
3395 let store = LpgStore::new();
3396 let id = store.create_node(&["Person"]);
3397
3398 store.set_node_property(id, "name", Value::from("Alice"));
3400 let name = store.get_node_property(id, &"name".into());
3401 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Alice"));
3402
3403 store.set_node_property(id, "name", Value::from("Bob"));
3405 let name = store.get_node_property(id, &"name".into());
3406 assert!(matches!(name, Some(Value::String(s)) if s.as_str() == "Bob"));
3407
3408 let old = store.remove_node_property(id, "name");
3410 assert!(matches!(old, Some(Value::String(s)) if s.as_str() == "Bob"));
3411
3412 let name = store.get_node_property(id, &"name".into());
3414 assert!(name.is_none());
3415
3416 let none = store.remove_node_property(id, "nonexistent");
3418 assert!(none.is_none());
3419 }
3420
3421 #[test]
3422 fn test_edge_properties() {
3423 let store = LpgStore::new();
3424 let a = store.create_node(&["Person"]);
3425 let b = store.create_node(&["Person"]);
3426 let edge_id = store.create_edge(a, b, "KNOWS");
3427
3428 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3430 let since = store.get_edge_property(edge_id, &"since".into());
3431 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3432
3433 let old = store.remove_edge_property(edge_id, "since");
3435 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3436
3437 let since = store.get_edge_property(edge_id, &"since".into());
3438 assert!(since.is_none());
3439 }
3440
3441 #[test]
3442 fn test_add_remove_label() {
3443 let store = LpgStore::new();
3444 let id = store.create_node(&["Person"]);
3445
3446 assert!(store.add_label(id, "Employee"));
3448
3449 let node = store.get_node(id).unwrap();
3450 assert!(node.has_label("Person"));
3451 assert!(node.has_label("Employee"));
3452
3453 assert!(!store.add_label(id, "Employee"));
3455
3456 assert!(store.remove_label(id, "Employee"));
3458
3459 let node = store.get_node(id).unwrap();
3460 assert!(node.has_label("Person"));
3461 assert!(!node.has_label("Employee"));
3462
3463 assert!(!store.remove_label(id, "Employee"));
3465 assert!(!store.remove_label(id, "NonExistent"));
3466 }
3467
3468 #[test]
3469 fn test_add_label_to_nonexistent_node() {
3470 let store = LpgStore::new();
3471 let fake_id = NodeId::new(999);
3472 assert!(!store.add_label(fake_id, "Label"));
3473 }
3474
3475 #[test]
3476 fn test_remove_label_from_nonexistent_node() {
3477 let store = LpgStore::new();
3478 let fake_id = NodeId::new(999);
3479 assert!(!store.remove_label(fake_id, "Label"));
3480 }
3481
3482 #[test]
3483 fn test_node_ids() {
3484 let store = LpgStore::new();
3485
3486 let n1 = store.create_node(&["Person"]);
3487 let n2 = store.create_node(&["Person"]);
3488 let n3 = store.create_node(&["Person"]);
3489
3490 let ids = store.node_ids();
3491 assert_eq!(ids.len(), 3);
3492 assert!(ids.contains(&n1));
3493 assert!(ids.contains(&n2));
3494 assert!(ids.contains(&n3));
3495
3496 store.delete_node(n2);
3498 let ids = store.node_ids();
3499 assert_eq!(ids.len(), 2);
3500 assert!(!ids.contains(&n2));
3501 }
3502
3503 #[test]
3504 fn test_delete_node_nonexistent() {
3505 let store = LpgStore::new();
3506 let fake_id = NodeId::new(999);
3507 assert!(!store.delete_node(fake_id));
3508 }
3509
3510 #[test]
3511 fn test_delete_edge_nonexistent() {
3512 let store = LpgStore::new();
3513 let fake_id = EdgeId::new(999);
3514 assert!(!store.delete_edge(fake_id));
3515 }
3516
3517 #[test]
3518 fn test_delete_edge_double() {
3519 let store = LpgStore::new();
3520 let a = store.create_node(&["Person"]);
3521 let b = store.create_node(&["Person"]);
3522 let edge_id = store.create_edge(a, b, "KNOWS");
3523
3524 assert!(store.delete_edge(edge_id));
3525 assert!(!store.delete_edge(edge_id)); }
3527
3528 #[test]
3529 fn test_create_edge_with_props() {
3530 let store = LpgStore::new();
3531 let a = store.create_node(&["Person"]);
3532 let b = store.create_node(&["Person"]);
3533
3534 let edge_id = store.create_edge_with_props(
3535 a,
3536 b,
3537 "KNOWS",
3538 [
3539 ("since", Value::from(2020i64)),
3540 ("weight", Value::from(1.0)),
3541 ],
3542 );
3543
3544 let edge = store.get_edge(edge_id).unwrap();
3545 assert_eq!(
3546 edge.get_property("since").and_then(|v| v.as_int64()),
3547 Some(2020)
3548 );
3549 assert_eq!(
3550 edge.get_property("weight").and_then(|v| v.as_float64()),
3551 Some(1.0)
3552 );
3553 }
3554
3555 #[test]
3556 fn test_delete_node_edges() {
3557 let store = LpgStore::new();
3558
3559 let a = store.create_node(&["Person"]);
3560 let b = store.create_node(&["Person"]);
3561 let c = store.create_node(&["Person"]);
3562
3563 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3567
3568 store.delete_node_edges(a);
3570
3571 assert_eq!(store.edge_count(), 0);
3572 }
3573
3574 #[test]
3575 fn test_neighbors_both_directions() {
3576 let store = LpgStore::new();
3577
3578 let a = store.create_node(&["Person"]);
3579 let b = store.create_node(&["Person"]);
3580 let c = store.create_node(&["Person"]);
3581
3582 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3587 assert_eq!(neighbors.len(), 2);
3588 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3591
3592 #[test]
3593 fn test_edges_from() {
3594 let store = LpgStore::new();
3595
3596 let a = store.create_node(&["Person"]);
3597 let b = store.create_node(&["Person"]);
3598 let c = store.create_node(&["Person"]);
3599
3600 let e1 = store.create_edge(a, b, "KNOWS");
3601 let e2 = store.create_edge(a, c, "KNOWS");
3602
3603 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3604 assert_eq!(edges.len(), 2);
3605 assert!(edges.iter().any(|(_, e)| *e == e1));
3606 assert!(edges.iter().any(|(_, e)| *e == e2));
3607
3608 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3610 assert_eq!(incoming.len(), 1);
3611 assert_eq!(incoming[0].1, e1);
3612 }
3613
3614 #[test]
3615 fn test_edges_to() {
3616 let store = LpgStore::new();
3617
3618 let a = store.create_node(&["Person"]);
3619 let b = store.create_node(&["Person"]);
3620 let c = store.create_node(&["Person"]);
3621
3622 let e1 = store.create_edge(a, b, "KNOWS");
3623 let e2 = store.create_edge(c, b, "KNOWS");
3624
3625 let to_b = store.edges_to(b);
3627 assert_eq!(to_b.len(), 2);
3628 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3629 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3630 }
3631
3632 #[test]
3633 fn test_out_degree_in_degree() {
3634 let store = LpgStore::new();
3635
3636 let a = store.create_node(&["Person"]);
3637 let b = store.create_node(&["Person"]);
3638 let c = store.create_node(&["Person"]);
3639
3640 store.create_edge(a, b, "KNOWS");
3641 store.create_edge(a, c, "KNOWS");
3642 store.create_edge(c, b, "KNOWS");
3643
3644 assert_eq!(store.out_degree(a), 2);
3645 assert_eq!(store.out_degree(b), 0);
3646 assert_eq!(store.out_degree(c), 1);
3647
3648 assert_eq!(store.in_degree(a), 0);
3649 assert_eq!(store.in_degree(b), 2);
3650 assert_eq!(store.in_degree(c), 1);
3651 }
3652
3653 #[test]
3654 fn test_edge_type() {
3655 let store = LpgStore::new();
3656
3657 let a = store.create_node(&["Person"]);
3658 let b = store.create_node(&["Person"]);
3659 let edge_id = store.create_edge(a, b, "KNOWS");
3660
3661 let edge_type = store.edge_type(edge_id);
3662 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3663
3664 let fake_id = EdgeId::new(999);
3666 assert!(store.edge_type(fake_id).is_none());
3667 }
3668
3669 #[test]
3670 fn test_count_methods() {
3671 let store = LpgStore::new();
3672
3673 assert_eq!(store.label_count(), 0);
3674 assert_eq!(store.edge_type_count(), 0);
3675 assert_eq!(store.property_key_count(), 0);
3676
3677 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3678 let b = store.create_node(&["Company"]);
3679 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3680
3681 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3685
3686 #[test]
3687 fn test_all_nodes_and_edges() {
3688 let store = LpgStore::new();
3689
3690 let a = store.create_node(&["Person"]);
3691 let b = store.create_node(&["Person"]);
3692 store.create_edge(a, b, "KNOWS");
3693
3694 let nodes: Vec<_> = store.all_nodes().collect();
3695 assert_eq!(nodes.len(), 2);
3696
3697 let edges: Vec<_> = store.all_edges().collect();
3698 assert_eq!(edges.len(), 1);
3699 }
3700
3701 #[test]
3702 fn test_all_labels_and_edge_types() {
3703 let store = LpgStore::new();
3704
3705 store.create_node(&["Person"]);
3706 store.create_node(&["Company"]);
3707 let a = store.create_node(&["Animal"]);
3708 let b = store.create_node(&["Animal"]);
3709 store.create_edge(a, b, "EATS");
3710
3711 let labels = store.all_labels();
3712 assert_eq!(labels.len(), 3);
3713 assert!(labels.contains(&"Person".to_string()));
3714 assert!(labels.contains(&"Company".to_string()));
3715 assert!(labels.contains(&"Animal".to_string()));
3716
3717 let edge_types = store.all_edge_types();
3718 assert_eq!(edge_types.len(), 1);
3719 assert!(edge_types.contains(&"EATS".to_string()));
3720 }
3721
3722 #[test]
3723 fn test_all_property_keys() {
3724 let store = LpgStore::new();
3725
3726 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3727 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3728 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3729
3730 let keys = store.all_property_keys();
3731 assert!(keys.contains(&"name".to_string()));
3732 assert!(keys.contains(&"age".to_string()));
3733 assert!(keys.contains(&"since".to_string()));
3734 }
3735
3736 #[test]
3737 fn test_nodes_with_label() {
3738 let store = LpgStore::new();
3739
3740 store.create_node(&["Person"]);
3741 store.create_node(&["Person"]);
3742 store.create_node(&["Company"]);
3743
3744 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3745 assert_eq!(persons.len(), 2);
3746
3747 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3748 assert_eq!(companies.len(), 1);
3749
3750 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3751 assert_eq!(none.len(), 0);
3752 }
3753
3754 #[test]
3755 fn test_edges_with_type() {
3756 let store = LpgStore::new();
3757
3758 let a = store.create_node(&["Person"]);
3759 let b = store.create_node(&["Person"]);
3760 let c = store.create_node(&["Company"]);
3761
3762 store.create_edge(a, b, "KNOWS");
3763 store.create_edge(a, c, "WORKS_AT");
3764
3765 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3766 assert_eq!(knows.len(), 1);
3767
3768 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3769 assert_eq!(works_at.len(), 1);
3770
3771 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3772 assert_eq!(none.len(), 0);
3773 }
3774
3775 #[test]
3776 fn test_nodes_by_label_nonexistent() {
3777 let store = LpgStore::new();
3778 store.create_node(&["Person"]);
3779
3780 let empty = store.nodes_by_label("NonExistent");
3781 assert!(empty.is_empty());
3782 }
3783
3784 #[test]
3785 fn test_statistics() {
3786 let store = LpgStore::new();
3787
3788 let a = store.create_node(&["Person"]);
3789 let b = store.create_node(&["Person"]);
3790 let c = store.create_node(&["Company"]);
3791
3792 store.create_edge(a, b, "KNOWS");
3793 store.create_edge(a, c, "WORKS_AT");
3794
3795 store.compute_statistics();
3796 let stats = store.statistics();
3797
3798 assert_eq!(stats.total_nodes, 3);
3799 assert_eq!(stats.total_edges, 2);
3800
3801 let person_card = store.estimate_label_cardinality("Person");
3803 assert!(person_card > 0.0);
3804
3805 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3806 assert!(avg_degree >= 0.0);
3807 }
3808
3809 #[test]
3810 fn test_zone_maps() {
3811 let store = LpgStore::new();
3812
3813 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3814 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3815
3816 let might_match =
3818 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3819 assert!(might_match);
3821
3822 let zone = store.node_property_zone_map(&"age".into());
3823 assert!(zone.is_some());
3824
3825 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3827 assert!(no_zone.is_none());
3828
3829 let a = store.create_node(&["A"]);
3831 let b = store.create_node(&["B"]);
3832 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3833
3834 let edge_zone = store.edge_property_zone_map(&"weight".into());
3835 assert!(edge_zone.is_some());
3836 }
3837
3838 #[test]
3839 fn test_rebuild_zone_maps() {
3840 let store = LpgStore::new();
3841 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3842
3843 store.rebuild_zone_maps();
3845 }
3846
3847 #[test]
3848 fn test_create_node_with_id() {
3849 let store = LpgStore::new();
3850
3851 let specific_id = NodeId::new(100);
3852 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3853
3854 let node = store.get_node(specific_id).unwrap();
3855 assert!(node.has_label("Person"));
3856 assert!(node.has_label("Employee"));
3857
3858 let next = store.create_node(&["Other"]);
3860 assert!(next.as_u64() > 100);
3861 }
3862
3863 #[test]
3864 fn test_create_edge_with_id() {
3865 let store = LpgStore::new();
3866
3867 let a = store.create_node(&["A"]);
3868 let b = store.create_node(&["B"]);
3869
3870 let specific_id = EdgeId::new(500);
3871 store.create_edge_with_id(specific_id, a, b, "REL");
3872
3873 let edge = store.get_edge(specific_id).unwrap();
3874 assert_eq!(edge.src, a);
3875 assert_eq!(edge.dst, b);
3876 assert_eq!(edge.edge_type.as_str(), "REL");
3877
3878 let next = store.create_edge(a, b, "OTHER");
3880 assert!(next.as_u64() > 500);
3881 }
3882
3883 #[test]
3884 fn test_set_epoch() {
3885 let store = LpgStore::new();
3886
3887 assert_eq!(store.current_epoch().as_u64(), 0);
3888
3889 store.set_epoch(EpochId::new(42));
3890 assert_eq!(store.current_epoch().as_u64(), 42);
3891 }
3892
3893 #[test]
3894 fn test_get_node_nonexistent() {
3895 let store = LpgStore::new();
3896 let fake_id = NodeId::new(999);
3897 assert!(store.get_node(fake_id).is_none());
3898 }
3899
3900 #[test]
3901 fn test_get_edge_nonexistent() {
3902 let store = LpgStore::new();
3903 let fake_id = EdgeId::new(999);
3904 assert!(store.get_edge(fake_id).is_none());
3905 }
3906
3907 #[test]
3908 fn test_multiple_labels() {
3909 let store = LpgStore::new();
3910
3911 let id = store.create_node(&["Person", "Employee", "Manager"]);
3912 let node = store.get_node(id).unwrap();
3913
3914 assert!(node.has_label("Person"));
3915 assert!(node.has_label("Employee"));
3916 assert!(node.has_label("Manager"));
3917 assert!(!node.has_label("Other"));
3918 }
3919
3920 #[test]
3921 fn test_default_impl() {
3922 let store: LpgStore = Default::default();
3923 assert_eq!(store.node_count(), 0);
3924 assert_eq!(store.edge_count(), 0);
3925 }
3926
3927 #[test]
3928 fn test_edges_from_both_directions() {
3929 let store = LpgStore::new();
3930
3931 let a = store.create_node(&["A"]);
3932 let b = store.create_node(&["B"]);
3933 let c = store.create_node(&["C"]);
3934
3935 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3940 assert_eq!(edges.len(), 2);
3941 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3944
3945 #[test]
3946 fn test_no_backward_adj_in_degree() {
3947 let config = LpgStoreConfig {
3948 backward_edges: false,
3949 initial_node_capacity: 10,
3950 initial_edge_capacity: 10,
3951 };
3952 let store = LpgStore::with_config(config);
3953
3954 let a = store.create_node(&["A"]);
3955 let b = store.create_node(&["B"]);
3956 store.create_edge(a, b, "R");
3957
3958 let degree = store.in_degree(b);
3960 assert_eq!(degree, 1);
3961 }
3962
3963 #[test]
3964 fn test_no_backward_adj_edges_to() {
3965 let config = LpgStoreConfig {
3966 backward_edges: false,
3967 initial_node_capacity: 10,
3968 initial_edge_capacity: 10,
3969 };
3970 let store = LpgStore::with_config(config);
3971
3972 let a = store.create_node(&["A"]);
3973 let b = store.create_node(&["B"]);
3974 let e = store.create_edge(a, b, "R");
3975
3976 let edges = store.edges_to(b);
3978 assert_eq!(edges.len(), 1);
3979 assert_eq!(edges[0].1, e);
3980 }
3981
3982 #[test]
3983 fn test_node_versioned_creation() {
3984 let store = LpgStore::new();
3985
3986 let epoch = store.new_epoch();
3987 let tx_id = TxId::new(1);
3988
3989 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3990 assert!(store.get_node(id).is_some());
3991 }
3992
3993 #[test]
3994 fn test_edge_versioned_creation() {
3995 let store = LpgStore::new();
3996
3997 let a = store.create_node(&["A"]);
3998 let b = store.create_node(&["B"]);
3999
4000 let epoch = store.new_epoch();
4001 let tx_id = TxId::new(1);
4002
4003 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
4004 assert!(store.get_edge(edge_id).is_some());
4005 }
4006
4007 #[test]
4008 fn test_node_with_props_versioned() {
4009 let store = LpgStore::new();
4010
4011 let epoch = store.new_epoch();
4012 let tx_id = TxId::new(1);
4013
4014 let id = store.create_node_with_props_versioned(
4015 &["Person"],
4016 [("name", Value::from("Alice"))],
4017 epoch,
4018 tx_id,
4019 );
4020
4021 let node = store.get_node(id).unwrap();
4022 assert_eq!(
4023 node.get_property("name").and_then(|v| v.as_str()),
4024 Some("Alice")
4025 );
4026 }
4027
4028 #[test]
4029 fn test_discard_uncommitted_versions() {
4030 let store = LpgStore::new();
4031
4032 let epoch = store.new_epoch();
4033 let tx_id = TxId::new(42);
4034
4035 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
4037 assert!(store.get_node(node_id).is_some());
4038
4039 store.discard_uncommitted_versions(tx_id);
4041
4042 assert!(store.get_node(node_id).is_none());
4044 }
4045
4046 #[test]
4049 fn test_property_index_create_and_lookup() {
4050 let store = LpgStore::new();
4051
4052 let alice = store.create_node(&["Person"]);
4054 let bob = store.create_node(&["Person"]);
4055 let charlie = store.create_node(&["Person"]);
4056
4057 store.set_node_property(alice, "city", Value::from("NYC"));
4058 store.set_node_property(bob, "city", Value::from("NYC"));
4059 store.set_node_property(charlie, "city", Value::from("LA"));
4060
4061 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
4063 assert_eq!(nyc_people.len(), 2);
4064
4065 store.create_property_index("city");
4067 assert!(store.has_property_index("city"));
4068
4069 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
4071 assert_eq!(nyc_people.len(), 2);
4072 assert!(nyc_people.contains(&alice));
4073 assert!(nyc_people.contains(&bob));
4074
4075 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
4076 assert_eq!(la_people.len(), 1);
4077 assert!(la_people.contains(&charlie));
4078 }
4079
4080 #[test]
4081 fn test_property_index_maintained_on_update() {
4082 let store = LpgStore::new();
4083
4084 store.create_property_index("status");
4086
4087 let node = store.create_node(&["Task"]);
4088 store.set_node_property(node, "status", Value::from("pending"));
4089
4090 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4092 assert_eq!(pending.len(), 1);
4093 assert!(pending.contains(&node));
4094
4095 store.set_node_property(node, "status", Value::from("done"));
4097
4098 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
4100 assert!(pending.is_empty());
4101
4102 let done = store.find_nodes_by_property("status", &Value::from("done"));
4104 assert_eq!(done.len(), 1);
4105 assert!(done.contains(&node));
4106 }
4107
4108 #[test]
4109 fn test_property_index_maintained_on_remove() {
4110 let store = LpgStore::new();
4111
4112 store.create_property_index("tag");
4113
4114 let node = store.create_node(&["Item"]);
4115 store.set_node_property(node, "tag", Value::from("important"));
4116
4117 let found = store.find_nodes_by_property("tag", &Value::from("important"));
4119 assert_eq!(found.len(), 1);
4120
4121 store.remove_node_property(node, "tag");
4123
4124 let found = store.find_nodes_by_property("tag", &Value::from("important"));
4126 assert!(found.is_empty());
4127 }
4128
4129 #[test]
4130 fn test_property_index_drop() {
4131 let store = LpgStore::new();
4132
4133 store.create_property_index("key");
4134 assert!(store.has_property_index("key"));
4135
4136 assert!(store.drop_property_index("key"));
4137 assert!(!store.has_property_index("key"));
4138
4139 assert!(!store.drop_property_index("key"));
4141 }
4142
4143 #[test]
4144 fn test_property_index_multiple_values() {
4145 let store = LpgStore::new();
4146
4147 store.create_property_index("age");
4148
4149 let n1 = store.create_node(&["Person"]);
4151 let n2 = store.create_node(&["Person"]);
4152 let n3 = store.create_node(&["Person"]);
4153 let n4 = store.create_node(&["Person"]);
4154
4155 store.set_node_property(n1, "age", Value::from(25i64));
4156 store.set_node_property(n2, "age", Value::from(25i64));
4157 store.set_node_property(n3, "age", Value::from(30i64));
4158 store.set_node_property(n4, "age", Value::from(25i64));
4159
4160 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
4161 assert_eq!(age_25.len(), 3);
4162
4163 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
4164 assert_eq!(age_30.len(), 1);
4165
4166 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
4167 assert!(age_40.is_empty());
4168 }
4169
4170 #[test]
4171 fn test_property_index_builds_from_existing_data() {
4172 let store = LpgStore::new();
4173
4174 let n1 = store.create_node(&["Person"]);
4176 let n2 = store.create_node(&["Person"]);
4177 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4178 store.set_node_property(n2, "email", Value::from("bob@example.com"));
4179
4180 store.create_property_index("email");
4182
4183 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
4185 assert_eq!(alice.len(), 1);
4186 assert!(alice.contains(&n1));
4187
4188 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
4189 assert_eq!(bob.len(), 1);
4190 assert!(bob.contains(&n2));
4191 }
4192
4193 #[test]
4194 fn test_get_node_property_batch() {
4195 let store = LpgStore::new();
4196
4197 let n1 = store.create_node(&["Person"]);
4198 let n2 = store.create_node(&["Person"]);
4199 let n3 = store.create_node(&["Person"]);
4200
4201 store.set_node_property(n1, "age", Value::from(25i64));
4202 store.set_node_property(n2, "age", Value::from(30i64));
4203 let age_key = PropertyKey::new("age");
4206 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
4207
4208 assert_eq!(values.len(), 3);
4209 assert_eq!(values[0], Some(Value::from(25i64)));
4210 assert_eq!(values[1], Some(Value::from(30i64)));
4211 assert_eq!(values[2], None);
4212 }
4213
4214 #[test]
4215 fn test_get_node_property_batch_empty() {
4216 let store = LpgStore::new();
4217 let key = PropertyKey::new("any");
4218
4219 let values = store.get_node_property_batch(&[], &key);
4220 assert!(values.is_empty());
4221 }
4222
4223 #[test]
4224 fn test_get_nodes_properties_batch() {
4225 let store = LpgStore::new();
4226
4227 let n1 = store.create_node(&["Person"]);
4228 let n2 = store.create_node(&["Person"]);
4229 let n3 = store.create_node(&["Person"]);
4230
4231 store.set_node_property(n1, "name", Value::from("Alice"));
4232 store.set_node_property(n1, "age", Value::from(25i64));
4233 store.set_node_property(n2, "name", Value::from("Bob"));
4234 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
4237
4238 assert_eq!(all_props.len(), 3);
4239 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
4244 all_props[0].get(&PropertyKey::new("name")),
4245 Some(&Value::from("Alice"))
4246 );
4247 assert_eq!(
4248 all_props[1].get(&PropertyKey::new("name")),
4249 Some(&Value::from("Bob"))
4250 );
4251 }
4252
4253 #[test]
4254 fn test_get_nodes_properties_batch_empty() {
4255 let store = LpgStore::new();
4256
4257 let all_props = store.get_nodes_properties_batch(&[]);
4258 assert!(all_props.is_empty());
4259 }
4260
4261 #[test]
4262 fn test_get_nodes_properties_selective_batch() {
4263 let store = LpgStore::new();
4264
4265 let n1 = store.create_node(&["Person"]);
4266 let n2 = store.create_node(&["Person"]);
4267
4268 store.set_node_property(n1, "name", Value::from("Alice"));
4270 store.set_node_property(n1, "age", Value::from(25i64));
4271 store.set_node_property(n1, "email", Value::from("alice@example.com"));
4272 store.set_node_property(n2, "name", Value::from("Bob"));
4273 store.set_node_property(n2, "age", Value::from(30i64));
4274 store.set_node_property(n2, "city", Value::from("NYC"));
4275
4276 let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
4278 let props = store.get_nodes_properties_selective_batch(&[n1, n2], &keys);
4279
4280 assert_eq!(props.len(), 2);
4281
4282 assert_eq!(props[0].len(), 2);
4284 assert_eq!(
4285 props[0].get(&PropertyKey::new("name")),
4286 Some(&Value::from("Alice"))
4287 );
4288 assert_eq!(
4289 props[0].get(&PropertyKey::new("age")),
4290 Some(&Value::from(25i64))
4291 );
4292 assert_eq!(props[0].get(&PropertyKey::new("email")), None);
4293
4294 assert_eq!(props[1].len(), 2);
4296 assert_eq!(
4297 props[1].get(&PropertyKey::new("name")),
4298 Some(&Value::from("Bob"))
4299 );
4300 assert_eq!(
4301 props[1].get(&PropertyKey::new("age")),
4302 Some(&Value::from(30i64))
4303 );
4304 assert_eq!(props[1].get(&PropertyKey::new("city")), None);
4305 }
4306
4307 #[test]
4308 fn test_get_nodes_properties_selective_batch_empty_keys() {
4309 let store = LpgStore::new();
4310
4311 let n1 = store.create_node(&["Person"]);
4312 store.set_node_property(n1, "name", Value::from("Alice"));
4313
4314 let props = store.get_nodes_properties_selective_batch(&[n1], &[]);
4316
4317 assert_eq!(props.len(), 1);
4318 assert!(props[0].is_empty()); }
4320
4321 #[test]
4322 fn test_get_nodes_properties_selective_batch_missing_keys() {
4323 let store = LpgStore::new();
4324
4325 let n1 = store.create_node(&["Person"]);
4326 store.set_node_property(n1, "name", Value::from("Alice"));
4327
4328 let keys = vec![PropertyKey::new("nonexistent"), PropertyKey::new("name")];
4330 let props = store.get_nodes_properties_selective_batch(&[n1], &keys);
4331
4332 assert_eq!(props.len(), 1);
4333 assert_eq!(props[0].len(), 1); assert_eq!(
4335 props[0].get(&PropertyKey::new("name")),
4336 Some(&Value::from("Alice"))
4337 );
4338 }
4339
4340 #[test]
4343 fn test_find_nodes_in_range_inclusive() {
4344 let store = LpgStore::new();
4345
4346 let n1 = store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4347 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4348 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4349 let _n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4350
4351 let result = store.find_nodes_in_range(
4353 "age",
4354 Some(&Value::from(20i64)),
4355 Some(&Value::from(40i64)),
4356 true,
4357 true,
4358 );
4359 assert_eq!(result.len(), 3);
4360 assert!(result.contains(&n1));
4361 assert!(result.contains(&n2));
4362 assert!(result.contains(&n3));
4363 }
4364
4365 #[test]
4366 fn test_find_nodes_in_range_exclusive() {
4367 let store = LpgStore::new();
4368
4369 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4370 let n2 = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4371 store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4372
4373 let result = store.find_nodes_in_range(
4375 "age",
4376 Some(&Value::from(20i64)),
4377 Some(&Value::from(40i64)),
4378 false,
4379 false,
4380 );
4381 assert_eq!(result.len(), 1);
4382 assert!(result.contains(&n2));
4383 }
4384
4385 #[test]
4386 fn test_find_nodes_in_range_open_ended() {
4387 let store = LpgStore::new();
4388
4389 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4390 store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
4391 let n3 = store.create_node_with_props(&["Person"], [("age", Value::from(40i64))]);
4392 let n4 = store.create_node_with_props(&["Person"], [("age", Value::from(50i64))]);
4393
4394 let result = store.find_nodes_in_range("age", Some(&Value::from(35i64)), None, true, true);
4396 assert_eq!(result.len(), 2);
4397 assert!(result.contains(&n3));
4398 assert!(result.contains(&n4));
4399
4400 let result = store.find_nodes_in_range("age", None, Some(&Value::from(25i64)), true, true);
4402 assert_eq!(result.len(), 1);
4403 }
4404
4405 #[test]
4406 fn test_find_nodes_in_range_empty_result() {
4407 let store = LpgStore::new();
4408
4409 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4410
4411 let result = store.find_nodes_in_range(
4413 "age",
4414 Some(&Value::from(100i64)),
4415 Some(&Value::from(200i64)),
4416 true,
4417 true,
4418 );
4419 assert!(result.is_empty());
4420 }
4421
4422 #[test]
4423 fn test_find_nodes_in_range_nonexistent_property() {
4424 let store = LpgStore::new();
4425
4426 store.create_node_with_props(&["Person"], [("age", Value::from(20i64))]);
4427
4428 let result = store.find_nodes_in_range(
4429 "weight",
4430 Some(&Value::from(50i64)),
4431 Some(&Value::from(100i64)),
4432 true,
4433 true,
4434 );
4435 assert!(result.is_empty());
4436 }
4437
4438 #[test]
4441 fn test_find_nodes_by_properties_multiple_conditions() {
4442 let store = LpgStore::new();
4443
4444 let alice = store.create_node_with_props(
4445 &["Person"],
4446 [("name", Value::from("Alice")), ("city", Value::from("NYC"))],
4447 );
4448 store.create_node_with_props(
4449 &["Person"],
4450 [("name", Value::from("Bob")), ("city", Value::from("NYC"))],
4451 );
4452 store.create_node_with_props(
4453 &["Person"],
4454 [("name", Value::from("Alice")), ("city", Value::from("LA"))],
4455 );
4456
4457 let result = store.find_nodes_by_properties(&[
4459 ("name", Value::from("Alice")),
4460 ("city", Value::from("NYC")),
4461 ]);
4462 assert_eq!(result.len(), 1);
4463 assert!(result.contains(&alice));
4464 }
4465
4466 #[test]
4467 fn test_find_nodes_by_properties_empty_conditions() {
4468 let store = LpgStore::new();
4469
4470 store.create_node(&["Person"]);
4471 store.create_node(&["Person"]);
4472
4473 let result = store.find_nodes_by_properties(&[]);
4475 assert_eq!(result.len(), 2);
4476 }
4477
4478 #[test]
4479 fn test_find_nodes_by_properties_no_match() {
4480 let store = LpgStore::new();
4481
4482 store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
4483
4484 let result = store.find_nodes_by_properties(&[("name", Value::from("Nobody"))]);
4485 assert!(result.is_empty());
4486 }
4487
4488 #[test]
4489 fn test_find_nodes_by_properties_with_index() {
4490 let store = LpgStore::new();
4491
4492 store.create_property_index("name");
4494
4495 let alice = store.create_node_with_props(
4496 &["Person"],
4497 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
4498 );
4499 store.create_node_with_props(
4500 &["Person"],
4501 [("name", Value::from("Bob")), ("age", Value::from(30i64))],
4502 );
4503
4504 let result = store.find_nodes_by_properties(&[
4506 ("name", Value::from("Alice")),
4507 ("age", Value::from(30i64)),
4508 ]);
4509 assert_eq!(result.len(), 1);
4510 assert!(result.contains(&alice));
4511 }
4512
4513 #[test]
4516 fn test_estimate_label_cardinality() {
4517 let store = LpgStore::new();
4518
4519 store.create_node(&["Person"]);
4520 store.create_node(&["Person"]);
4521 store.create_node(&["Animal"]);
4522
4523 store.ensure_statistics_fresh();
4524
4525 let person_est = store.estimate_label_cardinality("Person");
4526 let animal_est = store.estimate_label_cardinality("Animal");
4527 let unknown_est = store.estimate_label_cardinality("Unknown");
4528
4529 assert!(
4530 person_est >= 1.0,
4531 "Person should have cardinality >= 1, got {person_est}"
4532 );
4533 assert!(
4534 animal_est >= 1.0,
4535 "Animal should have cardinality >= 1, got {animal_est}"
4536 );
4537 assert!(unknown_est >= 0.0);
4539 }
4540
4541 #[test]
4542 fn test_estimate_avg_degree() {
4543 let store = LpgStore::new();
4544
4545 let a = store.create_node(&["Person"]);
4546 let b = store.create_node(&["Person"]);
4547 let c = store.create_node(&["Person"]);
4548
4549 store.create_edge(a, b, "KNOWS");
4550 store.create_edge(a, c, "KNOWS");
4551 store.create_edge(b, c, "KNOWS");
4552
4553 store.ensure_statistics_fresh();
4554
4555 let outgoing = store.estimate_avg_degree("KNOWS", true);
4556 let incoming = store.estimate_avg_degree("KNOWS", false);
4557
4558 assert!(
4559 outgoing > 0.0,
4560 "Outgoing degree should be > 0, got {outgoing}"
4561 );
4562 assert!(
4563 incoming > 0.0,
4564 "Incoming degree should be > 0, got {incoming}"
4565 );
4566 }
4567
4568 #[test]
4571 fn test_delete_node_does_not_cascade() {
4572 let store = LpgStore::new();
4573
4574 let a = store.create_node(&["A"]);
4575 let b = store.create_node(&["B"]);
4576 let e = store.create_edge(a, b, "KNOWS");
4577
4578 assert!(store.delete_node(a));
4579 assert!(store.get_node(a).is_none());
4580
4581 assert!(
4583 store.get_edge(e).is_some(),
4584 "Edge should survive non-detach node delete"
4585 );
4586 }
4587
4588 #[test]
4589 fn test_delete_already_deleted_node() {
4590 let store = LpgStore::new();
4591 let a = store.create_node(&["A"]);
4592
4593 assert!(store.delete_node(a));
4594 assert!(!store.delete_node(a));
4596 }
4597
4598 #[test]
4599 fn test_delete_nonexistent_node() {
4600 let store = LpgStore::new();
4601 assert!(!store.delete_node(NodeId::new(999)));
4602 }
4603}