1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use dashmap::DashMap;
19#[cfg(not(feature = "tiered-storage"))]
20use grafeo_common::mvcc::VersionChain;
21use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
22use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
23use parking_lot::RwLock;
24use std::cmp::Ordering as CmpOrdering;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
30 match (a, b) {
31 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
32 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
33 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
34 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
35 _ => None,
36 }
37}
38
39fn value_in_range(
41 value: &Value,
42 min: Option<&Value>,
43 max: Option<&Value>,
44 min_inclusive: bool,
45 max_inclusive: bool,
46) -> bool {
47 if let Some(min_val) = min {
49 match compare_values_for_range(value, min_val) {
50 Some(CmpOrdering::Less) => return false,
51 Some(CmpOrdering::Equal) if !min_inclusive => return false,
52 None => return false, _ => {}
54 }
55 }
56
57 if let Some(max_val) = max {
59 match compare_values_for_range(value, max_val) {
60 Some(CmpOrdering::Greater) => return false,
61 Some(CmpOrdering::Equal) if !max_inclusive => return false,
62 None => return false,
63 _ => {}
64 }
65 }
66
67 true
68}
69
70#[cfg(feature = "tiered-storage")]
72use crate::storage::EpochStore;
73#[cfg(feature = "tiered-storage")]
74use grafeo_common::memory::arena::ArenaAllocator;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
77
/// Construction-time options for [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, the store also maintains a backward adjacency index
    /// (destination -> source) next to the forward one.
    pub backward_edges: bool,
    /// Requested initial capacity for node storage.
    // NOTE(review): not read by `LpgStore::with_config` in the visible code — confirm intent.
    pub initial_node_capacity: usize,
    /// Requested initial capacity for edge storage.
    // NOTE(review): not read by `LpgStore::with_config` in the visible code — confirm intent.
    pub initial_edge_capacity: usize,
}
93
94impl Default for LpgStoreConfig {
95 fn default() -> Self {
96 Self {
97 backward_edges: true,
98 initial_node_capacity: 1024,
99 initial_edge_capacity: 4096,
100 }
101 }
102}
103
/// Labeled-property-graph store.
///
/// Node and edge records are versioned per epoch; properties, labels and
/// adjacency are kept in separate side structures keyed by node/edge ID.
/// The record storage backend is selected by the `tiered-storage` feature.
pub struct LpgStore {
    /// Configuration the store was built with (currently only stored).
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Node records as version chains, keyed by node ID (non-tiered builds).
    #[cfg(not(feature = "tiered-storage"))]
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Edge records as version chains, keyed by edge ID (non-tiered builds).
    #[cfg(not(feature = "tiered-storage"))]
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Hands out the per-epoch arena that hot records are allocated in
    /// (tiered builds).
    #[cfg(feature = "tiered-storage")]
    arena_allocator: Arc<ArenaAllocator>,

    /// Per-node index of version references (tiered builds).
    #[cfg(feature = "tiered-storage")]
    node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,

    /// Per-edge index of version references (tiered builds).
    #[cfg(feature = "tiered-storage")]
    edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,

    /// Source of records addressed by cold version references (tiered builds).
    #[cfg(feature = "tiered-storage")]
    epoch_store: Arc<EpochStore>,

    /// Property values of nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property values of edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name -> numeric label ID.
    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Numeric label ID -> label name (the ID is the vector index).
    id_to_label: RwLock<Vec<Arc<str>>>,

    /// Edge type name -> numeric type ID.
    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Numeric type ID -> edge type name (the ID is the vector index).
    id_to_edge_type: RwLock<Vec<Arc<str>>>,

    /// Adjacency from source node to (destination, edge).
    forward_adj: ChunkedAdjacency,

    /// Adjacency from destination node to (source, edge); `None` when
    /// backward edges are disabled in the configuration.
    backward_adj: Option<ChunkedAdjacency>,

    /// For each label ID (vector index), the nodes carrying that label.
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// For each node, the set of label IDs attached to it.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Optional equality indexes: property key -> value -> nodes holding it.
    property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,

    /// Next node ID to hand out.
    next_node_id: AtomicU64,

    /// Next edge ID to hand out.
    next_edge_id: AtomicU64,

    /// Current epoch counter.
    current_epoch: AtomicU64,

    /// Store statistics.
    // NOTE(review): not read or written in the visible part of the file — confirm usage.
    statistics: RwLock<Statistics>,
}
217
218impl LpgStore {
219 #[must_use]
221 pub fn new() -> Self {
222 Self::with_config(LpgStoreConfig::default())
223 }
224
225 #[must_use]
227 pub fn with_config(config: LpgStoreConfig) -> Self {
228 let backward_adj = if config.backward_edges {
229 Some(ChunkedAdjacency::new())
230 } else {
231 None
232 };
233
234 Self {
235 #[cfg(not(feature = "tiered-storage"))]
236 nodes: RwLock::new(FxHashMap::default()),
237 #[cfg(not(feature = "tiered-storage"))]
238 edges: RwLock::new(FxHashMap::default()),
239 #[cfg(feature = "tiered-storage")]
240 arena_allocator: Arc::new(ArenaAllocator::new()),
241 #[cfg(feature = "tiered-storage")]
242 node_versions: RwLock::new(FxHashMap::default()),
243 #[cfg(feature = "tiered-storage")]
244 edge_versions: RwLock::new(FxHashMap::default()),
245 #[cfg(feature = "tiered-storage")]
246 epoch_store: Arc::new(EpochStore::new()),
247 node_properties: PropertyStorage::new(),
248 edge_properties: PropertyStorage::new(),
249 label_to_id: RwLock::new(FxHashMap::default()),
250 id_to_label: RwLock::new(Vec::new()),
251 edge_type_to_id: RwLock::new(FxHashMap::default()),
252 id_to_edge_type: RwLock::new(Vec::new()),
253 forward_adj: ChunkedAdjacency::new(),
254 backward_adj,
255 label_index: RwLock::new(Vec::new()),
256 node_labels: RwLock::new(FxHashMap::default()),
257 property_indexes: RwLock::new(FxHashMap::default()),
258 next_node_id: AtomicU64::new(0),
259 next_edge_id: AtomicU64::new(0),
260 current_epoch: AtomicU64::new(0),
261 statistics: RwLock::new(Statistics::new()),
262 config,
263 }
264 }
265
266 #[must_use]
268 pub fn current_epoch(&self) -> EpochId {
269 EpochId::new(self.current_epoch.load(Ordering::Acquire))
270 }
271
272 pub fn new_epoch(&self) -> EpochId {
274 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
275 EpochId::new(id)
276 }
277
278 pub fn create_node(&self, labels: &[&str]) -> NodeId {
284 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
285 }
286
287 #[cfg(not(feature = "tiered-storage"))]
289 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
290 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
291
292 let mut record = NodeRecord::new(id, epoch);
293 record.set_label_count(labels.len() as u16);
294
295 let mut node_label_set = FxHashSet::default();
297 for label in labels {
298 let label_id = self.get_or_create_label_id(*label);
299 node_label_set.insert(label_id);
300
301 let mut index = self.label_index.write();
303 while index.len() <= label_id as usize {
304 index.push(FxHashMap::default());
305 }
306 index[label_id as usize].insert(id, ());
307 }
308
309 self.node_labels.write().insert(id, node_label_set);
311
312 let chain = VersionChain::with_initial(record, epoch, tx_id);
314 self.nodes.write().insert(id, chain);
315 id
316 }
317
318 #[cfg(feature = "tiered-storage")]
321 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
322 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
323
324 let mut record = NodeRecord::new(id, epoch);
325 record.set_label_count(labels.len() as u16);
326
327 let mut node_label_set = FxHashSet::default();
329 for label in labels {
330 let label_id = self.get_or_create_label_id(*label);
331 node_label_set.insert(label_id);
332
333 let mut index = self.label_index.write();
335 while index.len() <= label_id as usize {
336 index.push(FxHashMap::default());
337 }
338 index[label_id as usize].insert(id, ());
339 }
340
341 self.node_labels.write().insert(id, node_label_set);
343
344 let arena = self.arena_allocator.arena_or_create(epoch);
346 let (offset, _stored) = arena.alloc_value_with_offset(record);
347
348 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
350
351 let mut versions = self.node_versions.write();
353 if let Some(index) = versions.get_mut(&id) {
354 index.add_hot(hot_ref);
355 } else {
356 versions.insert(id, VersionIndex::with_initial(hot_ref));
357 }
358
359 id
360 }
361
362 pub fn create_node_with_props(
364 &self,
365 labels: &[&str],
366 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
367 ) -> NodeId {
368 self.create_node_with_props_versioned(
369 labels,
370 properties,
371 self.current_epoch(),
372 TxId::SYSTEM,
373 )
374 }
375
376 #[cfg(not(feature = "tiered-storage"))]
378 pub fn create_node_with_props_versioned(
379 &self,
380 labels: &[&str],
381 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
382 epoch: EpochId,
383 tx_id: TxId,
384 ) -> NodeId {
385 let id = self.create_node_versioned(labels, epoch, tx_id);
386
387 for (key, value) in properties {
388 self.node_properties.set(id, key.into(), value.into());
389 }
390
391 let count = self.node_properties.get_all(id).len() as u16;
393 if let Some(chain) = self.nodes.write().get_mut(&id) {
394 if let Some(record) = chain.latest_mut() {
395 record.props_count = count;
396 }
397 }
398
399 id
400 }
401
402 #[cfg(feature = "tiered-storage")]
405 pub fn create_node_with_props_versioned(
406 &self,
407 labels: &[&str],
408 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
409 epoch: EpochId,
410 tx_id: TxId,
411 ) -> NodeId {
412 let id = self.create_node_versioned(labels, epoch, tx_id);
413
414 for (key, value) in properties {
415 self.node_properties.set(id, key.into(), value.into());
416 }
417
418 id
422 }
423
424 #[must_use]
426 pub fn get_node(&self, id: NodeId) -> Option<Node> {
427 self.get_node_at_epoch(id, self.current_epoch())
428 }
429
430 #[must_use]
432 #[cfg(not(feature = "tiered-storage"))]
433 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
434 let nodes = self.nodes.read();
435 let chain = nodes.get(&id)?;
436 let record = chain.visible_at(epoch)?;
437
438 if record.is_deleted() {
439 return None;
440 }
441
442 let mut node = Node::new(id);
443
444 let id_to_label = self.id_to_label.read();
446 let node_labels = self.node_labels.read();
447 if let Some(label_ids) = node_labels.get(&id) {
448 for &label_id in label_ids {
449 if let Some(label) = id_to_label.get(label_id as usize) {
450 node.labels.push(label.clone());
451 }
452 }
453 }
454
455 node.properties = self.node_properties.get_all(id).into_iter().collect();
457
458 Some(node)
459 }
460
461 #[must_use]
464 #[cfg(feature = "tiered-storage")]
465 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
466 let versions = self.node_versions.read();
467 let index = versions.get(&id)?;
468 let version_ref = index.visible_at(epoch)?;
469
470 let record = self.read_node_record(&version_ref)?;
472
473 if record.is_deleted() {
474 return None;
475 }
476
477 let mut node = Node::new(id);
478
479 let id_to_label = self.id_to_label.read();
481 let node_labels = self.node_labels.read();
482 if let Some(label_ids) = node_labels.get(&id) {
483 for &label_id in label_ids {
484 if let Some(label) = id_to_label.get(label_id as usize) {
485 node.labels.push(label.clone());
486 }
487 }
488 }
489
490 node.properties = self.node_properties.get_all(id).into_iter().collect();
492
493 Some(node)
494 }
495
496 #[must_use]
498 #[cfg(not(feature = "tiered-storage"))]
499 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
500 let nodes = self.nodes.read();
501 let chain = nodes.get(&id)?;
502 let record = chain.visible_to(epoch, tx_id)?;
503
504 if record.is_deleted() {
505 return None;
506 }
507
508 let mut node = Node::new(id);
509
510 let id_to_label = self.id_to_label.read();
512 let node_labels = self.node_labels.read();
513 if let Some(label_ids) = node_labels.get(&id) {
514 for &label_id in label_ids {
515 if let Some(label) = id_to_label.get(label_id as usize) {
516 node.labels.push(label.clone());
517 }
518 }
519 }
520
521 node.properties = self.node_properties.get_all(id).into_iter().collect();
523
524 Some(node)
525 }
526
527 #[must_use]
530 #[cfg(feature = "tiered-storage")]
531 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
532 let versions = self.node_versions.read();
533 let index = versions.get(&id)?;
534 let version_ref = index.visible_to(epoch, tx_id)?;
535
536 let record = self.read_node_record(&version_ref)?;
538
539 if record.is_deleted() {
540 return None;
541 }
542
543 let mut node = Node::new(id);
544
545 let id_to_label = self.id_to_label.read();
547 let node_labels = self.node_labels.read();
548 if let Some(label_ids) = node_labels.get(&id) {
549 for &label_id in label_ids {
550 if let Some(label) = id_to_label.get(label_id as usize) {
551 node.labels.push(label.clone());
552 }
553 }
554 }
555
556 node.properties = self.node_properties.get_all(id).into_iter().collect();
558
559 Some(node)
560 }
561
    /// Loads the node record that `version_ref` points at.
    ///
    /// A hot reference is read from the arena of the reference's epoch and
    /// cloned; a cold reference is fetched from the epoch store, whose result
    /// is returned as-is.
    #[cfg(feature = "tiered-storage")]
    #[allow(unsafe_code)]
    fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
        match version_ref {
            VersionRef::Hot(hot_ref) => {
                let arena = self.arena_allocator.arena(hot_ref.epoch);
                // SAFETY (assumed — TODO confirm against `read_at`'s contract):
                // the offset presumably comes from `alloc_value_with_offset`
                // storing a `NodeRecord` in this same epoch's arena.
                let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
                // Clone so the result does not borrow from the arena.
                Some(record.clone())
            }
            VersionRef::Cold(cold_ref) => {
                self.epoch_store
                    .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
            }
        }
    }
580
581 pub fn delete_node(&self, id: NodeId) -> bool {
583 self.delete_node_at_epoch(id, self.current_epoch())
584 }
585
586 #[cfg(not(feature = "tiered-storage"))]
588 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
589 let mut nodes = self.nodes.write();
590 if let Some(chain) = nodes.get_mut(&id) {
591 if let Some(record) = chain.visible_at(epoch) {
593 if record.is_deleted() {
594 return false;
595 }
596 } else {
597 return false;
599 }
600
601 chain.mark_deleted(epoch);
603
604 let mut index = self.label_index.write();
606 let mut node_labels = self.node_labels.write();
607 if let Some(label_ids) = node_labels.remove(&id) {
608 for label_id in label_ids {
609 if let Some(set) = index.get_mut(label_id as usize) {
610 set.remove(&id);
611 }
612 }
613 }
614
615 drop(nodes); drop(index);
618 drop(node_labels);
619 self.node_properties.remove_all(id);
620
621 true
624 } else {
625 false
626 }
627 }
628
629 #[cfg(feature = "tiered-storage")]
632 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
633 let mut versions = self.node_versions.write();
634 if let Some(index) = versions.get_mut(&id) {
635 if let Some(version_ref) = index.visible_at(epoch) {
637 if let Some(record) = self.read_node_record(&version_ref) {
638 if record.is_deleted() {
639 return false;
640 }
641 } else {
642 return false;
643 }
644 } else {
645 return false;
646 }
647
648 index.mark_deleted(epoch);
650
651 let mut label_index = self.label_index.write();
653 let mut node_labels = self.node_labels.write();
654 if let Some(label_ids) = node_labels.remove(&id) {
655 for label_id in label_ids {
656 if let Some(set) = label_index.get_mut(label_id as usize) {
657 set.remove(&id);
658 }
659 }
660 }
661
662 drop(versions);
664 drop(label_index);
665 drop(node_labels);
666 self.node_properties.remove_all(id);
667
668 true
669 } else {
670 false
671 }
672 }
673
674 #[cfg(not(feature = "tiered-storage"))]
679 pub fn delete_node_edges(&self, node_id: NodeId) {
680 let outgoing: Vec<EdgeId> = self
682 .forward_adj
683 .edges_from(node_id)
684 .into_iter()
685 .map(|(_, edge_id)| edge_id)
686 .collect();
687
688 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
690 backward
691 .edges_from(node_id)
692 .into_iter()
693 .map(|(_, edge_id)| edge_id)
694 .collect()
695 } else {
696 let epoch = self.current_epoch();
698 self.edges
699 .read()
700 .iter()
701 .filter_map(|(id, chain)| {
702 chain.visible_at(epoch).and_then(|r| {
703 if !r.is_deleted() && r.dst == node_id {
704 Some(*id)
705 } else {
706 None
707 }
708 })
709 })
710 .collect()
711 };
712
713 for edge_id in outgoing.into_iter().chain(incoming) {
715 self.delete_edge(edge_id);
716 }
717 }
718
719 #[cfg(feature = "tiered-storage")]
722 pub fn delete_node_edges(&self, node_id: NodeId) {
723 let outgoing: Vec<EdgeId> = self
725 .forward_adj
726 .edges_from(node_id)
727 .into_iter()
728 .map(|(_, edge_id)| edge_id)
729 .collect();
730
731 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
733 backward
734 .edges_from(node_id)
735 .into_iter()
736 .map(|(_, edge_id)| edge_id)
737 .collect()
738 } else {
739 let epoch = self.current_epoch();
741 let versions = self.edge_versions.read();
742 versions
743 .iter()
744 .filter_map(|(id, index)| {
745 index.visible_at(epoch).and_then(|vref| {
746 self.read_edge_record(&vref).and_then(|r| {
747 if !r.is_deleted() && r.dst == node_id {
748 Some(*id)
749 } else {
750 None
751 }
752 })
753 })
754 })
755 .collect()
756 };
757
758 for edge_id in outgoing.into_iter().chain(incoming) {
760 self.delete_edge(edge_id);
761 }
762 }
763
764 #[cfg(not(feature = "tiered-storage"))]
766 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
767 let prop_key: PropertyKey = key.into();
768
769 self.update_property_index_on_set(id, &prop_key, &value);
771
772 self.node_properties.set(id, prop_key, value);
773
774 let count = self.node_properties.get_all(id).len() as u16;
776 if let Some(chain) = self.nodes.write().get_mut(&id) {
777 if let Some(record) = chain.latest_mut() {
778 record.props_count = count;
779 }
780 }
781 }
782
783 #[cfg(feature = "tiered-storage")]
786 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
787 let prop_key: PropertyKey = key.into();
788
789 self.update_property_index_on_set(id, &prop_key, &value);
791
792 self.node_properties.set(id, prop_key, value);
793 }
797
798 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
800 self.edge_properties.set(id, key.into(), value);
801 }
802
803 #[cfg(not(feature = "tiered-storage"))]
807 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
808 let prop_key: PropertyKey = key.into();
809
810 self.update_property_index_on_remove(id, &prop_key);
812
813 let result = self.node_properties.remove(id, &prop_key);
814
815 let count = self.node_properties.get_all(id).len() as u16;
817 if let Some(chain) = self.nodes.write().get_mut(&id) {
818 if let Some(record) = chain.latest_mut() {
819 record.props_count = count;
820 }
821 }
822
823 result
824 }
825
826 #[cfg(feature = "tiered-storage")]
829 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
830 let prop_key: PropertyKey = key.into();
831
832 self.update_property_index_on_remove(id, &prop_key);
834
835 self.node_properties.remove(id, &prop_key)
836 }
838
839 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
843 self.edge_properties.remove(id, &key.into())
844 }
845
846 #[must_use]
861 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
862 self.node_properties.get(id, key)
863 }
864
865 #[must_use]
869 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
870 self.edge_properties.get(id, key)
871 }
872
873 #[must_use]
896 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
897 ids.iter()
898 .map(|&id| self.node_properties.get(id, key))
899 .collect()
900 }
901
902 #[must_use]
907 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
908 ids.iter()
909 .map(|&id| self.node_properties.get_all(id))
910 .collect()
911 }
912
913 #[must_use]
949 pub fn find_nodes_in_range(
950 &self,
951 property: &str,
952 min: Option<&Value>,
953 max: Option<&Value>,
954 min_inclusive: bool,
955 max_inclusive: bool,
956 ) -> Vec<NodeId> {
957 let key = PropertyKey::new(property);
958
959 if !self
961 .node_properties
962 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
963 {
964 return Vec::new();
965 }
966
967 self.node_ids()
969 .into_iter()
970 .filter(|&node_id| {
971 self.node_properties
972 .get(node_id, &key)
973 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
974 })
975 .collect()
976 }
977
978 #[must_use]
1003 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1004 if conditions.is_empty() {
1005 return self.node_ids();
1006 }
1007
1008 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1011 let indexes = self.property_indexes.read();
1012
1013 for (i, (prop, value)) in conditions.iter().enumerate() {
1014 let key = PropertyKey::new(*prop);
1015 let hv = HashableValue::new(value.clone());
1016
1017 if let Some(index) = indexes.get(&key) {
1018 let matches: Vec<NodeId> = index
1019 .get(&hv)
1020 .map(|nodes| nodes.iter().copied().collect())
1021 .unwrap_or_default();
1022
1023 if matches.is_empty() {
1025 return Vec::new();
1026 }
1027
1028 if best_start
1030 .as_ref()
1031 .is_none_or(|(_, best)| matches.len() < best.len())
1032 {
1033 best_start = Some((i, matches));
1034 }
1035 }
1036 }
1037 drop(indexes);
1038
1039 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1041 let (prop, value) = &conditions[0];
1043 (0, self.find_nodes_by_property(prop, value))
1044 });
1045
1046 for (i, (prop, value)) in conditions.iter().enumerate() {
1048 if i == start_idx {
1049 continue;
1050 }
1051
1052 let key = PropertyKey::new(*prop);
1053 candidates.retain(|&node_id| {
1054 self.node_properties
1055 .get(node_id, &key)
1056 .is_some_and(|v| v == *value)
1057 });
1058
1059 if candidates.is_empty() {
1061 return Vec::new();
1062 }
1063 }
1064
1065 candidates
1066 }
1067
1068 pub fn create_property_index(&self, property: &str) {
1096 let key = PropertyKey::new(property);
1097
1098 let mut indexes = self.property_indexes.write();
1099 if indexes.contains_key(&key) {
1100 return; }
1102
1103 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1105
1106 for node_id in self.node_ids() {
1108 if let Some(value) = self.node_properties.get(node_id, &key) {
1109 let hv = HashableValue::new(value);
1110 index
1111 .entry(hv)
1112 .or_insert_with(FxHashSet::default)
1113 .insert(node_id);
1114 }
1115 }
1116
1117 indexes.insert(key, index);
1118 }
1119
1120 pub fn drop_property_index(&self, property: &str) -> bool {
1124 let key = PropertyKey::new(property);
1125 self.property_indexes.write().remove(&key).is_some()
1126 }
1127
1128 #[must_use]
1130 pub fn has_property_index(&self, property: &str) -> bool {
1131 let key = PropertyKey::new(property);
1132 self.property_indexes.read().contains_key(&key)
1133 }
1134
1135 #[must_use]
1158 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1159 let key = PropertyKey::new(property);
1160 let hv = HashableValue::new(value.clone());
1161
1162 let indexes = self.property_indexes.read();
1164 if let Some(index) = indexes.get(&key) {
1165 if let Some(nodes) = index.get(&hv) {
1166 return nodes.iter().copied().collect();
1167 }
1168 return Vec::new();
1169 }
1170 drop(indexes);
1171
1172 self.node_ids()
1174 .into_iter()
1175 .filter(|&node_id| {
1176 self.node_properties
1177 .get(node_id, &key)
1178 .is_some_and(|v| v == *value)
1179 })
1180 .collect()
1181 }
1182
1183 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1185 let indexes = self.property_indexes.read();
1186 if let Some(index) = indexes.get(key) {
1187 if let Some(old_value) = self.node_properties.get(node_id, key) {
1189 let old_hv = HashableValue::new(old_value);
1190 if let Some(mut nodes) = index.get_mut(&old_hv) {
1191 nodes.remove(&node_id);
1192 if nodes.is_empty() {
1193 drop(nodes);
1194 index.remove(&old_hv);
1195 }
1196 }
1197 }
1198
1199 let new_hv = HashableValue::new(new_value.clone());
1201 index
1202 .entry(new_hv)
1203 .or_insert_with(FxHashSet::default)
1204 .insert(node_id);
1205 }
1206 }
1207
1208 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1210 let indexes = self.property_indexes.read();
1211 if let Some(index) = indexes.get(key) {
1212 if let Some(old_value) = self.node_properties.get(node_id, key) {
1214 let old_hv = HashableValue::new(old_value);
1215 if let Some(mut nodes) = index.get_mut(&old_hv) {
1216 nodes.remove(&node_id);
1217 if nodes.is_empty() {
1218 drop(nodes);
1219 index.remove(&old_hv);
1220 }
1221 }
1222 }
1223 }
1224 }
1225
1226 #[cfg(not(feature = "tiered-storage"))]
1231 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1232 let epoch = self.current_epoch();
1233
1234 let nodes = self.nodes.read();
1236 if let Some(chain) = nodes.get(&node_id) {
1237 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1238 return false;
1239 }
1240 } else {
1241 return false;
1242 }
1243 drop(nodes);
1244
1245 let label_id = self.get_or_create_label_id(label);
1247
1248 let mut node_labels = self.node_labels.write();
1250 let label_set = node_labels
1251 .entry(node_id)
1252 .or_insert_with(FxHashSet::default);
1253
1254 if label_set.contains(&label_id) {
1255 return false; }
1257
1258 label_set.insert(label_id);
1259 drop(node_labels);
1260
1261 let mut index = self.label_index.write();
1263 if (label_id as usize) >= index.len() {
1264 index.resize(label_id as usize + 1, FxHashMap::default());
1265 }
1266 index[label_id as usize].insert(node_id, ());
1267
1268 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1270 if let Some(record) = chain.latest_mut() {
1271 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1272 record.set_label_count(count as u16);
1273 }
1274 }
1275
1276 true
1277 }
1278
1279 #[cfg(feature = "tiered-storage")]
1282 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1283 let epoch = self.current_epoch();
1284
1285 let versions = self.node_versions.read();
1287 if let Some(index) = versions.get(&node_id) {
1288 if let Some(vref) = index.visible_at(epoch) {
1289 if let Some(record) = self.read_node_record(&vref) {
1290 if record.is_deleted() {
1291 return false;
1292 }
1293 } else {
1294 return false;
1295 }
1296 } else {
1297 return false;
1298 }
1299 } else {
1300 return false;
1301 }
1302 drop(versions);
1303
1304 let label_id = self.get_or_create_label_id(label);
1306
1307 let mut node_labels = self.node_labels.write();
1309 let label_set = node_labels
1310 .entry(node_id)
1311 .or_insert_with(FxHashSet::default);
1312
1313 if label_set.contains(&label_id) {
1314 return false; }
1316
1317 label_set.insert(label_id);
1318 drop(node_labels);
1319
1320 let mut index = self.label_index.write();
1322 if (label_id as usize) >= index.len() {
1323 index.resize(label_id as usize + 1, FxHashMap::default());
1324 }
1325 index[label_id as usize].insert(node_id, ());
1326
1327 true
1331 }
1332
1333 #[cfg(not(feature = "tiered-storage"))]
1338 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1339 let epoch = self.current_epoch();
1340
1341 let nodes = self.nodes.read();
1343 if let Some(chain) = nodes.get(&node_id) {
1344 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1345 return false;
1346 }
1347 } else {
1348 return false;
1349 }
1350 drop(nodes);
1351
1352 let label_id = {
1354 let label_ids = self.label_to_id.read();
1355 match label_ids.get(label) {
1356 Some(&id) => id,
1357 None => return false, }
1359 };
1360
1361 let mut node_labels = self.node_labels.write();
1363 if let Some(label_set) = node_labels.get_mut(&node_id) {
1364 if !label_set.remove(&label_id) {
1365 return false; }
1367 } else {
1368 return false;
1369 }
1370 drop(node_labels);
1371
1372 let mut index = self.label_index.write();
1374 if (label_id as usize) < index.len() {
1375 index[label_id as usize].remove(&node_id);
1376 }
1377
1378 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1380 if let Some(record) = chain.latest_mut() {
1381 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1382 record.set_label_count(count as u16);
1383 }
1384 }
1385
1386 true
1387 }
1388
1389 #[cfg(feature = "tiered-storage")]
1392 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1393 let epoch = self.current_epoch();
1394
1395 let versions = self.node_versions.read();
1397 if let Some(index) = versions.get(&node_id) {
1398 if let Some(vref) = index.visible_at(epoch) {
1399 if let Some(record) = self.read_node_record(&vref) {
1400 if record.is_deleted() {
1401 return false;
1402 }
1403 } else {
1404 return false;
1405 }
1406 } else {
1407 return false;
1408 }
1409 } else {
1410 return false;
1411 }
1412 drop(versions);
1413
1414 let label_id = {
1416 let label_ids = self.label_to_id.read();
1417 match label_ids.get(label) {
1418 Some(&id) => id,
1419 None => return false, }
1421 };
1422
1423 let mut node_labels = self.node_labels.write();
1425 if let Some(label_set) = node_labels.get_mut(&node_id) {
1426 if !label_set.remove(&label_id) {
1427 return false; }
1429 } else {
1430 return false;
1431 }
1432 drop(node_labels);
1433
1434 let mut index = self.label_index.write();
1436 if (label_id as usize) < index.len() {
1437 index[label_id as usize].remove(&node_id);
1438 }
1439
1440 true
1443 }
1444
1445 #[must_use]
1447 #[cfg(not(feature = "tiered-storage"))]
1448 pub fn node_count(&self) -> usize {
1449 let epoch = self.current_epoch();
1450 self.nodes
1451 .read()
1452 .values()
1453 .filter_map(|chain| chain.visible_at(epoch))
1454 .filter(|r| !r.is_deleted())
1455 .count()
1456 }
1457
1458 #[must_use]
1461 #[cfg(feature = "tiered-storage")]
1462 pub fn node_count(&self) -> usize {
1463 let epoch = self.current_epoch();
1464 let versions = self.node_versions.read();
1465 versions
1466 .iter()
1467 .filter(|(_, index)| {
1468 index.visible_at(epoch).map_or(false, |vref| {
1469 self.read_node_record(&vref)
1470 .map_or(false, |r| !r.is_deleted())
1471 })
1472 })
1473 .count()
1474 }
1475
1476 #[must_use]
1482 #[cfg(not(feature = "tiered-storage"))]
1483 pub fn node_ids(&self) -> Vec<NodeId> {
1484 let epoch = self.current_epoch();
1485 let mut ids: Vec<NodeId> = self
1486 .nodes
1487 .read()
1488 .iter()
1489 .filter_map(|(id, chain)| {
1490 chain
1491 .visible_at(epoch)
1492 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1493 })
1494 .collect();
1495 ids.sort_unstable();
1496 ids
1497 }
1498
1499 #[must_use]
1502 #[cfg(feature = "tiered-storage")]
1503 pub fn node_ids(&self) -> Vec<NodeId> {
1504 let epoch = self.current_epoch();
1505 let versions = self.node_versions.read();
1506 let mut ids: Vec<NodeId> = versions
1507 .iter()
1508 .filter_map(|(id, index)| {
1509 index.visible_at(epoch).and_then(|vref| {
1510 self.read_node_record(&vref)
1511 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1512 })
1513 })
1514 .collect();
1515 ids.sort_unstable();
1516 ids
1517 }
1518
1519 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1523 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1524 }
1525
1526 #[cfg(not(feature = "tiered-storage"))]
1528 pub fn create_edge_versioned(
1529 &self,
1530 src: NodeId,
1531 dst: NodeId,
1532 edge_type: &str,
1533 epoch: EpochId,
1534 tx_id: TxId,
1535 ) -> EdgeId {
1536 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1537 let type_id = self.get_or_create_edge_type_id(edge_type);
1538
1539 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1540 let chain = VersionChain::with_initial(record, epoch, tx_id);
1541 self.edges.write().insert(id, chain);
1542
1543 self.forward_adj.add_edge(src, dst, id);
1545 if let Some(ref backward) = self.backward_adj {
1546 backward.add_edge(dst, src, id);
1547 }
1548
1549 id
1550 }
1551
1552 #[cfg(feature = "tiered-storage")]
1555 pub fn create_edge_versioned(
1556 &self,
1557 src: NodeId,
1558 dst: NodeId,
1559 edge_type: &str,
1560 epoch: EpochId,
1561 tx_id: TxId,
1562 ) -> EdgeId {
1563 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1564 let type_id = self.get_or_create_edge_type_id(edge_type);
1565
1566 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1567
1568 let arena = self.arena_allocator.arena_or_create(epoch);
1570 let (offset, _stored) = arena.alloc_value_with_offset(record);
1571
1572 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1574
1575 let mut versions = self.edge_versions.write();
1577 if let Some(index) = versions.get_mut(&id) {
1578 index.add_hot(hot_ref);
1579 } else {
1580 versions.insert(id, VersionIndex::with_initial(hot_ref));
1581 }
1582
1583 self.forward_adj.add_edge(src, dst, id);
1585 if let Some(ref backward) = self.backward_adj {
1586 backward.add_edge(dst, src, id);
1587 }
1588
1589 id
1590 }
1591
1592 pub fn create_edge_with_props(
1594 &self,
1595 src: NodeId,
1596 dst: NodeId,
1597 edge_type: &str,
1598 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1599 ) -> EdgeId {
1600 let id = self.create_edge(src, dst, edge_type);
1601
1602 for (key, value) in properties {
1603 self.edge_properties.set(id, key.into(), value.into());
1604 }
1605
1606 id
1607 }
1608
1609 #[must_use]
1611 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1612 self.get_edge_at_epoch(id, self.current_epoch())
1613 }
1614
1615 #[must_use]
1617 #[cfg(not(feature = "tiered-storage"))]
1618 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1619 let edges = self.edges.read();
1620 let chain = edges.get(&id)?;
1621 let record = chain.visible_at(epoch)?;
1622
1623 if record.is_deleted() {
1624 return None;
1625 }
1626
1627 let edge_type = {
1628 let id_to_type = self.id_to_edge_type.read();
1629 id_to_type.get(record.type_id as usize)?.clone()
1630 };
1631
1632 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1633
1634 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1636
1637 Some(edge)
1638 }
1639
1640 #[must_use]
1643 #[cfg(feature = "tiered-storage")]
1644 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1645 let versions = self.edge_versions.read();
1646 let index = versions.get(&id)?;
1647 let version_ref = index.visible_at(epoch)?;
1648
1649 let record = self.read_edge_record(&version_ref)?;
1650
1651 if record.is_deleted() {
1652 return None;
1653 }
1654
1655 let edge_type = {
1656 let id_to_type = self.id_to_edge_type.read();
1657 id_to_type.get(record.type_id as usize)?.clone()
1658 };
1659
1660 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1661
1662 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1664
1665 Some(edge)
1666 }
1667
1668 #[must_use]
1670 #[cfg(not(feature = "tiered-storage"))]
1671 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1672 let edges = self.edges.read();
1673 let chain = edges.get(&id)?;
1674 let record = chain.visible_to(epoch, tx_id)?;
1675
1676 if record.is_deleted() {
1677 return None;
1678 }
1679
1680 let edge_type = {
1681 let id_to_type = self.id_to_edge_type.read();
1682 id_to_type.get(record.type_id as usize)?.clone()
1683 };
1684
1685 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1686
1687 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1689
1690 Some(edge)
1691 }
1692
1693 #[must_use]
1696 #[cfg(feature = "tiered-storage")]
1697 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1698 let versions = self.edge_versions.read();
1699 let index = versions.get(&id)?;
1700 let version_ref = index.visible_to(epoch, tx_id)?;
1701
1702 let record = self.read_edge_record(&version_ref)?;
1703
1704 if record.is_deleted() {
1705 return None;
1706 }
1707
1708 let edge_type = {
1709 let id_to_type = self.id_to_edge_type.read();
1710 id_to_type.get(record.type_id as usize)?.clone()
1711 };
1712
1713 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1714
1715 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1717
1718 Some(edge)
1719 }
1720
1721 #[cfg(feature = "tiered-storage")]
1723 #[allow(unsafe_code)]
1724 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1725 match version_ref {
1726 VersionRef::Hot(hot_ref) => {
1727 let arena = self.arena_allocator.arena(hot_ref.epoch);
1728 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1730 Some(record.clone())
1731 }
1732 VersionRef::Cold(cold_ref) => {
1733 self.epoch_store
1735 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1736 }
1737 }
1738 }
1739
1740 pub fn delete_edge(&self, id: EdgeId) -> bool {
1742 self.delete_edge_at_epoch(id, self.current_epoch())
1743 }
1744
1745 #[cfg(not(feature = "tiered-storage"))]
1747 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1748 let mut edges = self.edges.write();
1749 if let Some(chain) = edges.get_mut(&id) {
1750 let (src, dst) = {
1752 match chain.visible_at(epoch) {
1753 Some(record) => {
1754 if record.is_deleted() {
1755 return false;
1756 }
1757 (record.src, record.dst)
1758 }
1759 None => return false, }
1761 };
1762
1763 chain.mark_deleted(epoch);
1765
1766 drop(edges); self.forward_adj.mark_deleted(src, id);
1770 if let Some(ref backward) = self.backward_adj {
1771 backward.mark_deleted(dst, id);
1772 }
1773
1774 self.edge_properties.remove_all(id);
1776
1777 true
1778 } else {
1779 false
1780 }
1781 }
1782
1783 #[cfg(feature = "tiered-storage")]
1786 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1787 let mut versions = self.edge_versions.write();
1788 if let Some(index) = versions.get_mut(&id) {
1789 let (src, dst) = {
1791 match index.visible_at(epoch) {
1792 Some(version_ref) => {
1793 if let Some(record) = self.read_edge_record(&version_ref) {
1794 if record.is_deleted() {
1795 return false;
1796 }
1797 (record.src, record.dst)
1798 } else {
1799 return false;
1800 }
1801 }
1802 None => return false,
1803 }
1804 };
1805
1806 index.mark_deleted(epoch);
1808
1809 drop(versions); self.forward_adj.mark_deleted(src, id);
1813 if let Some(ref backward) = self.backward_adj {
1814 backward.mark_deleted(dst, id);
1815 }
1816
1817 self.edge_properties.remove_all(id);
1819
1820 true
1821 } else {
1822 false
1823 }
1824 }
1825
1826 #[must_use]
1828 #[cfg(not(feature = "tiered-storage"))]
1829 pub fn edge_count(&self) -> usize {
1830 let epoch = self.current_epoch();
1831 self.edges
1832 .read()
1833 .values()
1834 .filter_map(|chain| chain.visible_at(epoch))
1835 .filter(|r| !r.is_deleted())
1836 .count()
1837 }
1838
1839 #[must_use]
1842 #[cfg(feature = "tiered-storage")]
1843 pub fn edge_count(&self) -> usize {
1844 let epoch = self.current_epoch();
1845 let versions = self.edge_versions.read();
1846 versions
1847 .iter()
1848 .filter(|(_, index)| {
1849 index.visible_at(epoch).map_or(false, |vref| {
1850 self.read_edge_record(&vref)
1851 .map_or(false, |r| !r.is_deleted())
1852 })
1853 })
1854 .count()
1855 }
1856
1857 #[cfg(not(feature = "tiered-storage"))]
1862 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1863 {
1865 let mut nodes = self.nodes.write();
1866 for chain in nodes.values_mut() {
1867 chain.remove_versions_by(tx_id);
1868 }
1869 nodes.retain(|_, chain| !chain.is_empty());
1871 }
1872
1873 {
1875 let mut edges = self.edges.write();
1876 for chain in edges.values_mut() {
1877 chain.remove_versions_by(tx_id);
1878 }
1879 edges.retain(|_, chain| !chain.is_empty());
1881 }
1882 }
1883
1884 #[cfg(feature = "tiered-storage")]
1887 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1888 {
1890 let mut versions = self.node_versions.write();
1891 for index in versions.values_mut() {
1892 index.remove_versions_by(tx_id);
1893 }
1894 versions.retain(|_, index| !index.is_empty());
1896 }
1897
1898 {
1900 let mut versions = self.edge_versions.write();
1901 for index in versions.values_mut() {
1902 index.remove_versions_by(tx_id);
1903 }
1904 versions.retain(|_, index| !index.is_empty());
1906 }
1907 }
1908
1909 #[cfg(feature = "tiered-storage")]
1928 #[allow(unsafe_code)]
1929 pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
1930 let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
1932 let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
1933
1934 {
1935 let versions = self.node_versions.read();
1936 for (node_id, index) in versions.iter() {
1937 for hot_ref in index.hot_refs_for_epoch(epoch) {
1938 let arena = self.arena_allocator.arena(hot_ref.epoch);
1939 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1941 node_records.push((node_id.as_u64(), record.clone()));
1942 node_hot_refs.push((*node_id, *hot_ref));
1943 }
1944 }
1945 }
1946
1947 let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
1949 let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
1950
1951 {
1952 let versions = self.edge_versions.read();
1953 for (edge_id, index) in versions.iter() {
1954 for hot_ref in index.hot_refs_for_epoch(epoch) {
1955 let arena = self.arena_allocator.arena(hot_ref.epoch);
1956 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1958 edge_records.push((edge_id.as_u64(), record.clone()));
1959 edge_hot_refs.push((*edge_id, *hot_ref));
1960 }
1961 }
1962 }
1963
1964 let total_frozen = node_records.len() + edge_records.len();
1965
1966 if total_frozen == 0 {
1967 return 0;
1968 }
1969
1970 let (node_entries, edge_entries) =
1972 self.epoch_store
1973 .freeze_epoch(epoch, node_records, edge_records);
1974
1975 let node_entry_map: FxHashMap<u64, _> = node_entries
1977 .iter()
1978 .map(|e| (e.entity_id, (e.offset, e.length)))
1979 .collect();
1980 let edge_entry_map: FxHashMap<u64, _> = edge_entries
1981 .iter()
1982 .map(|e| (e.entity_id, (e.offset, e.length)))
1983 .collect();
1984
1985 {
1987 let mut versions = self.node_versions.write();
1988 for (node_id, hot_ref) in &node_hot_refs {
1989 if let Some(index) = versions.get_mut(node_id) {
1990 if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
1991 let cold_ref = ColdVersionRef {
1992 epoch,
1993 block_offset: offset,
1994 length,
1995 created_by: hot_ref.created_by,
1996 deleted_epoch: hot_ref.deleted_epoch,
1997 };
1998 index.freeze_epoch(epoch, std::iter::once(cold_ref));
1999 }
2000 }
2001 }
2002 }
2003
2004 {
2005 let mut versions = self.edge_versions.write();
2006 for (edge_id, hot_ref) in &edge_hot_refs {
2007 if let Some(index) = versions.get_mut(edge_id) {
2008 if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2009 let cold_ref = ColdVersionRef {
2010 epoch,
2011 block_offset: offset,
2012 length,
2013 created_by: hot_ref.created_by,
2014 deleted_epoch: hot_ref.deleted_epoch,
2015 };
2016 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2017 }
2018 }
2019 }
2020 }
2021
2022 total_frozen
2023 }
2024
2025 #[cfg(feature = "tiered-storage")]
2027 #[must_use]
2028 pub fn epoch_store(&self) -> &EpochStore {
2029 &self.epoch_store
2030 }
2031
2032 #[must_use]
2034 pub fn label_count(&self) -> usize {
2035 self.id_to_label.read().len()
2036 }
2037
2038 #[must_use]
2042 pub fn property_key_count(&self) -> usize {
2043 let node_keys = self.node_properties.column_count();
2044 let edge_keys = self.edge_properties.column_count();
2045 node_keys + edge_keys
2049 }
2050
2051 #[must_use]
2053 pub fn edge_type_count(&self) -> usize {
2054 self.id_to_edge_type.read().len()
2055 }
2056
2057 pub fn neighbors(
2064 &self,
2065 node: NodeId,
2066 direction: Direction,
2067 ) -> impl Iterator<Item = NodeId> + '_ {
2068 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2069 Direction::Outgoing | Direction::Both => {
2070 Box::new(self.forward_adj.neighbors(node).into_iter())
2071 }
2072 Direction::Incoming => Box::new(std::iter::empty()),
2073 };
2074
2075 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2076 Direction::Incoming | Direction::Both => {
2077 if let Some(ref adj) = self.backward_adj {
2078 Box::new(adj.neighbors(node).into_iter())
2079 } else {
2080 Box::new(std::iter::empty())
2081 }
2082 }
2083 Direction::Outgoing => Box::new(std::iter::empty()),
2084 };
2085
2086 forward.chain(backward)
2087 }
2088
2089 pub fn edges_from(
2093 &self,
2094 node: NodeId,
2095 direction: Direction,
2096 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2097 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2098 Direction::Outgoing | Direction::Both => {
2099 Box::new(self.forward_adj.edges_from(node).into_iter())
2100 }
2101 Direction::Incoming => Box::new(std::iter::empty()),
2102 };
2103
2104 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2105 Direction::Incoming | Direction::Both => {
2106 if let Some(ref adj) = self.backward_adj {
2107 Box::new(adj.edges_from(node).into_iter())
2108 } else {
2109 Box::new(std::iter::empty())
2110 }
2111 }
2112 Direction::Outgoing => Box::new(std::iter::empty()),
2113 };
2114
2115 forward.chain(backward)
2116 }
2117
2118 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2131 if let Some(ref backward) = self.backward_adj {
2132 backward.edges_from(node)
2133 } else {
2134 self.all_edges()
2136 .filter_map(|edge| {
2137 if edge.dst == node {
2138 Some((edge.src, edge.id))
2139 } else {
2140 None
2141 }
2142 })
2143 .collect()
2144 }
2145 }
2146
2147 #[must_use]
2151 pub fn out_degree(&self, node: NodeId) -> usize {
2152 self.forward_adj.out_degree(node)
2153 }
2154
2155 #[must_use]
2160 pub fn in_degree(&self, node: NodeId) -> usize {
2161 if let Some(ref backward) = self.backward_adj {
2162 backward.in_degree(node)
2163 } else {
2164 self.all_edges().filter(|edge| edge.dst == node).count()
2166 }
2167 }
2168
2169 #[must_use]
2171 #[cfg(not(feature = "tiered-storage"))]
2172 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2173 let edges = self.edges.read();
2174 let chain = edges.get(&id)?;
2175 let epoch = self.current_epoch();
2176 let record = chain.visible_at(epoch)?;
2177 let id_to_type = self.id_to_edge_type.read();
2178 id_to_type.get(record.type_id as usize).cloned()
2179 }
2180
2181 #[must_use]
2184 #[cfg(feature = "tiered-storage")]
2185 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2186 let versions = self.edge_versions.read();
2187 let index = versions.get(&id)?;
2188 let epoch = self.current_epoch();
2189 let vref = index.visible_at(epoch)?;
2190 let record = self.read_edge_record(&vref)?;
2191 let id_to_type = self.id_to_edge_type.read();
2192 id_to_type.get(record.type_id as usize).cloned()
2193 }
2194
2195 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2201 let label_to_id = self.label_to_id.read();
2202 if let Some(&label_id) = label_to_id.get(label) {
2203 let index = self.label_index.read();
2204 if let Some(set) = index.get(label_id as usize) {
2205 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2206 ids.sort_unstable();
2207 return ids;
2208 }
2209 }
2210 Vec::new()
2211 }
2212
2213 #[cfg(not(feature = "tiered-storage"))]
2220 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2221 let epoch = self.current_epoch();
2222 let node_ids: Vec<NodeId> = self
2223 .nodes
2224 .read()
2225 .iter()
2226 .filter_map(|(id, chain)| {
2227 chain
2228 .visible_at(epoch)
2229 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2230 })
2231 .collect();
2232
2233 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2234 }
2235
2236 #[cfg(feature = "tiered-storage")]
2239 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2240 let node_ids = self.node_ids();
2241 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2242 }
2243
2244 #[cfg(not(feature = "tiered-storage"))]
2249 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2250 let epoch = self.current_epoch();
2251 let edge_ids: Vec<EdgeId> = self
2252 .edges
2253 .read()
2254 .iter()
2255 .filter_map(|(id, chain)| {
2256 chain
2257 .visible_at(epoch)
2258 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2259 })
2260 .collect();
2261
2262 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2263 }
2264
2265 #[cfg(feature = "tiered-storage")]
2268 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2269 let epoch = self.current_epoch();
2270 let versions = self.edge_versions.read();
2271 let edge_ids: Vec<EdgeId> = versions
2272 .iter()
2273 .filter_map(|(id, index)| {
2274 index.visible_at(epoch).and_then(|vref| {
2275 self.read_edge_record(&vref)
2276 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2277 })
2278 })
2279 .collect();
2280
2281 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2282 }
2283
2284 pub fn all_labels(&self) -> Vec<String> {
2286 self.id_to_label
2287 .read()
2288 .iter()
2289 .map(|s| s.to_string())
2290 .collect()
2291 }
2292
2293 pub fn all_edge_types(&self) -> Vec<String> {
2295 self.id_to_edge_type
2296 .read()
2297 .iter()
2298 .map(|s| s.to_string())
2299 .collect()
2300 }
2301
2302 pub fn all_property_keys(&self) -> Vec<String> {
2304 let mut keys = std::collections::HashSet::new();
2305 for key in self.node_properties.keys() {
2306 keys.insert(key.to_string());
2307 }
2308 for key in self.edge_properties.keys() {
2309 keys.insert(key.to_string());
2310 }
2311 keys.into_iter().collect()
2312 }
2313
2314 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2316 let node_ids = self.nodes_by_label(label);
2317 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2318 }
2319
2320 #[cfg(not(feature = "tiered-storage"))]
2322 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2323 let epoch = self.current_epoch();
2324 let type_to_id = self.edge_type_to_id.read();
2325
2326 if let Some(&type_id) = type_to_id.get(edge_type) {
2327 let edge_ids: Vec<EdgeId> = self
2328 .edges
2329 .read()
2330 .iter()
2331 .filter_map(|(id, chain)| {
2332 chain.visible_at(epoch).and_then(|r| {
2333 if !r.is_deleted() && r.type_id == type_id {
2334 Some(*id)
2335 } else {
2336 None
2337 }
2338 })
2339 })
2340 .collect();
2341
2342 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2344 as Box<dyn Iterator<Item = Edge> + 'a>
2345 } else {
2346 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2348 }
2349 }
2350
2351 #[cfg(feature = "tiered-storage")]
2354 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2355 let epoch = self.current_epoch();
2356 let type_to_id = self.edge_type_to_id.read();
2357
2358 if let Some(&type_id) = type_to_id.get(edge_type) {
2359 let versions = self.edge_versions.read();
2360 let edge_ids: Vec<EdgeId> = versions
2361 .iter()
2362 .filter_map(|(id, index)| {
2363 index.visible_at(epoch).and_then(|vref| {
2364 self.read_edge_record(&vref).and_then(|r| {
2365 if !r.is_deleted() && r.type_id == type_id {
2366 Some(*id)
2367 } else {
2368 None
2369 }
2370 })
2371 })
2372 })
2373 .collect();
2374
2375 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2376 as Box<dyn Iterator<Item = Edge> + 'a>
2377 } else {
2378 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2379 }
2380 }
2381
2382 #[must_use]
2389 pub fn node_property_might_match(
2390 &self,
2391 property: &PropertyKey,
2392 op: CompareOp,
2393 value: &Value,
2394 ) -> bool {
2395 self.node_properties.might_match(property, op, value)
2396 }
2397
2398 #[must_use]
2400 pub fn edge_property_might_match(
2401 &self,
2402 property: &PropertyKey,
2403 op: CompareOp,
2404 value: &Value,
2405 ) -> bool {
2406 self.edge_properties.might_match(property, op, value)
2407 }
2408
2409 #[must_use]
2411 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2412 self.node_properties.zone_map(property)
2413 }
2414
2415 #[must_use]
2417 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2418 self.edge_properties.zone_map(property)
2419 }
2420
2421 pub fn rebuild_zone_maps(&self) {
2423 self.node_properties.rebuild_zone_maps();
2424 self.edge_properties.rebuild_zone_maps();
2425 }
2426
2427 #[must_use]
2431 pub fn statistics(&self) -> Statistics {
2432 self.statistics.read().clone()
2433 }
2434
2435 #[cfg(not(feature = "tiered-storage"))]
2440 pub fn compute_statistics(&self) {
2441 let mut stats = Statistics::new();
2442
2443 stats.total_nodes = self.node_count() as u64;
2445 stats.total_edges = self.edge_count() as u64;
2446
2447 let id_to_label = self.id_to_label.read();
2449 let label_index = self.label_index.read();
2450
2451 for (label_id, label_name) in id_to_label.iter().enumerate() {
2452 let node_count = label_index
2453 .get(label_id)
2454 .map(|set| set.len() as u64)
2455 .unwrap_or(0);
2456
2457 if node_count > 0 {
2458 let avg_out_degree = if stats.total_nodes > 0 {
2460 stats.total_edges as f64 / stats.total_nodes as f64
2461 } else {
2462 0.0
2463 };
2464
2465 let label_stats =
2466 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2467
2468 stats.update_label(label_name.as_ref(), label_stats);
2469 }
2470 }
2471
2472 let id_to_edge_type = self.id_to_edge_type.read();
2474 let edges = self.edges.read();
2475 let epoch = self.current_epoch();
2476
2477 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2478 for chain in edges.values() {
2479 if let Some(record) = chain.visible_at(epoch) {
2480 if !record.is_deleted() {
2481 *edge_type_counts.entry(record.type_id).or_default() += 1;
2482 }
2483 }
2484 }
2485
2486 for (type_id, count) in edge_type_counts {
2487 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2488 let avg_degree = if stats.total_nodes > 0 {
2489 count as f64 / stats.total_nodes as f64
2490 } else {
2491 0.0
2492 };
2493
2494 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2495 stats.update_edge_type(type_name.as_ref(), edge_stats);
2496 }
2497 }
2498
2499 *self.statistics.write() = stats;
2500 }
2501
2502 #[cfg(feature = "tiered-storage")]
2505 pub fn compute_statistics(&self) {
2506 let mut stats = Statistics::new();
2507
2508 stats.total_nodes = self.node_count() as u64;
2510 stats.total_edges = self.edge_count() as u64;
2511
2512 let id_to_label = self.id_to_label.read();
2514 let label_index = self.label_index.read();
2515
2516 for (label_id, label_name) in id_to_label.iter().enumerate() {
2517 let node_count = label_index
2518 .get(label_id)
2519 .map(|set| set.len() as u64)
2520 .unwrap_or(0);
2521
2522 if node_count > 0 {
2523 let avg_out_degree = if stats.total_nodes > 0 {
2524 stats.total_edges as f64 / stats.total_nodes as f64
2525 } else {
2526 0.0
2527 };
2528
2529 let label_stats =
2530 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2531
2532 stats.update_label(label_name.as_ref(), label_stats);
2533 }
2534 }
2535
2536 let id_to_edge_type = self.id_to_edge_type.read();
2538 let versions = self.edge_versions.read();
2539 let epoch = self.current_epoch();
2540
2541 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2542 for index in versions.values() {
2543 if let Some(vref) = index.visible_at(epoch) {
2544 if let Some(record) = self.read_edge_record(&vref) {
2545 if !record.is_deleted() {
2546 *edge_type_counts.entry(record.type_id).or_default() += 1;
2547 }
2548 }
2549 }
2550 }
2551
2552 for (type_id, count) in edge_type_counts {
2553 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2554 let avg_degree = if stats.total_nodes > 0 {
2555 count as f64 / stats.total_nodes as f64
2556 } else {
2557 0.0
2558 };
2559
2560 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2561 stats.update_edge_type(type_name.as_ref(), edge_stats);
2562 }
2563 }
2564
2565 *self.statistics.write() = stats;
2566 }
2567
2568 #[must_use]
2570 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2571 self.statistics.read().estimate_label_cardinality(label)
2572 }
2573
2574 #[must_use]
2576 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2577 self.statistics
2578 .read()
2579 .estimate_avg_degree(edge_type, outgoing)
2580 }
2581
2582 fn get_or_create_label_id(&self, label: &str) -> u32 {
2585 {
2586 let label_to_id = self.label_to_id.read();
2587 if let Some(&id) = label_to_id.get(label) {
2588 return id;
2589 }
2590 }
2591
2592 let mut label_to_id = self.label_to_id.write();
2593 let mut id_to_label = self.id_to_label.write();
2594
2595 if let Some(&id) = label_to_id.get(label) {
2597 return id;
2598 }
2599
2600 let id = id_to_label.len() as u32;
2601
2602 let label: Arc<str> = label.into();
2603 label_to_id.insert(label.clone(), id);
2604 id_to_label.push(label);
2605
2606 id
2607 }
2608
2609 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2610 {
2611 let type_to_id = self.edge_type_to_id.read();
2612 if let Some(&id) = type_to_id.get(edge_type) {
2613 return id;
2614 }
2615 }
2616
2617 let mut type_to_id = self.edge_type_to_id.write();
2618 let mut id_to_type = self.id_to_edge_type.write();
2619
2620 if let Some(&id) = type_to_id.get(edge_type) {
2622 return id;
2623 }
2624
2625 let id = id_to_type.len() as u32;
2626 let edge_type: Arc<str> = edge_type.into();
2627 type_to_id.insert(edge_type.clone(), id);
2628 id_to_type.push(edge_type);
2629
2630 id
2631 }
2632
2633 #[cfg(not(feature = "tiered-storage"))]
2640 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2641 let epoch = self.current_epoch();
2642 let mut record = NodeRecord::new(id, epoch);
2643 record.set_label_count(labels.len() as u16);
2644
2645 let mut node_label_set = FxHashSet::default();
2647 for label in labels {
2648 let label_id = self.get_or_create_label_id(*label);
2649 node_label_set.insert(label_id);
2650
2651 let mut index = self.label_index.write();
2653 while index.len() <= label_id as usize {
2654 index.push(FxHashMap::default());
2655 }
2656 index[label_id as usize].insert(id, ());
2657 }
2658
2659 self.node_labels.write().insert(id, node_label_set);
2661
2662 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2664 self.nodes.write().insert(id, chain);
2665
2666 let id_val = id.as_u64();
2668 let _ = self
2669 .next_node_id
2670 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2671 if id_val >= current {
2672 Some(id_val + 1)
2673 } else {
2674 None
2675 }
2676 });
2677 }
2678
2679 #[cfg(feature = "tiered-storage")]
2682 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2683 let epoch = self.current_epoch();
2684 let mut record = NodeRecord::new(id, epoch);
2685 record.set_label_count(labels.len() as u16);
2686
2687 let mut node_label_set = FxHashSet::default();
2689 for label in labels {
2690 let label_id = self.get_or_create_label_id(*label);
2691 node_label_set.insert(label_id);
2692
2693 let mut index = self.label_index.write();
2695 while index.len() <= label_id as usize {
2696 index.push(FxHashMap::default());
2697 }
2698 index[label_id as usize].insert(id, ());
2699 }
2700
2701 self.node_labels.write().insert(id, node_label_set);
2703
2704 let arena = self.arena_allocator.arena_or_create(epoch);
2706 let (offset, _stored) = arena.alloc_value_with_offset(record);
2707
2708 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2710 let mut versions = self.node_versions.write();
2711 versions.insert(id, VersionIndex::with_initial(hot_ref));
2712
2713 let id_val = id.as_u64();
2715 let _ = self
2716 .next_node_id
2717 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2718 if id_val >= current {
2719 Some(id_val + 1)
2720 } else {
2721 None
2722 }
2723 });
2724 }
2725
2726 #[cfg(not(feature = "tiered-storage"))]
2730 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2731 let epoch = self.current_epoch();
2732 let type_id = self.get_or_create_edge_type_id(edge_type);
2733
2734 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2735 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2736 self.edges.write().insert(id, chain);
2737
2738 self.forward_adj.add_edge(src, dst, id);
2740 if let Some(ref backward) = self.backward_adj {
2741 backward.add_edge(dst, src, id);
2742 }
2743
2744 let id_val = id.as_u64();
2746 let _ = self
2747 .next_edge_id
2748 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2749 if id_val >= current {
2750 Some(id_val + 1)
2751 } else {
2752 None
2753 }
2754 });
2755 }
2756
2757 #[cfg(feature = "tiered-storage")]
2760 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2761 let epoch = self.current_epoch();
2762 let type_id = self.get_or_create_edge_type_id(edge_type);
2763
2764 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2765
2766 let arena = self.arena_allocator.arena_or_create(epoch);
2768 let (offset, _stored) = arena.alloc_value_with_offset(record);
2769
2770 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2772 let mut versions = self.edge_versions.write();
2773 versions.insert(id, VersionIndex::with_initial(hot_ref));
2774
2775 self.forward_adj.add_edge(src, dst, id);
2777 if let Some(ref backward) = self.backward_adj {
2778 backward.add_edge(dst, src, id);
2779 }
2780
2781 let id_val = id.as_u64();
2783 let _ = self
2784 .next_edge_id
2785 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2786 if id_val >= current {
2787 Some(id_val + 1)
2788 } else {
2789 None
2790 }
2791 });
2792 }
2793
2794 pub fn set_epoch(&self, epoch: EpochId) {
2796 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2797 }
2798}
2799
2800impl Default for LpgStore {
2801 fn default() -> Self {
2802 Self::new()
2803 }
2804}
2805
2806#[cfg(test)]
2807mod tests {
2808 use super::*;
2809
2810 #[test]
2811 fn test_create_node() {
2812 let store = LpgStore::new();
2813
2814 let id = store.create_node(&["Person"]);
2815 assert!(id.is_valid());
2816
2817 let node = store.get_node(id).unwrap();
2818 assert!(node.has_label("Person"));
2819 assert!(!node.has_label("Animal"));
2820 }
2821
2822 #[test]
2823 fn test_create_node_with_props() {
2824 let store = LpgStore::new();
2825
2826 let id = store.create_node_with_props(
2827 &["Person"],
2828 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2829 );
2830
2831 let node = store.get_node(id).unwrap();
2832 assert_eq!(
2833 node.get_property("name").and_then(|v| v.as_str()),
2834 Some("Alice")
2835 );
2836 assert_eq!(
2837 node.get_property("age").and_then(|v| v.as_int64()),
2838 Some(30)
2839 );
2840 }
2841
2842 #[test]
2843 fn test_delete_node() {
2844 let store = LpgStore::new();
2845
2846 let id = store.create_node(&["Person"]);
2847 assert_eq!(store.node_count(), 1);
2848
2849 assert!(store.delete_node(id));
2850 assert_eq!(store.node_count(), 0);
2851 assert!(store.get_node(id).is_none());
2852
2853 assert!(!store.delete_node(id));
2855 }
2856
2857 #[test]
2858 fn test_create_edge() {
2859 let store = LpgStore::new();
2860
2861 let alice = store.create_node(&["Person"]);
2862 let bob = store.create_node(&["Person"]);
2863
2864 let edge_id = store.create_edge(alice, bob, "KNOWS");
2865 assert!(edge_id.is_valid());
2866
2867 let edge = store.get_edge(edge_id).unwrap();
2868 assert_eq!(edge.src, alice);
2869 assert_eq!(edge.dst, bob);
2870 assert_eq!(edge.edge_type.as_ref(), "KNOWS");
2871 }
2872
2873 #[test]
2874 fn test_neighbors() {
2875 let store = LpgStore::new();
2876
2877 let a = store.create_node(&["Person"]);
2878 let b = store.create_node(&["Person"]);
2879 let c = store.create_node(&["Person"]);
2880
2881 store.create_edge(a, b, "KNOWS");
2882 store.create_edge(a, c, "KNOWS");
2883
2884 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2885 assert_eq!(outgoing.len(), 2);
2886 assert!(outgoing.contains(&b));
2887 assert!(outgoing.contains(&c));
2888
2889 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2890 assert_eq!(incoming.len(), 1);
2891 assert!(incoming.contains(&a));
2892 }
2893
2894 #[test]
2895 fn test_nodes_by_label() {
2896 let store = LpgStore::new();
2897
2898 let p1 = store.create_node(&["Person"]);
2899 let p2 = store.create_node(&["Person"]);
2900 let _a = store.create_node(&["Animal"]);
2901
2902 let persons = store.nodes_by_label("Person");
2903 assert_eq!(persons.len(), 2);
2904 assert!(persons.contains(&p1));
2905 assert!(persons.contains(&p2));
2906
2907 let animals = store.nodes_by_label("Animal");
2908 assert_eq!(animals.len(), 1);
2909 }
2910
2911 #[test]
2912 fn test_delete_edge() {
2913 let store = LpgStore::new();
2914
2915 let a = store.create_node(&["Person"]);
2916 let b = store.create_node(&["Person"]);
2917 let edge_id = store.create_edge(a, b, "KNOWS");
2918
2919 assert_eq!(store.edge_count(), 1);
2920
2921 assert!(store.delete_edge(edge_id));
2922 assert_eq!(store.edge_count(), 0);
2923 assert!(store.get_edge(edge_id).is_none());
2924 }
2925
2926 #[test]
2929 fn test_lpg_store_config() {
2930 let config = LpgStoreConfig {
2932 backward_edges: false,
2933 initial_node_capacity: 100,
2934 initial_edge_capacity: 200,
2935 };
2936 let store = LpgStore::with_config(config);
2937
2938 let a = store.create_node(&["Person"]);
2940 let b = store.create_node(&["Person"]);
2941 store.create_edge(a, b, "KNOWS");
2942
2943 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2945 assert_eq!(outgoing.len(), 1);
2946
2947 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2949 assert_eq!(incoming.len(), 0);
2950 }
2951
2952 #[test]
2953 fn test_epoch_management() {
2954 let store = LpgStore::new();
2955
2956 let epoch0 = store.current_epoch();
2957 assert_eq!(epoch0.as_u64(), 0);
2958
2959 let epoch1 = store.new_epoch();
2960 assert_eq!(epoch1.as_u64(), 1);
2961
2962 let current = store.current_epoch();
2963 assert_eq!(current.as_u64(), 1);
2964 }
2965
2966 #[test]
2967 fn test_node_properties() {
2968 let store = LpgStore::new();
2969 let id = store.create_node(&["Person"]);
2970
2971 store.set_node_property(id, "name", Value::from("Alice"));
2973 let name = store.get_node_property(id, &"name".into());
2974 assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Alice"));
2975
2976 store.set_node_property(id, "name", Value::from("Bob"));
2978 let name = store.get_node_property(id, &"name".into());
2979 assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Bob"));
2980
2981 let old = store.remove_node_property(id, "name");
2983 assert!(matches!(old, Some(Value::String(s)) if s.as_ref() == "Bob"));
2984
2985 let name = store.get_node_property(id, &"name".into());
2987 assert!(name.is_none());
2988
2989 let none = store.remove_node_property(id, "nonexistent");
2991 assert!(none.is_none());
2992 }
2993
2994 #[test]
2995 fn test_edge_properties() {
2996 let store = LpgStore::new();
2997 let a = store.create_node(&["Person"]);
2998 let b = store.create_node(&["Person"]);
2999 let edge_id = store.create_edge(a, b, "KNOWS");
3000
3001 store.set_edge_property(edge_id, "since", Value::from(2020i64));
3003 let since = store.get_edge_property(edge_id, &"since".into());
3004 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3005
3006 let old = store.remove_edge_property(edge_id, "since");
3008 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3009
3010 let since = store.get_edge_property(edge_id, &"since".into());
3011 assert!(since.is_none());
3012 }
3013
3014 #[test]
3015 fn test_add_remove_label() {
3016 let store = LpgStore::new();
3017 let id = store.create_node(&["Person"]);
3018
3019 assert!(store.add_label(id, "Employee"));
3021
3022 let node = store.get_node(id).unwrap();
3023 assert!(node.has_label("Person"));
3024 assert!(node.has_label("Employee"));
3025
3026 assert!(!store.add_label(id, "Employee"));
3028
3029 assert!(store.remove_label(id, "Employee"));
3031
3032 let node = store.get_node(id).unwrap();
3033 assert!(node.has_label("Person"));
3034 assert!(!node.has_label("Employee"));
3035
3036 assert!(!store.remove_label(id, "Employee"));
3038 assert!(!store.remove_label(id, "NonExistent"));
3039 }
3040
3041 #[test]
3042 fn test_add_label_to_nonexistent_node() {
3043 let store = LpgStore::new();
3044 let fake_id = NodeId::new(999);
3045 assert!(!store.add_label(fake_id, "Label"));
3046 }
3047
3048 #[test]
3049 fn test_remove_label_from_nonexistent_node() {
3050 let store = LpgStore::new();
3051 let fake_id = NodeId::new(999);
3052 assert!(!store.remove_label(fake_id, "Label"));
3053 }
3054
3055 #[test]
3056 fn test_node_ids() {
3057 let store = LpgStore::new();
3058
3059 let n1 = store.create_node(&["Person"]);
3060 let n2 = store.create_node(&["Person"]);
3061 let n3 = store.create_node(&["Person"]);
3062
3063 let ids = store.node_ids();
3064 assert_eq!(ids.len(), 3);
3065 assert!(ids.contains(&n1));
3066 assert!(ids.contains(&n2));
3067 assert!(ids.contains(&n3));
3068
3069 store.delete_node(n2);
3071 let ids = store.node_ids();
3072 assert_eq!(ids.len(), 2);
3073 assert!(!ids.contains(&n2));
3074 }
3075
3076 #[test]
3077 fn test_delete_node_nonexistent() {
3078 let store = LpgStore::new();
3079 let fake_id = NodeId::new(999);
3080 assert!(!store.delete_node(fake_id));
3081 }
3082
3083 #[test]
3084 fn test_delete_edge_nonexistent() {
3085 let store = LpgStore::new();
3086 let fake_id = EdgeId::new(999);
3087 assert!(!store.delete_edge(fake_id));
3088 }
3089
3090 #[test]
3091 fn test_delete_edge_double() {
3092 let store = LpgStore::new();
3093 let a = store.create_node(&["Person"]);
3094 let b = store.create_node(&["Person"]);
3095 let edge_id = store.create_edge(a, b, "KNOWS");
3096
3097 assert!(store.delete_edge(edge_id));
3098 assert!(!store.delete_edge(edge_id)); }
3100
3101 #[test]
3102 fn test_create_edge_with_props() {
3103 let store = LpgStore::new();
3104 let a = store.create_node(&["Person"]);
3105 let b = store.create_node(&["Person"]);
3106
3107 let edge_id = store.create_edge_with_props(
3108 a,
3109 b,
3110 "KNOWS",
3111 [
3112 ("since", Value::from(2020i64)),
3113 ("weight", Value::from(1.0)),
3114 ],
3115 );
3116
3117 let edge = store.get_edge(edge_id).unwrap();
3118 assert_eq!(
3119 edge.get_property("since").and_then(|v| v.as_int64()),
3120 Some(2020)
3121 );
3122 assert_eq!(
3123 edge.get_property("weight").and_then(|v| v.as_float64()),
3124 Some(1.0)
3125 );
3126 }
3127
3128 #[test]
3129 fn test_delete_node_edges() {
3130 let store = LpgStore::new();
3131
3132 let a = store.create_node(&["Person"]);
3133 let b = store.create_node(&["Person"]);
3134 let c = store.create_node(&["Person"]);
3135
3136 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3140
3141 store.delete_node_edges(a);
3143
3144 assert_eq!(store.edge_count(), 0);
3145 }
3146
3147 #[test]
3148 fn test_neighbors_both_directions() {
3149 let store = LpgStore::new();
3150
3151 let a = store.create_node(&["Person"]);
3152 let b = store.create_node(&["Person"]);
3153 let c = store.create_node(&["Person"]);
3154
3155 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3160 assert_eq!(neighbors.len(), 2);
3161 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3164
3165 #[test]
3166 fn test_edges_from() {
3167 let store = LpgStore::new();
3168
3169 let a = store.create_node(&["Person"]);
3170 let b = store.create_node(&["Person"]);
3171 let c = store.create_node(&["Person"]);
3172
3173 let e1 = store.create_edge(a, b, "KNOWS");
3174 let e2 = store.create_edge(a, c, "KNOWS");
3175
3176 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3177 assert_eq!(edges.len(), 2);
3178 assert!(edges.iter().any(|(_, e)| *e == e1));
3179 assert!(edges.iter().any(|(_, e)| *e == e2));
3180
3181 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3183 assert_eq!(incoming.len(), 1);
3184 assert_eq!(incoming[0].1, e1);
3185 }
3186
3187 #[test]
3188 fn test_edges_to() {
3189 let store = LpgStore::new();
3190
3191 let a = store.create_node(&["Person"]);
3192 let b = store.create_node(&["Person"]);
3193 let c = store.create_node(&["Person"]);
3194
3195 let e1 = store.create_edge(a, b, "KNOWS");
3196 let e2 = store.create_edge(c, b, "KNOWS");
3197
3198 let to_b = store.edges_to(b);
3200 assert_eq!(to_b.len(), 2);
3201 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3202 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3203 }
3204
3205 #[test]
3206 fn test_out_degree_in_degree() {
3207 let store = LpgStore::new();
3208
3209 let a = store.create_node(&["Person"]);
3210 let b = store.create_node(&["Person"]);
3211 let c = store.create_node(&["Person"]);
3212
3213 store.create_edge(a, b, "KNOWS");
3214 store.create_edge(a, c, "KNOWS");
3215 store.create_edge(c, b, "KNOWS");
3216
3217 assert_eq!(store.out_degree(a), 2);
3218 assert_eq!(store.out_degree(b), 0);
3219 assert_eq!(store.out_degree(c), 1);
3220
3221 assert_eq!(store.in_degree(a), 0);
3222 assert_eq!(store.in_degree(b), 2);
3223 assert_eq!(store.in_degree(c), 1);
3224 }
3225
3226 #[test]
3227 fn test_edge_type() {
3228 let store = LpgStore::new();
3229
3230 let a = store.create_node(&["Person"]);
3231 let b = store.create_node(&["Person"]);
3232 let edge_id = store.create_edge(a, b, "KNOWS");
3233
3234 let edge_type = store.edge_type(edge_id);
3235 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3236
3237 let fake_id = EdgeId::new(999);
3239 assert!(store.edge_type(fake_id).is_none());
3240 }
3241
3242 #[test]
3243 fn test_count_methods() {
3244 let store = LpgStore::new();
3245
3246 assert_eq!(store.label_count(), 0);
3247 assert_eq!(store.edge_type_count(), 0);
3248 assert_eq!(store.property_key_count(), 0);
3249
3250 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3251 let b = store.create_node(&["Company"]);
3252 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3253
3254 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3258
3259 #[test]
3260 fn test_all_nodes_and_edges() {
3261 let store = LpgStore::new();
3262
3263 let a = store.create_node(&["Person"]);
3264 let b = store.create_node(&["Person"]);
3265 store.create_edge(a, b, "KNOWS");
3266
3267 let nodes: Vec<_> = store.all_nodes().collect();
3268 assert_eq!(nodes.len(), 2);
3269
3270 let edges: Vec<_> = store.all_edges().collect();
3271 assert_eq!(edges.len(), 1);
3272 }
3273
3274 #[test]
3275 fn test_all_labels_and_edge_types() {
3276 let store = LpgStore::new();
3277
3278 store.create_node(&["Person"]);
3279 store.create_node(&["Company"]);
3280 let a = store.create_node(&["Animal"]);
3281 let b = store.create_node(&["Animal"]);
3282 store.create_edge(a, b, "EATS");
3283
3284 let labels = store.all_labels();
3285 assert_eq!(labels.len(), 3);
3286 assert!(labels.contains(&"Person".to_string()));
3287 assert!(labels.contains(&"Company".to_string()));
3288 assert!(labels.contains(&"Animal".to_string()));
3289
3290 let edge_types = store.all_edge_types();
3291 assert_eq!(edge_types.len(), 1);
3292 assert!(edge_types.contains(&"EATS".to_string()));
3293 }
3294
3295 #[test]
3296 fn test_all_property_keys() {
3297 let store = LpgStore::new();
3298
3299 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3300 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3301 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3302
3303 let keys = store.all_property_keys();
3304 assert!(keys.contains(&"name".to_string()));
3305 assert!(keys.contains(&"age".to_string()));
3306 assert!(keys.contains(&"since".to_string()));
3307 }
3308
3309 #[test]
3310 fn test_nodes_with_label() {
3311 let store = LpgStore::new();
3312
3313 store.create_node(&["Person"]);
3314 store.create_node(&["Person"]);
3315 store.create_node(&["Company"]);
3316
3317 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3318 assert_eq!(persons.len(), 2);
3319
3320 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3321 assert_eq!(companies.len(), 1);
3322
3323 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3324 assert_eq!(none.len(), 0);
3325 }
3326
3327 #[test]
3328 fn test_edges_with_type() {
3329 let store = LpgStore::new();
3330
3331 let a = store.create_node(&["Person"]);
3332 let b = store.create_node(&["Person"]);
3333 let c = store.create_node(&["Company"]);
3334
3335 store.create_edge(a, b, "KNOWS");
3336 store.create_edge(a, c, "WORKS_AT");
3337
3338 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3339 assert_eq!(knows.len(), 1);
3340
3341 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3342 assert_eq!(works_at.len(), 1);
3343
3344 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3345 assert_eq!(none.len(), 0);
3346 }
3347
3348 #[test]
3349 fn test_nodes_by_label_nonexistent() {
3350 let store = LpgStore::new();
3351 store.create_node(&["Person"]);
3352
3353 let empty = store.nodes_by_label("NonExistent");
3354 assert!(empty.is_empty());
3355 }
3356
3357 #[test]
3358 fn test_statistics() {
3359 let store = LpgStore::new();
3360
3361 let a = store.create_node(&["Person"]);
3362 let b = store.create_node(&["Person"]);
3363 let c = store.create_node(&["Company"]);
3364
3365 store.create_edge(a, b, "KNOWS");
3366 store.create_edge(a, c, "WORKS_AT");
3367
3368 store.compute_statistics();
3369 let stats = store.statistics();
3370
3371 assert_eq!(stats.total_nodes, 3);
3372 assert_eq!(stats.total_edges, 2);
3373
3374 let person_card = store.estimate_label_cardinality("Person");
3376 assert!(person_card > 0.0);
3377
3378 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3379 assert!(avg_degree >= 0.0);
3380 }
3381
3382 #[test]
3383 fn test_zone_maps() {
3384 let store = LpgStore::new();
3385
3386 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3387 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3388
3389 let might_match =
3391 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3392 assert!(might_match);
3394
3395 let zone = store.node_property_zone_map(&"age".into());
3396 assert!(zone.is_some());
3397
3398 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3400 assert!(no_zone.is_none());
3401
3402 let a = store.create_node(&["A"]);
3404 let b = store.create_node(&["B"]);
3405 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3406
3407 let edge_zone = store.edge_property_zone_map(&"weight".into());
3408 assert!(edge_zone.is_some());
3409 }
3410
3411 #[test]
3412 fn test_rebuild_zone_maps() {
3413 let store = LpgStore::new();
3414 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3415
3416 store.rebuild_zone_maps();
3418 }
3419
3420 #[test]
3421 fn test_create_node_with_id() {
3422 let store = LpgStore::new();
3423
3424 let specific_id = NodeId::new(100);
3425 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3426
3427 let node = store.get_node(specific_id).unwrap();
3428 assert!(node.has_label("Person"));
3429 assert!(node.has_label("Employee"));
3430
3431 let next = store.create_node(&["Other"]);
3433 assert!(next.as_u64() > 100);
3434 }
3435
3436 #[test]
3437 fn test_create_edge_with_id() {
3438 let store = LpgStore::new();
3439
3440 let a = store.create_node(&["A"]);
3441 let b = store.create_node(&["B"]);
3442
3443 let specific_id = EdgeId::new(500);
3444 store.create_edge_with_id(specific_id, a, b, "REL");
3445
3446 let edge = store.get_edge(specific_id).unwrap();
3447 assert_eq!(edge.src, a);
3448 assert_eq!(edge.dst, b);
3449 assert_eq!(edge.edge_type.as_ref(), "REL");
3450
3451 let next = store.create_edge(a, b, "OTHER");
3453 assert!(next.as_u64() > 500);
3454 }
3455
3456 #[test]
3457 fn test_set_epoch() {
3458 let store = LpgStore::new();
3459
3460 assert_eq!(store.current_epoch().as_u64(), 0);
3461
3462 store.set_epoch(EpochId::new(42));
3463 assert_eq!(store.current_epoch().as_u64(), 42);
3464 }
3465
3466 #[test]
3467 fn test_get_node_nonexistent() {
3468 let store = LpgStore::new();
3469 let fake_id = NodeId::new(999);
3470 assert!(store.get_node(fake_id).is_none());
3471 }
3472
3473 #[test]
3474 fn test_get_edge_nonexistent() {
3475 let store = LpgStore::new();
3476 let fake_id = EdgeId::new(999);
3477 assert!(store.get_edge(fake_id).is_none());
3478 }
3479
3480 #[test]
3481 fn test_multiple_labels() {
3482 let store = LpgStore::new();
3483
3484 let id = store.create_node(&["Person", "Employee", "Manager"]);
3485 let node = store.get_node(id).unwrap();
3486
3487 assert!(node.has_label("Person"));
3488 assert!(node.has_label("Employee"));
3489 assert!(node.has_label("Manager"));
3490 assert!(!node.has_label("Other"));
3491 }
3492
3493 #[test]
3494 fn test_default_impl() {
3495 let store: LpgStore = Default::default();
3496 assert_eq!(store.node_count(), 0);
3497 assert_eq!(store.edge_count(), 0);
3498 }
3499
3500 #[test]
3501 fn test_edges_from_both_directions() {
3502 let store = LpgStore::new();
3503
3504 let a = store.create_node(&["A"]);
3505 let b = store.create_node(&["B"]);
3506 let c = store.create_node(&["C"]);
3507
3508 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3513 assert_eq!(edges.len(), 2);
3514 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3517
3518 #[test]
3519 fn test_no_backward_adj_in_degree() {
3520 let config = LpgStoreConfig {
3521 backward_edges: false,
3522 initial_node_capacity: 10,
3523 initial_edge_capacity: 10,
3524 };
3525 let store = LpgStore::with_config(config);
3526
3527 let a = store.create_node(&["A"]);
3528 let b = store.create_node(&["B"]);
3529 store.create_edge(a, b, "R");
3530
3531 let degree = store.in_degree(b);
3533 assert_eq!(degree, 1);
3534 }
3535
3536 #[test]
3537 fn test_no_backward_adj_edges_to() {
3538 let config = LpgStoreConfig {
3539 backward_edges: false,
3540 initial_node_capacity: 10,
3541 initial_edge_capacity: 10,
3542 };
3543 let store = LpgStore::with_config(config);
3544
3545 let a = store.create_node(&["A"]);
3546 let b = store.create_node(&["B"]);
3547 let e = store.create_edge(a, b, "R");
3548
3549 let edges = store.edges_to(b);
3551 assert_eq!(edges.len(), 1);
3552 assert_eq!(edges[0].1, e);
3553 }
3554
3555 #[test]
3556 fn test_node_versioned_creation() {
3557 let store = LpgStore::new();
3558
3559 let epoch = store.new_epoch();
3560 let tx_id = TxId::new(1);
3561
3562 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3563 assert!(store.get_node(id).is_some());
3564 }
3565
3566 #[test]
3567 fn test_edge_versioned_creation() {
3568 let store = LpgStore::new();
3569
3570 let a = store.create_node(&["A"]);
3571 let b = store.create_node(&["B"]);
3572
3573 let epoch = store.new_epoch();
3574 let tx_id = TxId::new(1);
3575
3576 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3577 assert!(store.get_edge(edge_id).is_some());
3578 }
3579
3580 #[test]
3581 fn test_node_with_props_versioned() {
3582 let store = LpgStore::new();
3583
3584 let epoch = store.new_epoch();
3585 let tx_id = TxId::new(1);
3586
3587 let id = store.create_node_with_props_versioned(
3588 &["Person"],
3589 [("name", Value::from("Alice"))],
3590 epoch,
3591 tx_id,
3592 );
3593
3594 let node = store.get_node(id).unwrap();
3595 assert_eq!(
3596 node.get_property("name").and_then(|v| v.as_str()),
3597 Some("Alice")
3598 );
3599 }
3600
3601 #[test]
3602 fn test_discard_uncommitted_versions() {
3603 let store = LpgStore::new();
3604
3605 let epoch = store.new_epoch();
3606 let tx_id = TxId::new(42);
3607
3608 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3610 assert!(store.get_node(node_id).is_some());
3611
3612 store.discard_uncommitted_versions(tx_id);
3614
3615 assert!(store.get_node(node_id).is_none());
3617 }
3618
3619 #[test]
3622 fn test_property_index_create_and_lookup() {
3623 let store = LpgStore::new();
3624
3625 let alice = store.create_node(&["Person"]);
3627 let bob = store.create_node(&["Person"]);
3628 let charlie = store.create_node(&["Person"]);
3629
3630 store.set_node_property(alice, "city", Value::from("NYC"));
3631 store.set_node_property(bob, "city", Value::from("NYC"));
3632 store.set_node_property(charlie, "city", Value::from("LA"));
3633
3634 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3636 assert_eq!(nyc_people.len(), 2);
3637
3638 store.create_property_index("city");
3640 assert!(store.has_property_index("city"));
3641
3642 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3644 assert_eq!(nyc_people.len(), 2);
3645 assert!(nyc_people.contains(&alice));
3646 assert!(nyc_people.contains(&bob));
3647
3648 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3649 assert_eq!(la_people.len(), 1);
3650 assert!(la_people.contains(&charlie));
3651 }
3652
3653 #[test]
3654 fn test_property_index_maintained_on_update() {
3655 let store = LpgStore::new();
3656
3657 store.create_property_index("status");
3659
3660 let node = store.create_node(&["Task"]);
3661 store.set_node_property(node, "status", Value::from("pending"));
3662
3663 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3665 assert_eq!(pending.len(), 1);
3666 assert!(pending.contains(&node));
3667
3668 store.set_node_property(node, "status", Value::from("done"));
3670
3671 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3673 assert!(pending.is_empty());
3674
3675 let done = store.find_nodes_by_property("status", &Value::from("done"));
3677 assert_eq!(done.len(), 1);
3678 assert!(done.contains(&node));
3679 }
3680
3681 #[test]
3682 fn test_property_index_maintained_on_remove() {
3683 let store = LpgStore::new();
3684
3685 store.create_property_index("tag");
3686
3687 let node = store.create_node(&["Item"]);
3688 store.set_node_property(node, "tag", Value::from("important"));
3689
3690 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3692 assert_eq!(found.len(), 1);
3693
3694 store.remove_node_property(node, "tag");
3696
3697 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3699 assert!(found.is_empty());
3700 }
3701
3702 #[test]
3703 fn test_property_index_drop() {
3704 let store = LpgStore::new();
3705
3706 store.create_property_index("key");
3707 assert!(store.has_property_index("key"));
3708
3709 assert!(store.drop_property_index("key"));
3710 assert!(!store.has_property_index("key"));
3711
3712 assert!(!store.drop_property_index("key"));
3714 }
3715
3716 #[test]
3717 fn test_property_index_multiple_values() {
3718 let store = LpgStore::new();
3719
3720 store.create_property_index("age");
3721
3722 let n1 = store.create_node(&["Person"]);
3724 let n2 = store.create_node(&["Person"]);
3725 let n3 = store.create_node(&["Person"]);
3726 let n4 = store.create_node(&["Person"]);
3727
3728 store.set_node_property(n1, "age", Value::from(25i64));
3729 store.set_node_property(n2, "age", Value::from(25i64));
3730 store.set_node_property(n3, "age", Value::from(30i64));
3731 store.set_node_property(n4, "age", Value::from(25i64));
3732
3733 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3734 assert_eq!(age_25.len(), 3);
3735
3736 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3737 assert_eq!(age_30.len(), 1);
3738
3739 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3740 assert!(age_40.is_empty());
3741 }
3742
3743 #[test]
3744 fn test_property_index_builds_from_existing_data() {
3745 let store = LpgStore::new();
3746
3747 let n1 = store.create_node(&["Person"]);
3749 let n2 = store.create_node(&["Person"]);
3750 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3751 store.set_node_property(n2, "email", Value::from("bob@example.com"));
3752
3753 store.create_property_index("email");
3755
3756 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3758 assert_eq!(alice.len(), 1);
3759 assert!(alice.contains(&n1));
3760
3761 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3762 assert_eq!(bob.len(), 1);
3763 assert!(bob.contains(&n2));
3764 }
3765}