1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use dashmap::DashMap;
19#[cfg(not(feature = "tiered-storage"))]
20use grafeo_common::mvcc::VersionChain;
21use grafeo_common::types::{EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TxId, Value};
22use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
23use parking_lot::RwLock;
24use std::cmp::Ordering as CmpOrdering;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU64, Ordering};
27
28fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
30 match (a, b) {
31 (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
32 (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
33 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
34 (Value::Bool(a), Value::Bool(b)) => Some(a.cmp(b)),
35 _ => None,
36 }
37}
38
39fn value_in_range(
41 value: &Value,
42 min: Option<&Value>,
43 max: Option<&Value>,
44 min_inclusive: bool,
45 max_inclusive: bool,
46) -> bool {
47 if let Some(min_val) = min {
49 match compare_values_for_range(value, min_val) {
50 Some(CmpOrdering::Less) => return false,
51 Some(CmpOrdering::Equal) if !min_inclusive => return false,
52 None => return false, _ => {}
54 }
55 }
56
57 if let Some(max_val) = max {
59 match compare_values_for_range(value, max_val) {
60 Some(CmpOrdering::Greater) => return false,
61 Some(CmpOrdering::Equal) if !max_inclusive => return false,
62 None => return false,
63 _ => {}
64 }
65 }
66
67 true
68}
69
70#[cfg(feature = "tiered-storage")]
72use crate::storage::EpochStore;
73#[cfg(feature = "tiered-storage")]
74use grafeo_common::memory::arena::ArenaAllocator;
75#[cfg(feature = "tiered-storage")]
76use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
77
78#[derive(Debug, Clone)]
84pub struct LpgStoreConfig {
85 pub backward_edges: bool,
88 pub initial_node_capacity: usize,
90 pub initial_edge_capacity: usize,
92}
93
94impl Default for LpgStoreConfig {
95 fn default() -> Self {
96 Self {
97 backward_edges: true,
98 initial_node_capacity: 1024,
99 initial_edge_capacity: 4096,
100 }
101 }
102}
103
104pub struct LpgStore {
132 #[allow(dead_code)]
134 config: LpgStoreConfig,
135
136 #[cfg(not(feature = "tiered-storage"))]
139 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
140
141 #[cfg(not(feature = "tiered-storage"))]
144 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
145
146 #[cfg(feature = "tiered-storage")]
150 arena_allocator: Arc<ArenaAllocator>,
151
152 #[cfg(feature = "tiered-storage")]
155 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
156
157 #[cfg(feature = "tiered-storage")]
160 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
161
162 #[cfg(feature = "tiered-storage")]
165 epoch_store: Arc<EpochStore>,
166
167 node_properties: PropertyStorage<NodeId>,
169
170 edge_properties: PropertyStorage<EdgeId>,
172
173 label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
175
176 id_to_label: RwLock<Vec<Arc<str>>>,
178
179 edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
181
182 id_to_edge_type: RwLock<Vec<Arc<str>>>,
184
185 forward_adj: ChunkedAdjacency,
187
188 backward_adj: Option<ChunkedAdjacency>,
191
192 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
194
195 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
198
199 property_indexes: RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
204
205 next_node_id: AtomicU64,
207
208 next_edge_id: AtomicU64,
210
211 current_epoch: AtomicU64,
213
214 statistics: RwLock<Statistics>,
216}
217
218impl LpgStore {
219 #[must_use]
221 pub fn new() -> Self {
222 Self::with_config(LpgStoreConfig::default())
223 }
224
225 #[must_use]
227 pub fn with_config(config: LpgStoreConfig) -> Self {
228 let backward_adj = if config.backward_edges {
229 Some(ChunkedAdjacency::new())
230 } else {
231 None
232 };
233
234 Self {
235 #[cfg(not(feature = "tiered-storage"))]
236 nodes: RwLock::new(FxHashMap::default()),
237 #[cfg(not(feature = "tiered-storage"))]
238 edges: RwLock::new(FxHashMap::default()),
239 #[cfg(feature = "tiered-storage")]
240 arena_allocator: Arc::new(ArenaAllocator::new()),
241 #[cfg(feature = "tiered-storage")]
242 node_versions: RwLock::new(FxHashMap::default()),
243 #[cfg(feature = "tiered-storage")]
244 edge_versions: RwLock::new(FxHashMap::default()),
245 #[cfg(feature = "tiered-storage")]
246 epoch_store: Arc::new(EpochStore::new()),
247 node_properties: PropertyStorage::new(),
248 edge_properties: PropertyStorage::new(),
249 label_to_id: RwLock::new(FxHashMap::default()),
250 id_to_label: RwLock::new(Vec::new()),
251 edge_type_to_id: RwLock::new(FxHashMap::default()),
252 id_to_edge_type: RwLock::new(Vec::new()),
253 forward_adj: ChunkedAdjacency::new(),
254 backward_adj,
255 label_index: RwLock::new(Vec::new()),
256 node_labels: RwLock::new(FxHashMap::default()),
257 property_indexes: RwLock::new(FxHashMap::default()),
258 next_node_id: AtomicU64::new(0),
259 next_edge_id: AtomicU64::new(0),
260 current_epoch: AtomicU64::new(0),
261 statistics: RwLock::new(Statistics::new()),
262 config,
263 }
264 }
265
266 #[must_use]
268 pub fn current_epoch(&self) -> EpochId {
269 EpochId::new(self.current_epoch.load(Ordering::Acquire))
270 }
271
272 pub fn new_epoch(&self) -> EpochId {
274 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
275 EpochId::new(id)
276 }
277
278 pub fn create_node(&self, labels: &[&str]) -> NodeId {
284 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
285 }
286
287 #[cfg(not(feature = "tiered-storage"))]
289 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
290 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
291
292 let mut record = NodeRecord::new(id, epoch);
293 record.set_label_count(labels.len() as u16);
294
295 let mut node_label_set = FxHashSet::default();
297 for label in labels {
298 let label_id = self.get_or_create_label_id(*label);
299 node_label_set.insert(label_id);
300
301 let mut index = self.label_index.write();
303 while index.len() <= label_id as usize {
304 index.push(FxHashMap::default());
305 }
306 index[label_id as usize].insert(id, ());
307 }
308
309 self.node_labels.write().insert(id, node_label_set);
311
312 let chain = VersionChain::with_initial(record, epoch, tx_id);
314 self.nodes.write().insert(id, chain);
315 id
316 }
317
318 #[cfg(feature = "tiered-storage")]
321 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
322 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
323
324 let mut record = NodeRecord::new(id, epoch);
325 record.set_label_count(labels.len() as u16);
326
327 let mut node_label_set = FxHashSet::default();
329 for label in labels {
330 let label_id = self.get_or_create_label_id(*label);
331 node_label_set.insert(label_id);
332
333 let mut index = self.label_index.write();
335 while index.len() <= label_id as usize {
336 index.push(FxHashMap::default());
337 }
338 index[label_id as usize].insert(id, ());
339 }
340
341 self.node_labels.write().insert(id, node_label_set);
343
344 let arena = self.arena_allocator.arena_or_create(epoch);
346 let (offset, _stored) = arena.alloc_value_with_offset(record);
347
348 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
350
351 let mut versions = self.node_versions.write();
353 if let Some(index) = versions.get_mut(&id) {
354 index.add_hot(hot_ref);
355 } else {
356 versions.insert(id, VersionIndex::with_initial(hot_ref));
357 }
358
359 id
360 }
361
362 pub fn create_node_with_props(
364 &self,
365 labels: &[&str],
366 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
367 ) -> NodeId {
368 self.create_node_with_props_versioned(
369 labels,
370 properties,
371 self.current_epoch(),
372 TxId::SYSTEM,
373 )
374 }
375
376 #[cfg(not(feature = "tiered-storage"))]
378 pub fn create_node_with_props_versioned(
379 &self,
380 labels: &[&str],
381 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
382 epoch: EpochId,
383 tx_id: TxId,
384 ) -> NodeId {
385 let id = self.create_node_versioned(labels, epoch, tx_id);
386
387 for (key, value) in properties {
388 self.node_properties.set(id, key.into(), value.into());
389 }
390
391 let count = self.node_properties.get_all(id).len() as u16;
393 if let Some(chain) = self.nodes.write().get_mut(&id) {
394 if let Some(record) = chain.latest_mut() {
395 record.props_count = count;
396 }
397 }
398
399 id
400 }
401
402 #[cfg(feature = "tiered-storage")]
405 pub fn create_node_with_props_versioned(
406 &self,
407 labels: &[&str],
408 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
409 epoch: EpochId,
410 tx_id: TxId,
411 ) -> NodeId {
412 let id = self.create_node_versioned(labels, epoch, tx_id);
413
414 for (key, value) in properties {
415 self.node_properties.set(id, key.into(), value.into());
416 }
417
418 id
422 }
423
424 #[must_use]
426 pub fn get_node(&self, id: NodeId) -> Option<Node> {
427 self.get_node_at_epoch(id, self.current_epoch())
428 }
429
430 #[must_use]
432 #[cfg(not(feature = "tiered-storage"))]
433 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
434 let nodes = self.nodes.read();
435 let chain = nodes.get(&id)?;
436 let record = chain.visible_at(epoch)?;
437
438 if record.is_deleted() {
439 return None;
440 }
441
442 let mut node = Node::new(id);
443
444 let id_to_label = self.id_to_label.read();
446 let node_labels = self.node_labels.read();
447 if let Some(label_ids) = node_labels.get(&id) {
448 for &label_id in label_ids {
449 if let Some(label) = id_to_label.get(label_id as usize) {
450 node.labels.push(label.clone());
451 }
452 }
453 }
454
455 node.properties = self.node_properties.get_all(id).into_iter().collect();
457
458 Some(node)
459 }
460
461 #[must_use]
464 #[cfg(feature = "tiered-storage")]
465 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
466 let versions = self.node_versions.read();
467 let index = versions.get(&id)?;
468 let version_ref = index.visible_at(epoch)?;
469
470 let record = self.read_node_record(&version_ref)?;
472
473 if record.is_deleted() {
474 return None;
475 }
476
477 let mut node = Node::new(id);
478
479 let id_to_label = self.id_to_label.read();
481 let node_labels = self.node_labels.read();
482 if let Some(label_ids) = node_labels.get(&id) {
483 for &label_id in label_ids {
484 if let Some(label) = id_to_label.get(label_id as usize) {
485 node.labels.push(label.clone());
486 }
487 }
488 }
489
490 node.properties = self.node_properties.get_all(id).into_iter().collect();
492
493 Some(node)
494 }
495
496 #[must_use]
498 #[cfg(not(feature = "tiered-storage"))]
499 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
500 let nodes = self.nodes.read();
501 let chain = nodes.get(&id)?;
502 let record = chain.visible_to(epoch, tx_id)?;
503
504 if record.is_deleted() {
505 return None;
506 }
507
508 let mut node = Node::new(id);
509
510 let id_to_label = self.id_to_label.read();
512 let node_labels = self.node_labels.read();
513 if let Some(label_ids) = node_labels.get(&id) {
514 for &label_id in label_ids {
515 if let Some(label) = id_to_label.get(label_id as usize) {
516 node.labels.push(label.clone());
517 }
518 }
519 }
520
521 node.properties = self.node_properties.get_all(id).into_iter().collect();
523
524 Some(node)
525 }
526
527 #[must_use]
530 #[cfg(feature = "tiered-storage")]
531 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
532 let versions = self.node_versions.read();
533 let index = versions.get(&id)?;
534 let version_ref = index.visible_to(epoch, tx_id)?;
535
536 let record = self.read_node_record(&version_ref)?;
538
539 if record.is_deleted() {
540 return None;
541 }
542
543 let mut node = Node::new(id);
544
545 let id_to_label = self.id_to_label.read();
547 let node_labels = self.node_labels.read();
548 if let Some(label_ids) = node_labels.get(&id) {
549 for &label_id in label_ids {
550 if let Some(label) = id_to_label.get(label_id as usize) {
551 node.labels.push(label.clone());
552 }
553 }
554 }
555
556 node.properties = self.node_properties.get_all(id).into_iter().collect();
558
559 Some(node)
560 }
561
562 #[cfg(feature = "tiered-storage")]
564 #[allow(unsafe_code)]
565 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
566 match version_ref {
567 VersionRef::Hot(hot_ref) => {
568 let arena = self.arena_allocator.arena(hot_ref.epoch);
569 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
571 Some(record.clone())
572 }
573 VersionRef::Cold(cold_ref) => {
574 self.epoch_store
576 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
577 }
578 }
579 }
580
581 pub fn delete_node(&self, id: NodeId) -> bool {
583 self.delete_node_at_epoch(id, self.current_epoch())
584 }
585
586 #[cfg(not(feature = "tiered-storage"))]
588 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
589 let mut nodes = self.nodes.write();
590 if let Some(chain) = nodes.get_mut(&id) {
591 if let Some(record) = chain.visible_at(epoch) {
593 if record.is_deleted() {
594 return false;
595 }
596 } else {
597 return false;
599 }
600
601 chain.mark_deleted(epoch);
603
604 let mut index = self.label_index.write();
606 let mut node_labels = self.node_labels.write();
607 if let Some(label_ids) = node_labels.remove(&id) {
608 for label_id in label_ids {
609 if let Some(set) = index.get_mut(label_id as usize) {
610 set.remove(&id);
611 }
612 }
613 }
614
615 drop(nodes); drop(index);
618 drop(node_labels);
619 self.node_properties.remove_all(id);
620
621 true
624 } else {
625 false
626 }
627 }
628
629 #[cfg(feature = "tiered-storage")]
632 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
633 let mut versions = self.node_versions.write();
634 if let Some(index) = versions.get_mut(&id) {
635 if let Some(version_ref) = index.visible_at(epoch) {
637 if let Some(record) = self.read_node_record(&version_ref) {
638 if record.is_deleted() {
639 return false;
640 }
641 } else {
642 return false;
643 }
644 } else {
645 return false;
646 }
647
648 index.mark_deleted(epoch);
650
651 let mut label_index = self.label_index.write();
653 let mut node_labels = self.node_labels.write();
654 if let Some(label_ids) = node_labels.remove(&id) {
655 for label_id in label_ids {
656 if let Some(set) = label_index.get_mut(label_id as usize) {
657 set.remove(&id);
658 }
659 }
660 }
661
662 drop(versions);
664 drop(label_index);
665 drop(node_labels);
666 self.node_properties.remove_all(id);
667
668 true
669 } else {
670 false
671 }
672 }
673
674 #[cfg(not(feature = "tiered-storage"))]
679 pub fn delete_node_edges(&self, node_id: NodeId) {
680 let outgoing: Vec<EdgeId> = self
682 .forward_adj
683 .edges_from(node_id)
684 .into_iter()
685 .map(|(_, edge_id)| edge_id)
686 .collect();
687
688 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
690 backward
691 .edges_from(node_id)
692 .into_iter()
693 .map(|(_, edge_id)| edge_id)
694 .collect()
695 } else {
696 let epoch = self.current_epoch();
698 self.edges
699 .read()
700 .iter()
701 .filter_map(|(id, chain)| {
702 chain.visible_at(epoch).and_then(|r| {
703 if !r.is_deleted() && r.dst == node_id {
704 Some(*id)
705 } else {
706 None
707 }
708 })
709 })
710 .collect()
711 };
712
713 for edge_id in outgoing.into_iter().chain(incoming) {
715 self.delete_edge(edge_id);
716 }
717 }
718
719 #[cfg(feature = "tiered-storage")]
722 pub fn delete_node_edges(&self, node_id: NodeId) {
723 let outgoing: Vec<EdgeId> = self
725 .forward_adj
726 .edges_from(node_id)
727 .into_iter()
728 .map(|(_, edge_id)| edge_id)
729 .collect();
730
731 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
733 backward
734 .edges_from(node_id)
735 .into_iter()
736 .map(|(_, edge_id)| edge_id)
737 .collect()
738 } else {
739 let epoch = self.current_epoch();
741 let versions = self.edge_versions.read();
742 versions
743 .iter()
744 .filter_map(|(id, index)| {
745 index.visible_at(epoch).and_then(|vref| {
746 self.read_edge_record(&vref).and_then(|r| {
747 if !r.is_deleted() && r.dst == node_id {
748 Some(*id)
749 } else {
750 None
751 }
752 })
753 })
754 })
755 .collect()
756 };
757
758 for edge_id in outgoing.into_iter().chain(incoming) {
760 self.delete_edge(edge_id);
761 }
762 }
763
764 #[cfg(not(feature = "tiered-storage"))]
766 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
767 let prop_key: PropertyKey = key.into();
768
769 self.update_property_index_on_set(id, &prop_key, &value);
771
772 self.node_properties.set(id, prop_key, value);
773
774 let count = self.node_properties.get_all(id).len() as u16;
776 if let Some(chain) = self.nodes.write().get_mut(&id) {
777 if let Some(record) = chain.latest_mut() {
778 record.props_count = count;
779 }
780 }
781 }
782
783 #[cfg(feature = "tiered-storage")]
786 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
787 let prop_key: PropertyKey = key.into();
788
789 self.update_property_index_on_set(id, &prop_key, &value);
791
792 self.node_properties.set(id, prop_key, value);
793 }
797
798 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
800 self.edge_properties.set(id, key.into(), value);
801 }
802
803 #[cfg(not(feature = "tiered-storage"))]
807 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
808 let prop_key: PropertyKey = key.into();
809
810 self.update_property_index_on_remove(id, &prop_key);
812
813 let result = self.node_properties.remove(id, &prop_key);
814
815 let count = self.node_properties.get_all(id).len() as u16;
817 if let Some(chain) = self.nodes.write().get_mut(&id) {
818 if let Some(record) = chain.latest_mut() {
819 record.props_count = count;
820 }
821 }
822
823 result
824 }
825
826 #[cfg(feature = "tiered-storage")]
829 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
830 let prop_key: PropertyKey = key.into();
831
832 self.update_property_index_on_remove(id, &prop_key);
834
835 self.node_properties.remove(id, &prop_key)
836 }
838
839 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
843 self.edge_properties.remove(id, &key.into())
844 }
845
846 #[must_use]
861 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
862 self.node_properties.get(id, key)
863 }
864
865 #[must_use]
869 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
870 self.edge_properties.get(id, key)
871 }
872
873 #[must_use]
896 pub fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
897 self.node_properties.get_batch(ids, key)
898 }
899
900 #[must_use]
905 pub fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
906 self.node_properties.get_all_batch(ids)
907 }
908
909 #[must_use]
945 pub fn find_nodes_in_range(
946 &self,
947 property: &str,
948 min: Option<&Value>,
949 max: Option<&Value>,
950 min_inclusive: bool,
951 max_inclusive: bool,
952 ) -> Vec<NodeId> {
953 let key = PropertyKey::new(property);
954
955 if !self
957 .node_properties
958 .might_match_range(&key, min, max, min_inclusive, max_inclusive)
959 {
960 return Vec::new();
961 }
962
963 self.node_ids()
965 .into_iter()
966 .filter(|&node_id| {
967 self.node_properties
968 .get(node_id, &key)
969 .is_some_and(|v| value_in_range(&v, min, max, min_inclusive, max_inclusive))
970 })
971 .collect()
972 }
973
974 #[must_use]
999 pub fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
1000 if conditions.is_empty() {
1001 return self.node_ids();
1002 }
1003
1004 let mut best_start: Option<(usize, Vec<NodeId>)> = None;
1007 let indexes = self.property_indexes.read();
1008
1009 for (i, (prop, value)) in conditions.iter().enumerate() {
1010 let key = PropertyKey::new(*prop);
1011 let hv = HashableValue::new(value.clone());
1012
1013 if let Some(index) = indexes.get(&key) {
1014 let matches: Vec<NodeId> = index
1015 .get(&hv)
1016 .map(|nodes| nodes.iter().copied().collect())
1017 .unwrap_or_default();
1018
1019 if matches.is_empty() {
1021 return Vec::new();
1022 }
1023
1024 if best_start
1026 .as_ref()
1027 .is_none_or(|(_, best)| matches.len() < best.len())
1028 {
1029 best_start = Some((i, matches));
1030 }
1031 }
1032 }
1033 drop(indexes);
1034
1035 let (start_idx, mut candidates) = best_start.unwrap_or_else(|| {
1037 let (prop, value) = &conditions[0];
1039 (0, self.find_nodes_by_property(prop, value))
1040 });
1041
1042 for (i, (prop, value)) in conditions.iter().enumerate() {
1044 if i == start_idx {
1045 continue;
1046 }
1047
1048 let key = PropertyKey::new(*prop);
1049 candidates.retain(|&node_id| {
1050 self.node_properties
1051 .get(node_id, &key)
1052 .is_some_and(|v| v == *value)
1053 });
1054
1055 if candidates.is_empty() {
1057 return Vec::new();
1058 }
1059 }
1060
1061 candidates
1062 }
1063
1064 pub fn create_property_index(&self, property: &str) {
1092 let key = PropertyKey::new(property);
1093
1094 let mut indexes = self.property_indexes.write();
1095 if indexes.contains_key(&key) {
1096 return; }
1098
1099 let index: DashMap<HashableValue, FxHashSet<NodeId>> = DashMap::new();
1101
1102 for node_id in self.node_ids() {
1104 if let Some(value) = self.node_properties.get(node_id, &key) {
1105 let hv = HashableValue::new(value);
1106 index
1107 .entry(hv)
1108 .or_insert_with(FxHashSet::default)
1109 .insert(node_id);
1110 }
1111 }
1112
1113 indexes.insert(key, index);
1114 }
1115
1116 pub fn drop_property_index(&self, property: &str) -> bool {
1120 let key = PropertyKey::new(property);
1121 self.property_indexes.write().remove(&key).is_some()
1122 }
1123
1124 #[must_use]
1126 pub fn has_property_index(&self, property: &str) -> bool {
1127 let key = PropertyKey::new(property);
1128 self.property_indexes.read().contains_key(&key)
1129 }
1130
1131 #[must_use]
1154 pub fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
1155 let key = PropertyKey::new(property);
1156 let hv = HashableValue::new(value.clone());
1157
1158 let indexes = self.property_indexes.read();
1160 if let Some(index) = indexes.get(&key) {
1161 if let Some(nodes) = index.get(&hv) {
1162 return nodes.iter().copied().collect();
1163 }
1164 return Vec::new();
1165 }
1166 drop(indexes);
1167
1168 self.node_ids()
1170 .into_iter()
1171 .filter(|&node_id| {
1172 self.node_properties
1173 .get(node_id, &key)
1174 .is_some_and(|v| v == *value)
1175 })
1176 .collect()
1177 }
1178
1179 fn update_property_index_on_set(&self, node_id: NodeId, key: &PropertyKey, new_value: &Value) {
1181 let indexes = self.property_indexes.read();
1182 if let Some(index) = indexes.get(key) {
1183 if let Some(old_value) = self.node_properties.get(node_id, key) {
1185 let old_hv = HashableValue::new(old_value);
1186 if let Some(mut nodes) = index.get_mut(&old_hv) {
1187 nodes.remove(&node_id);
1188 if nodes.is_empty() {
1189 drop(nodes);
1190 index.remove(&old_hv);
1191 }
1192 }
1193 }
1194
1195 let new_hv = HashableValue::new(new_value.clone());
1197 index
1198 .entry(new_hv)
1199 .or_insert_with(FxHashSet::default)
1200 .insert(node_id);
1201 }
1202 }
1203
1204 fn update_property_index_on_remove(&self, node_id: NodeId, key: &PropertyKey) {
1206 let indexes = self.property_indexes.read();
1207 if let Some(index) = indexes.get(key) {
1208 if let Some(old_value) = self.node_properties.get(node_id, key) {
1210 let old_hv = HashableValue::new(old_value);
1211 if let Some(mut nodes) = index.get_mut(&old_hv) {
1212 nodes.remove(&node_id);
1213 if nodes.is_empty() {
1214 drop(nodes);
1215 index.remove(&old_hv);
1216 }
1217 }
1218 }
1219 }
1220 }
1221
1222 #[cfg(not(feature = "tiered-storage"))]
1227 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1228 let epoch = self.current_epoch();
1229
1230 let nodes = self.nodes.read();
1232 if let Some(chain) = nodes.get(&node_id) {
1233 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1234 return false;
1235 }
1236 } else {
1237 return false;
1238 }
1239 drop(nodes);
1240
1241 let label_id = self.get_or_create_label_id(label);
1243
1244 let mut node_labels = self.node_labels.write();
1246 let label_set = node_labels
1247 .entry(node_id)
1248 .or_insert_with(FxHashSet::default);
1249
1250 if label_set.contains(&label_id) {
1251 return false; }
1253
1254 label_set.insert(label_id);
1255 drop(node_labels);
1256
1257 let mut index = self.label_index.write();
1259 if (label_id as usize) >= index.len() {
1260 index.resize(label_id as usize + 1, FxHashMap::default());
1261 }
1262 index[label_id as usize].insert(node_id, ());
1263
1264 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1266 if let Some(record) = chain.latest_mut() {
1267 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1268 record.set_label_count(count as u16);
1269 }
1270 }
1271
1272 true
1273 }
1274
1275 #[cfg(feature = "tiered-storage")]
1278 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
1279 let epoch = self.current_epoch();
1280
1281 let versions = self.node_versions.read();
1283 if let Some(index) = versions.get(&node_id) {
1284 if let Some(vref) = index.visible_at(epoch) {
1285 if let Some(record) = self.read_node_record(&vref) {
1286 if record.is_deleted() {
1287 return false;
1288 }
1289 } else {
1290 return false;
1291 }
1292 } else {
1293 return false;
1294 }
1295 } else {
1296 return false;
1297 }
1298 drop(versions);
1299
1300 let label_id = self.get_or_create_label_id(label);
1302
1303 let mut node_labels = self.node_labels.write();
1305 let label_set = node_labels
1306 .entry(node_id)
1307 .or_insert_with(FxHashSet::default);
1308
1309 if label_set.contains(&label_id) {
1310 return false; }
1312
1313 label_set.insert(label_id);
1314 drop(node_labels);
1315
1316 let mut index = self.label_index.write();
1318 if (label_id as usize) >= index.len() {
1319 index.resize(label_id as usize + 1, FxHashMap::default());
1320 }
1321 index[label_id as usize].insert(node_id, ());
1322
1323 true
1327 }
1328
1329 #[cfg(not(feature = "tiered-storage"))]
1334 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1335 let epoch = self.current_epoch();
1336
1337 let nodes = self.nodes.read();
1339 if let Some(chain) = nodes.get(&node_id) {
1340 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
1341 return false;
1342 }
1343 } else {
1344 return false;
1345 }
1346 drop(nodes);
1347
1348 let label_id = {
1350 let label_ids = self.label_to_id.read();
1351 match label_ids.get(label) {
1352 Some(&id) => id,
1353 None => return false, }
1355 };
1356
1357 let mut node_labels = self.node_labels.write();
1359 if let Some(label_set) = node_labels.get_mut(&node_id) {
1360 if !label_set.remove(&label_id) {
1361 return false; }
1363 } else {
1364 return false;
1365 }
1366 drop(node_labels);
1367
1368 let mut index = self.label_index.write();
1370 if (label_id as usize) < index.len() {
1371 index[label_id as usize].remove(&node_id);
1372 }
1373
1374 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
1376 if let Some(record) = chain.latest_mut() {
1377 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
1378 record.set_label_count(count as u16);
1379 }
1380 }
1381
1382 true
1383 }
1384
1385 #[cfg(feature = "tiered-storage")]
1388 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
1389 let epoch = self.current_epoch();
1390
1391 let versions = self.node_versions.read();
1393 if let Some(index) = versions.get(&node_id) {
1394 if let Some(vref) = index.visible_at(epoch) {
1395 if let Some(record) = self.read_node_record(&vref) {
1396 if record.is_deleted() {
1397 return false;
1398 }
1399 } else {
1400 return false;
1401 }
1402 } else {
1403 return false;
1404 }
1405 } else {
1406 return false;
1407 }
1408 drop(versions);
1409
1410 let label_id = {
1412 let label_ids = self.label_to_id.read();
1413 match label_ids.get(label) {
1414 Some(&id) => id,
1415 None => return false, }
1417 };
1418
1419 let mut node_labels = self.node_labels.write();
1421 if let Some(label_set) = node_labels.get_mut(&node_id) {
1422 if !label_set.remove(&label_id) {
1423 return false; }
1425 } else {
1426 return false;
1427 }
1428 drop(node_labels);
1429
1430 let mut index = self.label_index.write();
1432 if (label_id as usize) < index.len() {
1433 index[label_id as usize].remove(&node_id);
1434 }
1435
1436 true
1439 }
1440
1441 #[must_use]
1443 #[cfg(not(feature = "tiered-storage"))]
1444 pub fn node_count(&self) -> usize {
1445 let epoch = self.current_epoch();
1446 self.nodes
1447 .read()
1448 .values()
1449 .filter_map(|chain| chain.visible_at(epoch))
1450 .filter(|r| !r.is_deleted())
1451 .count()
1452 }
1453
1454 #[must_use]
1457 #[cfg(feature = "tiered-storage")]
1458 pub fn node_count(&self) -> usize {
1459 let epoch = self.current_epoch();
1460 let versions = self.node_versions.read();
1461 versions
1462 .iter()
1463 .filter(|(_, index)| {
1464 index.visible_at(epoch).map_or(false, |vref| {
1465 self.read_node_record(&vref)
1466 .map_or(false, |r| !r.is_deleted())
1467 })
1468 })
1469 .count()
1470 }
1471
1472 #[must_use]
1478 #[cfg(not(feature = "tiered-storage"))]
1479 pub fn node_ids(&self) -> Vec<NodeId> {
1480 let epoch = self.current_epoch();
1481 let mut ids: Vec<NodeId> = self
1482 .nodes
1483 .read()
1484 .iter()
1485 .filter_map(|(id, chain)| {
1486 chain
1487 .visible_at(epoch)
1488 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1489 })
1490 .collect();
1491 ids.sort_unstable();
1492 ids
1493 }
1494
1495 #[must_use]
1498 #[cfg(feature = "tiered-storage")]
1499 pub fn node_ids(&self) -> Vec<NodeId> {
1500 let epoch = self.current_epoch();
1501 let versions = self.node_versions.read();
1502 let mut ids: Vec<NodeId> = versions
1503 .iter()
1504 .filter_map(|(id, index)| {
1505 index.visible_at(epoch).and_then(|vref| {
1506 self.read_node_record(&vref)
1507 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1508 })
1509 })
1510 .collect();
1511 ids.sort_unstable();
1512 ids
1513 }
1514
1515 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1519 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1520 }
1521
1522 #[cfg(not(feature = "tiered-storage"))]
1524 pub fn create_edge_versioned(
1525 &self,
1526 src: NodeId,
1527 dst: NodeId,
1528 edge_type: &str,
1529 epoch: EpochId,
1530 tx_id: TxId,
1531 ) -> EdgeId {
1532 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1533 let type_id = self.get_or_create_edge_type_id(edge_type);
1534
1535 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1536 let chain = VersionChain::with_initial(record, epoch, tx_id);
1537 self.edges.write().insert(id, chain);
1538
1539 self.forward_adj.add_edge(src, dst, id);
1541 if let Some(ref backward) = self.backward_adj {
1542 backward.add_edge(dst, src, id);
1543 }
1544
1545 id
1546 }
1547
1548 #[cfg(feature = "tiered-storage")]
1551 pub fn create_edge_versioned(
1552 &self,
1553 src: NodeId,
1554 dst: NodeId,
1555 edge_type: &str,
1556 epoch: EpochId,
1557 tx_id: TxId,
1558 ) -> EdgeId {
1559 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1560 let type_id = self.get_or_create_edge_type_id(edge_type);
1561
1562 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1563
1564 let arena = self.arena_allocator.arena_or_create(epoch);
1566 let (offset, _stored) = arena.alloc_value_with_offset(record);
1567
1568 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1570
1571 let mut versions = self.edge_versions.write();
1573 if let Some(index) = versions.get_mut(&id) {
1574 index.add_hot(hot_ref);
1575 } else {
1576 versions.insert(id, VersionIndex::with_initial(hot_ref));
1577 }
1578
1579 self.forward_adj.add_edge(src, dst, id);
1581 if let Some(ref backward) = self.backward_adj {
1582 backward.add_edge(dst, src, id);
1583 }
1584
1585 id
1586 }
1587
1588 pub fn create_edge_with_props(
1590 &self,
1591 src: NodeId,
1592 dst: NodeId,
1593 edge_type: &str,
1594 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1595 ) -> EdgeId {
1596 let id = self.create_edge(src, dst, edge_type);
1597
1598 for (key, value) in properties {
1599 self.edge_properties.set(id, key.into(), value.into());
1600 }
1601
1602 id
1603 }
1604
1605 #[must_use]
1607 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1608 self.get_edge_at_epoch(id, self.current_epoch())
1609 }
1610
1611 #[must_use]
1613 #[cfg(not(feature = "tiered-storage"))]
1614 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1615 let edges = self.edges.read();
1616 let chain = edges.get(&id)?;
1617 let record = chain.visible_at(epoch)?;
1618
1619 if record.is_deleted() {
1620 return None;
1621 }
1622
1623 let edge_type = {
1624 let id_to_type = self.id_to_edge_type.read();
1625 id_to_type.get(record.type_id as usize)?.clone()
1626 };
1627
1628 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1629
1630 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1632
1633 Some(edge)
1634 }
1635
1636 #[must_use]
1639 #[cfg(feature = "tiered-storage")]
1640 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1641 let versions = self.edge_versions.read();
1642 let index = versions.get(&id)?;
1643 let version_ref = index.visible_at(epoch)?;
1644
1645 let record = self.read_edge_record(&version_ref)?;
1646
1647 if record.is_deleted() {
1648 return None;
1649 }
1650
1651 let edge_type = {
1652 let id_to_type = self.id_to_edge_type.read();
1653 id_to_type.get(record.type_id as usize)?.clone()
1654 };
1655
1656 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1657
1658 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1660
1661 Some(edge)
1662 }
1663
1664 #[must_use]
1666 #[cfg(not(feature = "tiered-storage"))]
1667 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1668 let edges = self.edges.read();
1669 let chain = edges.get(&id)?;
1670 let record = chain.visible_to(epoch, tx_id)?;
1671
1672 if record.is_deleted() {
1673 return None;
1674 }
1675
1676 let edge_type = {
1677 let id_to_type = self.id_to_edge_type.read();
1678 id_to_type.get(record.type_id as usize)?.clone()
1679 };
1680
1681 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1682
1683 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1685
1686 Some(edge)
1687 }
1688
1689 #[must_use]
1692 #[cfg(feature = "tiered-storage")]
1693 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1694 let versions = self.edge_versions.read();
1695 let index = versions.get(&id)?;
1696 let version_ref = index.visible_to(epoch, tx_id)?;
1697
1698 let record = self.read_edge_record(&version_ref)?;
1699
1700 if record.is_deleted() {
1701 return None;
1702 }
1703
1704 let edge_type = {
1705 let id_to_type = self.id_to_edge_type.read();
1706 id_to_type.get(record.type_id as usize)?.clone()
1707 };
1708
1709 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1710
1711 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1713
1714 Some(edge)
1715 }
1716
1717 #[cfg(feature = "tiered-storage")]
1719 #[allow(unsafe_code)]
1720 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1721 match version_ref {
1722 VersionRef::Hot(hot_ref) => {
1723 let arena = self.arena_allocator.arena(hot_ref.epoch);
1724 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1726 Some(record.clone())
1727 }
1728 VersionRef::Cold(cold_ref) => {
1729 self.epoch_store
1731 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1732 }
1733 }
1734 }
1735
1736 pub fn delete_edge(&self, id: EdgeId) -> bool {
1738 self.delete_edge_at_epoch(id, self.current_epoch())
1739 }
1740
1741 #[cfg(not(feature = "tiered-storage"))]
1743 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1744 let mut edges = self.edges.write();
1745 if let Some(chain) = edges.get_mut(&id) {
1746 let (src, dst) = {
1748 match chain.visible_at(epoch) {
1749 Some(record) => {
1750 if record.is_deleted() {
1751 return false;
1752 }
1753 (record.src, record.dst)
1754 }
1755 None => return false, }
1757 };
1758
1759 chain.mark_deleted(epoch);
1761
1762 drop(edges); self.forward_adj.mark_deleted(src, id);
1766 if let Some(ref backward) = self.backward_adj {
1767 backward.mark_deleted(dst, id);
1768 }
1769
1770 self.edge_properties.remove_all(id);
1772
1773 true
1774 } else {
1775 false
1776 }
1777 }
1778
1779 #[cfg(feature = "tiered-storage")]
1782 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1783 let mut versions = self.edge_versions.write();
1784 if let Some(index) = versions.get_mut(&id) {
1785 let (src, dst) = {
1787 match index.visible_at(epoch) {
1788 Some(version_ref) => {
1789 if let Some(record) = self.read_edge_record(&version_ref) {
1790 if record.is_deleted() {
1791 return false;
1792 }
1793 (record.src, record.dst)
1794 } else {
1795 return false;
1796 }
1797 }
1798 None => return false,
1799 }
1800 };
1801
1802 index.mark_deleted(epoch);
1804
1805 drop(versions); self.forward_adj.mark_deleted(src, id);
1809 if let Some(ref backward) = self.backward_adj {
1810 backward.mark_deleted(dst, id);
1811 }
1812
1813 self.edge_properties.remove_all(id);
1815
1816 true
1817 } else {
1818 false
1819 }
1820 }
1821
1822 #[must_use]
1824 #[cfg(not(feature = "tiered-storage"))]
1825 pub fn edge_count(&self) -> usize {
1826 let epoch = self.current_epoch();
1827 self.edges
1828 .read()
1829 .values()
1830 .filter_map(|chain| chain.visible_at(epoch))
1831 .filter(|r| !r.is_deleted())
1832 .count()
1833 }
1834
1835 #[must_use]
1838 #[cfg(feature = "tiered-storage")]
1839 pub fn edge_count(&self) -> usize {
1840 let epoch = self.current_epoch();
1841 let versions = self.edge_versions.read();
1842 versions
1843 .iter()
1844 .filter(|(_, index)| {
1845 index.visible_at(epoch).map_or(false, |vref| {
1846 self.read_edge_record(&vref)
1847 .map_or(false, |r| !r.is_deleted())
1848 })
1849 })
1850 .count()
1851 }
1852
1853 #[cfg(not(feature = "tiered-storage"))]
1858 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1859 {
1861 let mut nodes = self.nodes.write();
1862 for chain in nodes.values_mut() {
1863 chain.remove_versions_by(tx_id);
1864 }
1865 nodes.retain(|_, chain| !chain.is_empty());
1867 }
1868
1869 {
1871 let mut edges = self.edges.write();
1872 for chain in edges.values_mut() {
1873 chain.remove_versions_by(tx_id);
1874 }
1875 edges.retain(|_, chain| !chain.is_empty());
1877 }
1878 }
1879
1880 #[cfg(feature = "tiered-storage")]
1883 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1884 {
1886 let mut versions = self.node_versions.write();
1887 for index in versions.values_mut() {
1888 index.remove_versions_by(tx_id);
1889 }
1890 versions.retain(|_, index| !index.is_empty());
1892 }
1893
1894 {
1896 let mut versions = self.edge_versions.write();
1897 for index in versions.values_mut() {
1898 index.remove_versions_by(tx_id);
1899 }
1900 versions.retain(|_, index| !index.is_empty());
1902 }
1903 }
1904
1905 #[cfg(feature = "tiered-storage")]
1924 #[allow(unsafe_code)]
1925 pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
1926 let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
1928 let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();
1929
1930 {
1931 let versions = self.node_versions.read();
1932 for (node_id, index) in versions.iter() {
1933 for hot_ref in index.hot_refs_for_epoch(epoch) {
1934 let arena = self.arena_allocator.arena(hot_ref.epoch);
1935 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1937 node_records.push((node_id.as_u64(), record.clone()));
1938 node_hot_refs.push((*node_id, *hot_ref));
1939 }
1940 }
1941 }
1942
1943 let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
1945 let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();
1946
1947 {
1948 let versions = self.edge_versions.read();
1949 for (edge_id, index) in versions.iter() {
1950 for hot_ref in index.hot_refs_for_epoch(epoch) {
1951 let arena = self.arena_allocator.arena(hot_ref.epoch);
1952 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1954 edge_records.push((edge_id.as_u64(), record.clone()));
1955 edge_hot_refs.push((*edge_id, *hot_ref));
1956 }
1957 }
1958 }
1959
1960 let total_frozen = node_records.len() + edge_records.len();
1961
1962 if total_frozen == 0 {
1963 return 0;
1964 }
1965
1966 let (node_entries, edge_entries) =
1968 self.epoch_store
1969 .freeze_epoch(epoch, node_records, edge_records);
1970
1971 let node_entry_map: FxHashMap<u64, _> = node_entries
1973 .iter()
1974 .map(|e| (e.entity_id, (e.offset, e.length)))
1975 .collect();
1976 let edge_entry_map: FxHashMap<u64, _> = edge_entries
1977 .iter()
1978 .map(|e| (e.entity_id, (e.offset, e.length)))
1979 .collect();
1980
1981 {
1983 let mut versions = self.node_versions.write();
1984 for (node_id, hot_ref) in &node_hot_refs {
1985 if let Some(index) = versions.get_mut(node_id) {
1986 if let Some(&(offset, length)) = node_entry_map.get(&node_id.as_u64()) {
1987 let cold_ref = ColdVersionRef {
1988 epoch,
1989 block_offset: offset,
1990 length,
1991 created_by: hot_ref.created_by,
1992 deleted_epoch: hot_ref.deleted_epoch,
1993 };
1994 index.freeze_epoch(epoch, std::iter::once(cold_ref));
1995 }
1996 }
1997 }
1998 }
1999
2000 {
2001 let mut versions = self.edge_versions.write();
2002 for (edge_id, hot_ref) in &edge_hot_refs {
2003 if let Some(index) = versions.get_mut(edge_id) {
2004 if let Some(&(offset, length)) = edge_entry_map.get(&edge_id.as_u64()) {
2005 let cold_ref = ColdVersionRef {
2006 epoch,
2007 block_offset: offset,
2008 length,
2009 created_by: hot_ref.created_by,
2010 deleted_epoch: hot_ref.deleted_epoch,
2011 };
2012 index.freeze_epoch(epoch, std::iter::once(cold_ref));
2013 }
2014 }
2015 }
2016 }
2017
2018 total_frozen
2019 }
2020
2021 #[cfg(feature = "tiered-storage")]
2023 #[must_use]
2024 pub fn epoch_store(&self) -> &EpochStore {
2025 &self.epoch_store
2026 }
2027
2028 #[must_use]
2030 pub fn label_count(&self) -> usize {
2031 self.id_to_label.read().len()
2032 }
2033
2034 #[must_use]
2038 pub fn property_key_count(&self) -> usize {
2039 let node_keys = self.node_properties.column_count();
2040 let edge_keys = self.edge_properties.column_count();
2041 node_keys + edge_keys
2045 }
2046
2047 #[must_use]
2049 pub fn edge_type_count(&self) -> usize {
2050 self.id_to_edge_type.read().len()
2051 }
2052
2053 pub fn neighbors(
2060 &self,
2061 node: NodeId,
2062 direction: Direction,
2063 ) -> impl Iterator<Item = NodeId> + '_ {
2064 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
2065 Direction::Outgoing | Direction::Both => {
2066 Box::new(self.forward_adj.neighbors(node).into_iter())
2067 }
2068 Direction::Incoming => Box::new(std::iter::empty()),
2069 };
2070
2071 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
2072 Direction::Incoming | Direction::Both => {
2073 if let Some(ref adj) = self.backward_adj {
2074 Box::new(adj.neighbors(node).into_iter())
2075 } else {
2076 Box::new(std::iter::empty())
2077 }
2078 }
2079 Direction::Outgoing => Box::new(std::iter::empty()),
2080 };
2081
2082 forward.chain(backward)
2083 }
2084
2085 pub fn edges_from(
2089 &self,
2090 node: NodeId,
2091 direction: Direction,
2092 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
2093 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2094 Direction::Outgoing | Direction::Both => {
2095 Box::new(self.forward_adj.edges_from(node).into_iter())
2096 }
2097 Direction::Incoming => Box::new(std::iter::empty()),
2098 };
2099
2100 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
2101 Direction::Incoming | Direction::Both => {
2102 if let Some(ref adj) = self.backward_adj {
2103 Box::new(adj.edges_from(node).into_iter())
2104 } else {
2105 Box::new(std::iter::empty())
2106 }
2107 }
2108 Direction::Outgoing => Box::new(std::iter::empty()),
2109 };
2110
2111 forward.chain(backward)
2112 }
2113
2114 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
2127 if let Some(ref backward) = self.backward_adj {
2128 backward.edges_from(node)
2129 } else {
2130 self.all_edges()
2132 .filter_map(|edge| {
2133 if edge.dst == node {
2134 Some((edge.src, edge.id))
2135 } else {
2136 None
2137 }
2138 })
2139 .collect()
2140 }
2141 }
2142
2143 #[must_use]
2147 pub fn out_degree(&self, node: NodeId) -> usize {
2148 self.forward_adj.out_degree(node)
2149 }
2150
2151 #[must_use]
2156 pub fn in_degree(&self, node: NodeId) -> usize {
2157 if let Some(ref backward) = self.backward_adj {
2158 backward.in_degree(node)
2159 } else {
2160 self.all_edges().filter(|edge| edge.dst == node).count()
2162 }
2163 }
2164
2165 #[must_use]
2167 #[cfg(not(feature = "tiered-storage"))]
2168 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2169 let edges = self.edges.read();
2170 let chain = edges.get(&id)?;
2171 let epoch = self.current_epoch();
2172 let record = chain.visible_at(epoch)?;
2173 let id_to_type = self.id_to_edge_type.read();
2174 id_to_type.get(record.type_id as usize).cloned()
2175 }
2176
2177 #[must_use]
2180 #[cfg(feature = "tiered-storage")]
2181 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
2182 let versions = self.edge_versions.read();
2183 let index = versions.get(&id)?;
2184 let epoch = self.current_epoch();
2185 let vref = index.visible_at(epoch)?;
2186 let record = self.read_edge_record(&vref)?;
2187 let id_to_type = self.id_to_edge_type.read();
2188 id_to_type.get(record.type_id as usize).cloned()
2189 }
2190
2191 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
2197 let label_to_id = self.label_to_id.read();
2198 if let Some(&label_id) = label_to_id.get(label) {
2199 let index = self.label_index.read();
2200 if let Some(set) = index.get(label_id as usize) {
2201 let mut ids: Vec<NodeId> = set.keys().copied().collect();
2202 ids.sort_unstable();
2203 return ids;
2204 }
2205 }
2206 Vec::new()
2207 }
2208
2209 #[cfg(not(feature = "tiered-storage"))]
2216 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2217 let epoch = self.current_epoch();
2218 let node_ids: Vec<NodeId> = self
2219 .nodes
2220 .read()
2221 .iter()
2222 .filter_map(|(id, chain)| {
2223 chain
2224 .visible_at(epoch)
2225 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2226 })
2227 .collect();
2228
2229 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2230 }
2231
2232 #[cfg(feature = "tiered-storage")]
2235 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
2236 let node_ids = self.node_ids();
2237 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2238 }
2239
2240 #[cfg(not(feature = "tiered-storage"))]
2245 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2246 let epoch = self.current_epoch();
2247 let edge_ids: Vec<EdgeId> = self
2248 .edges
2249 .read()
2250 .iter()
2251 .filter_map(|(id, chain)| {
2252 chain
2253 .visible_at(epoch)
2254 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2255 })
2256 .collect();
2257
2258 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2259 }
2260
2261 #[cfg(feature = "tiered-storage")]
2264 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
2265 let epoch = self.current_epoch();
2266 let versions = self.edge_versions.read();
2267 let edge_ids: Vec<EdgeId> = versions
2268 .iter()
2269 .filter_map(|(id, index)| {
2270 index.visible_at(epoch).and_then(|vref| {
2271 self.read_edge_record(&vref)
2272 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
2273 })
2274 })
2275 .collect();
2276
2277 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
2278 }
2279
2280 pub fn all_labels(&self) -> Vec<String> {
2282 self.id_to_label
2283 .read()
2284 .iter()
2285 .map(|s| s.to_string())
2286 .collect()
2287 }
2288
2289 pub fn all_edge_types(&self) -> Vec<String> {
2291 self.id_to_edge_type
2292 .read()
2293 .iter()
2294 .map(|s| s.to_string())
2295 .collect()
2296 }
2297
2298 pub fn all_property_keys(&self) -> Vec<String> {
2300 let mut keys = std::collections::HashSet::new();
2301 for key in self.node_properties.keys() {
2302 keys.insert(key.to_string());
2303 }
2304 for key in self.edge_properties.keys() {
2305 keys.insert(key.to_string());
2306 }
2307 keys.into_iter().collect()
2308 }
2309
2310 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
2312 let node_ids = self.nodes_by_label(label);
2313 node_ids.into_iter().filter_map(move |id| self.get_node(id))
2314 }
2315
2316 #[cfg(not(feature = "tiered-storage"))]
2318 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2319 let epoch = self.current_epoch();
2320 let type_to_id = self.edge_type_to_id.read();
2321
2322 if let Some(&type_id) = type_to_id.get(edge_type) {
2323 let edge_ids: Vec<EdgeId> = self
2324 .edges
2325 .read()
2326 .iter()
2327 .filter_map(|(id, chain)| {
2328 chain.visible_at(epoch).and_then(|r| {
2329 if !r.is_deleted() && r.type_id == type_id {
2330 Some(*id)
2331 } else {
2332 None
2333 }
2334 })
2335 })
2336 .collect();
2337
2338 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2340 as Box<dyn Iterator<Item = Edge> + 'a>
2341 } else {
2342 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2344 }
2345 }
2346
2347 #[cfg(feature = "tiered-storage")]
2350 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
2351 let epoch = self.current_epoch();
2352 let type_to_id = self.edge_type_to_id.read();
2353
2354 if let Some(&type_id) = type_to_id.get(edge_type) {
2355 let versions = self.edge_versions.read();
2356 let edge_ids: Vec<EdgeId> = versions
2357 .iter()
2358 .filter_map(|(id, index)| {
2359 index.visible_at(epoch).and_then(|vref| {
2360 self.read_edge_record(&vref).and_then(|r| {
2361 if !r.is_deleted() && r.type_id == type_id {
2362 Some(*id)
2363 } else {
2364 None
2365 }
2366 })
2367 })
2368 })
2369 .collect();
2370
2371 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
2372 as Box<dyn Iterator<Item = Edge> + 'a>
2373 } else {
2374 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
2375 }
2376 }
2377
2378 #[must_use]
2385 pub fn node_property_might_match(
2386 &self,
2387 property: &PropertyKey,
2388 op: CompareOp,
2389 value: &Value,
2390 ) -> bool {
2391 self.node_properties.might_match(property, op, value)
2392 }
2393
2394 #[must_use]
2396 pub fn edge_property_might_match(
2397 &self,
2398 property: &PropertyKey,
2399 op: CompareOp,
2400 value: &Value,
2401 ) -> bool {
2402 self.edge_properties.might_match(property, op, value)
2403 }
2404
2405 #[must_use]
2407 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2408 self.node_properties.zone_map(property)
2409 }
2410
2411 #[must_use]
2413 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
2414 self.edge_properties.zone_map(property)
2415 }
2416
2417 pub fn rebuild_zone_maps(&self) {
2419 self.node_properties.rebuild_zone_maps();
2420 self.edge_properties.rebuild_zone_maps();
2421 }
2422
2423 #[must_use]
2427 pub fn statistics(&self) -> Statistics {
2428 self.statistics.read().clone()
2429 }
2430
2431 #[cfg(not(feature = "tiered-storage"))]
2436 pub fn compute_statistics(&self) {
2437 let mut stats = Statistics::new();
2438
2439 stats.total_nodes = self.node_count() as u64;
2441 stats.total_edges = self.edge_count() as u64;
2442
2443 let id_to_label = self.id_to_label.read();
2445 let label_index = self.label_index.read();
2446
2447 for (label_id, label_name) in id_to_label.iter().enumerate() {
2448 let node_count = label_index
2449 .get(label_id)
2450 .map(|set| set.len() as u64)
2451 .unwrap_or(0);
2452
2453 if node_count > 0 {
2454 let avg_out_degree = if stats.total_nodes > 0 {
2456 stats.total_edges as f64 / stats.total_nodes as f64
2457 } else {
2458 0.0
2459 };
2460
2461 let label_stats =
2462 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2463
2464 stats.update_label(label_name.as_ref(), label_stats);
2465 }
2466 }
2467
2468 let id_to_edge_type = self.id_to_edge_type.read();
2470 let edges = self.edges.read();
2471 let epoch = self.current_epoch();
2472
2473 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2474 for chain in edges.values() {
2475 if let Some(record) = chain.visible_at(epoch) {
2476 if !record.is_deleted() {
2477 *edge_type_counts.entry(record.type_id).or_default() += 1;
2478 }
2479 }
2480 }
2481
2482 for (type_id, count) in edge_type_counts {
2483 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2484 let avg_degree = if stats.total_nodes > 0 {
2485 count as f64 / stats.total_nodes as f64
2486 } else {
2487 0.0
2488 };
2489
2490 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2491 stats.update_edge_type(type_name.as_ref(), edge_stats);
2492 }
2493 }
2494
2495 *self.statistics.write() = stats;
2496 }
2497
2498 #[cfg(feature = "tiered-storage")]
2501 pub fn compute_statistics(&self) {
2502 let mut stats = Statistics::new();
2503
2504 stats.total_nodes = self.node_count() as u64;
2506 stats.total_edges = self.edge_count() as u64;
2507
2508 let id_to_label = self.id_to_label.read();
2510 let label_index = self.label_index.read();
2511
2512 for (label_id, label_name) in id_to_label.iter().enumerate() {
2513 let node_count = label_index
2514 .get(label_id)
2515 .map(|set| set.len() as u64)
2516 .unwrap_or(0);
2517
2518 if node_count > 0 {
2519 let avg_out_degree = if stats.total_nodes > 0 {
2520 stats.total_edges as f64 / stats.total_nodes as f64
2521 } else {
2522 0.0
2523 };
2524
2525 let label_stats =
2526 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2527
2528 stats.update_label(label_name.as_ref(), label_stats);
2529 }
2530 }
2531
2532 let id_to_edge_type = self.id_to_edge_type.read();
2534 let versions = self.edge_versions.read();
2535 let epoch = self.current_epoch();
2536
2537 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2538 for index in versions.values() {
2539 if let Some(vref) = index.visible_at(epoch) {
2540 if let Some(record) = self.read_edge_record(&vref) {
2541 if !record.is_deleted() {
2542 *edge_type_counts.entry(record.type_id).or_default() += 1;
2543 }
2544 }
2545 }
2546 }
2547
2548 for (type_id, count) in edge_type_counts {
2549 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2550 let avg_degree = if stats.total_nodes > 0 {
2551 count as f64 / stats.total_nodes as f64
2552 } else {
2553 0.0
2554 };
2555
2556 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2557 stats.update_edge_type(type_name.as_ref(), edge_stats);
2558 }
2559 }
2560
2561 *self.statistics.write() = stats;
2562 }
2563
2564 #[must_use]
2566 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2567 self.statistics.read().estimate_label_cardinality(label)
2568 }
2569
2570 #[must_use]
2572 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2573 self.statistics
2574 .read()
2575 .estimate_avg_degree(edge_type, outgoing)
2576 }
2577
2578 fn get_or_create_label_id(&self, label: &str) -> u32 {
2581 {
2582 let label_to_id = self.label_to_id.read();
2583 if let Some(&id) = label_to_id.get(label) {
2584 return id;
2585 }
2586 }
2587
2588 let mut label_to_id = self.label_to_id.write();
2589 let mut id_to_label = self.id_to_label.write();
2590
2591 if let Some(&id) = label_to_id.get(label) {
2593 return id;
2594 }
2595
2596 let id = id_to_label.len() as u32;
2597
2598 let label: Arc<str> = label.into();
2599 label_to_id.insert(label.clone(), id);
2600 id_to_label.push(label);
2601
2602 id
2603 }
2604
2605 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2606 {
2607 let type_to_id = self.edge_type_to_id.read();
2608 if let Some(&id) = type_to_id.get(edge_type) {
2609 return id;
2610 }
2611 }
2612
2613 let mut type_to_id = self.edge_type_to_id.write();
2614 let mut id_to_type = self.id_to_edge_type.write();
2615
2616 if let Some(&id) = type_to_id.get(edge_type) {
2618 return id;
2619 }
2620
2621 let id = id_to_type.len() as u32;
2622 let edge_type: Arc<str> = edge_type.into();
2623 type_to_id.insert(edge_type.clone(), id);
2624 id_to_type.push(edge_type);
2625
2626 id
2627 }
2628
2629 #[cfg(not(feature = "tiered-storage"))]
2636 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2637 let epoch = self.current_epoch();
2638 let mut record = NodeRecord::new(id, epoch);
2639 record.set_label_count(labels.len() as u16);
2640
2641 let mut node_label_set = FxHashSet::default();
2643 for label in labels {
2644 let label_id = self.get_or_create_label_id(*label);
2645 node_label_set.insert(label_id);
2646
2647 let mut index = self.label_index.write();
2649 while index.len() <= label_id as usize {
2650 index.push(FxHashMap::default());
2651 }
2652 index[label_id as usize].insert(id, ());
2653 }
2654
2655 self.node_labels.write().insert(id, node_label_set);
2657
2658 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2660 self.nodes.write().insert(id, chain);
2661
2662 let id_val = id.as_u64();
2664 let _ = self
2665 .next_node_id
2666 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2667 if id_val >= current {
2668 Some(id_val + 1)
2669 } else {
2670 None
2671 }
2672 });
2673 }
2674
2675 #[cfg(feature = "tiered-storage")]
2678 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2679 let epoch = self.current_epoch();
2680 let mut record = NodeRecord::new(id, epoch);
2681 record.set_label_count(labels.len() as u16);
2682
2683 let mut node_label_set = FxHashSet::default();
2685 for label in labels {
2686 let label_id = self.get_or_create_label_id(*label);
2687 node_label_set.insert(label_id);
2688
2689 let mut index = self.label_index.write();
2691 while index.len() <= label_id as usize {
2692 index.push(FxHashMap::default());
2693 }
2694 index[label_id as usize].insert(id, ());
2695 }
2696
2697 self.node_labels.write().insert(id, node_label_set);
2699
2700 let arena = self.arena_allocator.arena_or_create(epoch);
2702 let (offset, _stored) = arena.alloc_value_with_offset(record);
2703
2704 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2706 let mut versions = self.node_versions.write();
2707 versions.insert(id, VersionIndex::with_initial(hot_ref));
2708
2709 let id_val = id.as_u64();
2711 let _ = self
2712 .next_node_id
2713 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2714 if id_val >= current {
2715 Some(id_val + 1)
2716 } else {
2717 None
2718 }
2719 });
2720 }
2721
2722 #[cfg(not(feature = "tiered-storage"))]
2726 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2727 let epoch = self.current_epoch();
2728 let type_id = self.get_or_create_edge_type_id(edge_type);
2729
2730 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2731 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2732 self.edges.write().insert(id, chain);
2733
2734 self.forward_adj.add_edge(src, dst, id);
2736 if let Some(ref backward) = self.backward_adj {
2737 backward.add_edge(dst, src, id);
2738 }
2739
2740 let id_val = id.as_u64();
2742 let _ = self
2743 .next_edge_id
2744 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2745 if id_val >= current {
2746 Some(id_val + 1)
2747 } else {
2748 None
2749 }
2750 });
2751 }
2752
2753 #[cfg(feature = "tiered-storage")]
2756 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2757 let epoch = self.current_epoch();
2758 let type_id = self.get_or_create_edge_type_id(edge_type);
2759
2760 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2761
2762 let arena = self.arena_allocator.arena_or_create(epoch);
2764 let (offset, _stored) = arena.alloc_value_with_offset(record);
2765
2766 let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
2768 let mut versions = self.edge_versions.write();
2769 versions.insert(id, VersionIndex::with_initial(hot_ref));
2770
2771 self.forward_adj.add_edge(src, dst, id);
2773 if let Some(ref backward) = self.backward_adj {
2774 backward.add_edge(dst, src, id);
2775 }
2776
2777 let id_val = id.as_u64();
2779 let _ = self
2780 .next_edge_id
2781 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2782 if id_val >= current {
2783 Some(id_val + 1)
2784 } else {
2785 None
2786 }
2787 });
2788 }
2789
2790 pub fn set_epoch(&self, epoch: EpochId) {
2792 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2793 }
2794}
2795
2796impl Default for LpgStore {
2797 fn default() -> Self {
2798 Self::new()
2799 }
2800}
2801
2802#[cfg(test)]
2803mod tests {
2804 use super::*;
2805
2806 #[test]
2807 fn test_create_node() {
2808 let store = LpgStore::new();
2809
2810 let id = store.create_node(&["Person"]);
2811 assert!(id.is_valid());
2812
2813 let node = store.get_node(id).unwrap();
2814 assert!(node.has_label("Person"));
2815 assert!(!node.has_label("Animal"));
2816 }
2817
2818 #[test]
2819 fn test_create_node_with_props() {
2820 let store = LpgStore::new();
2821
2822 let id = store.create_node_with_props(
2823 &["Person"],
2824 [("name", Value::from("Alice")), ("age", Value::from(30i64))],
2825 );
2826
2827 let node = store.get_node(id).unwrap();
2828 assert_eq!(
2829 node.get_property("name").and_then(|v| v.as_str()),
2830 Some("Alice")
2831 );
2832 assert_eq!(
2833 node.get_property("age").and_then(|v| v.as_int64()),
2834 Some(30)
2835 );
2836 }
2837
2838 #[test]
2839 fn test_delete_node() {
2840 let store = LpgStore::new();
2841
2842 let id = store.create_node(&["Person"]);
2843 assert_eq!(store.node_count(), 1);
2844
2845 assert!(store.delete_node(id));
2846 assert_eq!(store.node_count(), 0);
2847 assert!(store.get_node(id).is_none());
2848
2849 assert!(!store.delete_node(id));
2851 }
2852
2853 #[test]
2854 fn test_create_edge() {
2855 let store = LpgStore::new();
2856
2857 let alice = store.create_node(&["Person"]);
2858 let bob = store.create_node(&["Person"]);
2859
2860 let edge_id = store.create_edge(alice, bob, "KNOWS");
2861 assert!(edge_id.is_valid());
2862
2863 let edge = store.get_edge(edge_id).unwrap();
2864 assert_eq!(edge.src, alice);
2865 assert_eq!(edge.dst, bob);
2866 assert_eq!(edge.edge_type.as_ref(), "KNOWS");
2867 }
2868
2869 #[test]
2870 fn test_neighbors() {
2871 let store = LpgStore::new();
2872
2873 let a = store.create_node(&["Person"]);
2874 let b = store.create_node(&["Person"]);
2875 let c = store.create_node(&["Person"]);
2876
2877 store.create_edge(a, b, "KNOWS");
2878 store.create_edge(a, c, "KNOWS");
2879
2880 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2881 assert_eq!(outgoing.len(), 2);
2882 assert!(outgoing.contains(&b));
2883 assert!(outgoing.contains(&c));
2884
2885 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2886 assert_eq!(incoming.len(), 1);
2887 assert!(incoming.contains(&a));
2888 }
2889
2890 #[test]
2891 fn test_nodes_by_label() {
2892 let store = LpgStore::new();
2893
2894 let p1 = store.create_node(&["Person"]);
2895 let p2 = store.create_node(&["Person"]);
2896 let _a = store.create_node(&["Animal"]);
2897
2898 let persons = store.nodes_by_label("Person");
2899 assert_eq!(persons.len(), 2);
2900 assert!(persons.contains(&p1));
2901 assert!(persons.contains(&p2));
2902
2903 let animals = store.nodes_by_label("Animal");
2904 assert_eq!(animals.len(), 1);
2905 }
2906
2907 #[test]
2908 fn test_delete_edge() {
2909 let store = LpgStore::new();
2910
2911 let a = store.create_node(&["Person"]);
2912 let b = store.create_node(&["Person"]);
2913 let edge_id = store.create_edge(a, b, "KNOWS");
2914
2915 assert_eq!(store.edge_count(), 1);
2916
2917 assert!(store.delete_edge(edge_id));
2918 assert_eq!(store.edge_count(), 0);
2919 assert!(store.get_edge(edge_id).is_none());
2920 }
2921
2922 #[test]
2925 fn test_lpg_store_config() {
2926 let config = LpgStoreConfig {
2928 backward_edges: false,
2929 initial_node_capacity: 100,
2930 initial_edge_capacity: 200,
2931 };
2932 let store = LpgStore::with_config(config);
2933
2934 let a = store.create_node(&["Person"]);
2936 let b = store.create_node(&["Person"]);
2937 store.create_edge(a, b, "KNOWS");
2938
2939 let outgoing: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
2941 assert_eq!(outgoing.len(), 1);
2942
2943 let incoming: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
2945 assert_eq!(incoming.len(), 0);
2946 }
2947
2948 #[test]
2949 fn test_epoch_management() {
2950 let store = LpgStore::new();
2951
2952 let epoch0 = store.current_epoch();
2953 assert_eq!(epoch0.as_u64(), 0);
2954
2955 let epoch1 = store.new_epoch();
2956 assert_eq!(epoch1.as_u64(), 1);
2957
2958 let current = store.current_epoch();
2959 assert_eq!(current.as_u64(), 1);
2960 }
2961
2962 #[test]
2963 fn test_node_properties() {
2964 let store = LpgStore::new();
2965 let id = store.create_node(&["Person"]);
2966
2967 store.set_node_property(id, "name", Value::from("Alice"));
2969 let name = store.get_node_property(id, &"name".into());
2970 assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Alice"));
2971
2972 store.set_node_property(id, "name", Value::from("Bob"));
2974 let name = store.get_node_property(id, &"name".into());
2975 assert!(matches!(name, Some(Value::String(s)) if s.as_ref() == "Bob"));
2976
2977 let old = store.remove_node_property(id, "name");
2979 assert!(matches!(old, Some(Value::String(s)) if s.as_ref() == "Bob"));
2980
2981 let name = store.get_node_property(id, &"name".into());
2983 assert!(name.is_none());
2984
2985 let none = store.remove_node_property(id, "nonexistent");
2987 assert!(none.is_none());
2988 }
2989
2990 #[test]
2991 fn test_edge_properties() {
2992 let store = LpgStore::new();
2993 let a = store.create_node(&["Person"]);
2994 let b = store.create_node(&["Person"]);
2995 let edge_id = store.create_edge(a, b, "KNOWS");
2996
2997 store.set_edge_property(edge_id, "since", Value::from(2020i64));
2999 let since = store.get_edge_property(edge_id, &"since".into());
3000 assert_eq!(since.and_then(|v| v.as_int64()), Some(2020));
3001
3002 let old = store.remove_edge_property(edge_id, "since");
3004 assert_eq!(old.and_then(|v| v.as_int64()), Some(2020));
3005
3006 let since = store.get_edge_property(edge_id, &"since".into());
3007 assert!(since.is_none());
3008 }
3009
3010 #[test]
3011 fn test_add_remove_label() {
3012 let store = LpgStore::new();
3013 let id = store.create_node(&["Person"]);
3014
3015 assert!(store.add_label(id, "Employee"));
3017
3018 let node = store.get_node(id).unwrap();
3019 assert!(node.has_label("Person"));
3020 assert!(node.has_label("Employee"));
3021
3022 assert!(!store.add_label(id, "Employee"));
3024
3025 assert!(store.remove_label(id, "Employee"));
3027
3028 let node = store.get_node(id).unwrap();
3029 assert!(node.has_label("Person"));
3030 assert!(!node.has_label("Employee"));
3031
3032 assert!(!store.remove_label(id, "Employee"));
3034 assert!(!store.remove_label(id, "NonExistent"));
3035 }
3036
3037 #[test]
3038 fn test_add_label_to_nonexistent_node() {
3039 let store = LpgStore::new();
3040 let fake_id = NodeId::new(999);
3041 assert!(!store.add_label(fake_id, "Label"));
3042 }
3043
3044 #[test]
3045 fn test_remove_label_from_nonexistent_node() {
3046 let store = LpgStore::new();
3047 let fake_id = NodeId::new(999);
3048 assert!(!store.remove_label(fake_id, "Label"));
3049 }
3050
3051 #[test]
3052 fn test_node_ids() {
3053 let store = LpgStore::new();
3054
3055 let n1 = store.create_node(&["Person"]);
3056 let n2 = store.create_node(&["Person"]);
3057 let n3 = store.create_node(&["Person"]);
3058
3059 let ids = store.node_ids();
3060 assert_eq!(ids.len(), 3);
3061 assert!(ids.contains(&n1));
3062 assert!(ids.contains(&n2));
3063 assert!(ids.contains(&n3));
3064
3065 store.delete_node(n2);
3067 let ids = store.node_ids();
3068 assert_eq!(ids.len(), 2);
3069 assert!(!ids.contains(&n2));
3070 }
3071
3072 #[test]
3073 fn test_delete_node_nonexistent() {
3074 let store = LpgStore::new();
3075 let fake_id = NodeId::new(999);
3076 assert!(!store.delete_node(fake_id));
3077 }
3078
3079 #[test]
3080 fn test_delete_edge_nonexistent() {
3081 let store = LpgStore::new();
3082 let fake_id = EdgeId::new(999);
3083 assert!(!store.delete_edge(fake_id));
3084 }
3085
3086 #[test]
3087 fn test_delete_edge_double() {
3088 let store = LpgStore::new();
3089 let a = store.create_node(&["Person"]);
3090 let b = store.create_node(&["Person"]);
3091 let edge_id = store.create_edge(a, b, "KNOWS");
3092
3093 assert!(store.delete_edge(edge_id));
3094 assert!(!store.delete_edge(edge_id)); }
3096
3097 #[test]
3098 fn test_create_edge_with_props() {
3099 let store = LpgStore::new();
3100 let a = store.create_node(&["Person"]);
3101 let b = store.create_node(&["Person"]);
3102
3103 let edge_id = store.create_edge_with_props(
3104 a,
3105 b,
3106 "KNOWS",
3107 [
3108 ("since", Value::from(2020i64)),
3109 ("weight", Value::from(1.0)),
3110 ],
3111 );
3112
3113 let edge = store.get_edge(edge_id).unwrap();
3114 assert_eq!(
3115 edge.get_property("since").and_then(|v| v.as_int64()),
3116 Some(2020)
3117 );
3118 assert_eq!(
3119 edge.get_property("weight").and_then(|v| v.as_float64()),
3120 Some(1.0)
3121 );
3122 }
3123
3124 #[test]
3125 fn test_delete_node_edges() {
3126 let store = LpgStore::new();
3127
3128 let a = store.create_node(&["Person"]);
3129 let b = store.create_node(&["Person"]);
3130 let c = store.create_node(&["Person"]);
3131
3132 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); assert_eq!(store.edge_count(), 2);
3136
3137 store.delete_node_edges(a);
3139
3140 assert_eq!(store.edge_count(), 0);
3141 }
3142
3143 #[test]
3144 fn test_neighbors_both_directions() {
3145 let store = LpgStore::new();
3146
3147 let a = store.create_node(&["Person"]);
3148 let b = store.create_node(&["Person"]);
3149 let c = store.create_node(&["Person"]);
3150
3151 store.create_edge(a, b, "KNOWS"); store.create_edge(c, a, "KNOWS"); let neighbors: Vec<_> = store.neighbors(a, Direction::Both).collect();
3156 assert_eq!(neighbors.len(), 2);
3157 assert!(neighbors.contains(&b)); assert!(neighbors.contains(&c)); }
3160
3161 #[test]
3162 fn test_edges_from() {
3163 let store = LpgStore::new();
3164
3165 let a = store.create_node(&["Person"]);
3166 let b = store.create_node(&["Person"]);
3167 let c = store.create_node(&["Person"]);
3168
3169 let e1 = store.create_edge(a, b, "KNOWS");
3170 let e2 = store.create_edge(a, c, "KNOWS");
3171
3172 let edges: Vec<_> = store.edges_from(a, Direction::Outgoing).collect();
3173 assert_eq!(edges.len(), 2);
3174 assert!(edges.iter().any(|(_, e)| *e == e1));
3175 assert!(edges.iter().any(|(_, e)| *e == e2));
3176
3177 let incoming: Vec<_> = store.edges_from(b, Direction::Incoming).collect();
3179 assert_eq!(incoming.len(), 1);
3180 assert_eq!(incoming[0].1, e1);
3181 }
3182
3183 #[test]
3184 fn test_edges_to() {
3185 let store = LpgStore::new();
3186
3187 let a = store.create_node(&["Person"]);
3188 let b = store.create_node(&["Person"]);
3189 let c = store.create_node(&["Person"]);
3190
3191 let e1 = store.create_edge(a, b, "KNOWS");
3192 let e2 = store.create_edge(c, b, "KNOWS");
3193
3194 let to_b = store.edges_to(b);
3196 assert_eq!(to_b.len(), 2);
3197 assert!(to_b.iter().any(|(src, e)| *src == a && *e == e1));
3198 assert!(to_b.iter().any(|(src, e)| *src == c && *e == e2));
3199 }
3200
3201 #[test]
3202 fn test_out_degree_in_degree() {
3203 let store = LpgStore::new();
3204
3205 let a = store.create_node(&["Person"]);
3206 let b = store.create_node(&["Person"]);
3207 let c = store.create_node(&["Person"]);
3208
3209 store.create_edge(a, b, "KNOWS");
3210 store.create_edge(a, c, "KNOWS");
3211 store.create_edge(c, b, "KNOWS");
3212
3213 assert_eq!(store.out_degree(a), 2);
3214 assert_eq!(store.out_degree(b), 0);
3215 assert_eq!(store.out_degree(c), 1);
3216
3217 assert_eq!(store.in_degree(a), 0);
3218 assert_eq!(store.in_degree(b), 2);
3219 assert_eq!(store.in_degree(c), 1);
3220 }
3221
3222 #[test]
3223 fn test_edge_type() {
3224 let store = LpgStore::new();
3225
3226 let a = store.create_node(&["Person"]);
3227 let b = store.create_node(&["Person"]);
3228 let edge_id = store.create_edge(a, b, "KNOWS");
3229
3230 let edge_type = store.edge_type(edge_id);
3231 assert_eq!(edge_type.as_deref(), Some("KNOWS"));
3232
3233 let fake_id = EdgeId::new(999);
3235 assert!(store.edge_type(fake_id).is_none());
3236 }
3237
3238 #[test]
3239 fn test_count_methods() {
3240 let store = LpgStore::new();
3241
3242 assert_eq!(store.label_count(), 0);
3243 assert_eq!(store.edge_type_count(), 0);
3244 assert_eq!(store.property_key_count(), 0);
3245
3246 let a = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3247 let b = store.create_node(&["Company"]);
3248 store.create_edge_with_props(a, b, "WORKS_AT", [("since", Value::from(2020i64))]);
3249
3250 assert_eq!(store.label_count(), 2); assert_eq!(store.edge_type_count(), 1); assert_eq!(store.property_key_count(), 2); }
3254
3255 #[test]
3256 fn test_all_nodes_and_edges() {
3257 let store = LpgStore::new();
3258
3259 let a = store.create_node(&["Person"]);
3260 let b = store.create_node(&["Person"]);
3261 store.create_edge(a, b, "KNOWS");
3262
3263 let nodes: Vec<_> = store.all_nodes().collect();
3264 assert_eq!(nodes.len(), 2);
3265
3266 let edges: Vec<_> = store.all_edges().collect();
3267 assert_eq!(edges.len(), 1);
3268 }
3269
3270 #[test]
3271 fn test_all_labels_and_edge_types() {
3272 let store = LpgStore::new();
3273
3274 store.create_node(&["Person"]);
3275 store.create_node(&["Company"]);
3276 let a = store.create_node(&["Animal"]);
3277 let b = store.create_node(&["Animal"]);
3278 store.create_edge(a, b, "EATS");
3279
3280 let labels = store.all_labels();
3281 assert_eq!(labels.len(), 3);
3282 assert!(labels.contains(&"Person".to_string()));
3283 assert!(labels.contains(&"Company".to_string()));
3284 assert!(labels.contains(&"Animal".to_string()));
3285
3286 let edge_types = store.all_edge_types();
3287 assert_eq!(edge_types.len(), 1);
3288 assert!(edge_types.contains(&"EATS".to_string()));
3289 }
3290
3291 #[test]
3292 fn test_all_property_keys() {
3293 let store = LpgStore::new();
3294
3295 let a = store.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
3296 let b = store.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
3297 store.create_edge_with_props(a, b, "KNOWS", [("since", Value::from(2020i64))]);
3298
3299 let keys = store.all_property_keys();
3300 assert!(keys.contains(&"name".to_string()));
3301 assert!(keys.contains(&"age".to_string()));
3302 assert!(keys.contains(&"since".to_string()));
3303 }
3304
3305 #[test]
3306 fn test_nodes_with_label() {
3307 let store = LpgStore::new();
3308
3309 store.create_node(&["Person"]);
3310 store.create_node(&["Person"]);
3311 store.create_node(&["Company"]);
3312
3313 let persons: Vec<_> = store.nodes_with_label("Person").collect();
3314 assert_eq!(persons.len(), 2);
3315
3316 let companies: Vec<_> = store.nodes_with_label("Company").collect();
3317 assert_eq!(companies.len(), 1);
3318
3319 let none: Vec<_> = store.nodes_with_label("NonExistent").collect();
3320 assert_eq!(none.len(), 0);
3321 }
3322
3323 #[test]
3324 fn test_edges_with_type() {
3325 let store = LpgStore::new();
3326
3327 let a = store.create_node(&["Person"]);
3328 let b = store.create_node(&["Person"]);
3329 let c = store.create_node(&["Company"]);
3330
3331 store.create_edge(a, b, "KNOWS");
3332 store.create_edge(a, c, "WORKS_AT");
3333
3334 let knows: Vec<_> = store.edges_with_type("KNOWS").collect();
3335 assert_eq!(knows.len(), 1);
3336
3337 let works_at: Vec<_> = store.edges_with_type("WORKS_AT").collect();
3338 assert_eq!(works_at.len(), 1);
3339
3340 let none: Vec<_> = store.edges_with_type("NonExistent").collect();
3341 assert_eq!(none.len(), 0);
3342 }
3343
3344 #[test]
3345 fn test_nodes_by_label_nonexistent() {
3346 let store = LpgStore::new();
3347 store.create_node(&["Person"]);
3348
3349 let empty = store.nodes_by_label("NonExistent");
3350 assert!(empty.is_empty());
3351 }
3352
3353 #[test]
3354 fn test_statistics() {
3355 let store = LpgStore::new();
3356
3357 let a = store.create_node(&["Person"]);
3358 let b = store.create_node(&["Person"]);
3359 let c = store.create_node(&["Company"]);
3360
3361 store.create_edge(a, b, "KNOWS");
3362 store.create_edge(a, c, "WORKS_AT");
3363
3364 store.compute_statistics();
3365 let stats = store.statistics();
3366
3367 assert_eq!(stats.total_nodes, 3);
3368 assert_eq!(stats.total_edges, 2);
3369
3370 let person_card = store.estimate_label_cardinality("Person");
3372 assert!(person_card > 0.0);
3373
3374 let avg_degree = store.estimate_avg_degree("KNOWS", true);
3375 assert!(avg_degree >= 0.0);
3376 }
3377
3378 #[test]
3379 fn test_zone_maps() {
3380 let store = LpgStore::new();
3381
3382 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3383 store.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);
3384
3385 let might_match =
3387 store.node_property_might_match(&"age".into(), CompareOp::Eq, &Value::from(30i64));
3388 assert!(might_match);
3390
3391 let zone = store.node_property_zone_map(&"age".into());
3392 assert!(zone.is_some());
3393
3394 let no_zone = store.node_property_zone_map(&"nonexistent".into());
3396 assert!(no_zone.is_none());
3397
3398 let a = store.create_node(&["A"]);
3400 let b = store.create_node(&["B"]);
3401 store.create_edge_with_props(a, b, "REL", [("weight", Value::from(1.0))]);
3402
3403 let edge_zone = store.edge_property_zone_map(&"weight".into());
3404 assert!(edge_zone.is_some());
3405 }
3406
3407 #[test]
3408 fn test_rebuild_zone_maps() {
3409 let store = LpgStore::new();
3410 store.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
3411
3412 store.rebuild_zone_maps();
3414 }
3415
3416 #[test]
3417 fn test_create_node_with_id() {
3418 let store = LpgStore::new();
3419
3420 let specific_id = NodeId::new(100);
3421 store.create_node_with_id(specific_id, &["Person", "Employee"]);
3422
3423 let node = store.get_node(specific_id).unwrap();
3424 assert!(node.has_label("Person"));
3425 assert!(node.has_label("Employee"));
3426
3427 let next = store.create_node(&["Other"]);
3429 assert!(next.as_u64() > 100);
3430 }
3431
3432 #[test]
3433 fn test_create_edge_with_id() {
3434 let store = LpgStore::new();
3435
3436 let a = store.create_node(&["A"]);
3437 let b = store.create_node(&["B"]);
3438
3439 let specific_id = EdgeId::new(500);
3440 store.create_edge_with_id(specific_id, a, b, "REL");
3441
3442 let edge = store.get_edge(specific_id).unwrap();
3443 assert_eq!(edge.src, a);
3444 assert_eq!(edge.dst, b);
3445 assert_eq!(edge.edge_type.as_ref(), "REL");
3446
3447 let next = store.create_edge(a, b, "OTHER");
3449 assert!(next.as_u64() > 500);
3450 }
3451
3452 #[test]
3453 fn test_set_epoch() {
3454 let store = LpgStore::new();
3455
3456 assert_eq!(store.current_epoch().as_u64(), 0);
3457
3458 store.set_epoch(EpochId::new(42));
3459 assert_eq!(store.current_epoch().as_u64(), 42);
3460 }
3461
3462 #[test]
3463 fn test_get_node_nonexistent() {
3464 let store = LpgStore::new();
3465 let fake_id = NodeId::new(999);
3466 assert!(store.get_node(fake_id).is_none());
3467 }
3468
3469 #[test]
3470 fn test_get_edge_nonexistent() {
3471 let store = LpgStore::new();
3472 let fake_id = EdgeId::new(999);
3473 assert!(store.get_edge(fake_id).is_none());
3474 }
3475
3476 #[test]
3477 fn test_multiple_labels() {
3478 let store = LpgStore::new();
3479
3480 let id = store.create_node(&["Person", "Employee", "Manager"]);
3481 let node = store.get_node(id).unwrap();
3482
3483 assert!(node.has_label("Person"));
3484 assert!(node.has_label("Employee"));
3485 assert!(node.has_label("Manager"));
3486 assert!(!node.has_label("Other"));
3487 }
3488
3489 #[test]
3490 fn test_default_impl() {
3491 let store: LpgStore = Default::default();
3492 assert_eq!(store.node_count(), 0);
3493 assert_eq!(store.edge_count(), 0);
3494 }
3495
3496 #[test]
3497 fn test_edges_from_both_directions() {
3498 let store = LpgStore::new();
3499
3500 let a = store.create_node(&["A"]);
3501 let b = store.create_node(&["B"]);
3502 let c = store.create_node(&["C"]);
3503
3504 let e1 = store.create_edge(a, b, "R1"); let e2 = store.create_edge(c, a, "R2"); let edges: Vec<_> = store.edges_from(a, Direction::Both).collect();
3509 assert_eq!(edges.len(), 2);
3510 assert!(edges.iter().any(|(_, e)| *e == e1)); assert!(edges.iter().any(|(_, e)| *e == e2)); }
3513
3514 #[test]
3515 fn test_no_backward_adj_in_degree() {
3516 let config = LpgStoreConfig {
3517 backward_edges: false,
3518 initial_node_capacity: 10,
3519 initial_edge_capacity: 10,
3520 };
3521 let store = LpgStore::with_config(config);
3522
3523 let a = store.create_node(&["A"]);
3524 let b = store.create_node(&["B"]);
3525 store.create_edge(a, b, "R");
3526
3527 let degree = store.in_degree(b);
3529 assert_eq!(degree, 1);
3530 }
3531
3532 #[test]
3533 fn test_no_backward_adj_edges_to() {
3534 let config = LpgStoreConfig {
3535 backward_edges: false,
3536 initial_node_capacity: 10,
3537 initial_edge_capacity: 10,
3538 };
3539 let store = LpgStore::with_config(config);
3540
3541 let a = store.create_node(&["A"]);
3542 let b = store.create_node(&["B"]);
3543 let e = store.create_edge(a, b, "R");
3544
3545 let edges = store.edges_to(b);
3547 assert_eq!(edges.len(), 1);
3548 assert_eq!(edges[0].1, e);
3549 }
3550
3551 #[test]
3552 fn test_node_versioned_creation() {
3553 let store = LpgStore::new();
3554
3555 let epoch = store.new_epoch();
3556 let tx_id = TxId::new(1);
3557
3558 let id = store.create_node_versioned(&["Person"], epoch, tx_id);
3559 assert!(store.get_node(id).is_some());
3560 }
3561
3562 #[test]
3563 fn test_edge_versioned_creation() {
3564 let store = LpgStore::new();
3565
3566 let a = store.create_node(&["A"]);
3567 let b = store.create_node(&["B"]);
3568
3569 let epoch = store.new_epoch();
3570 let tx_id = TxId::new(1);
3571
3572 let edge_id = store.create_edge_versioned(a, b, "REL", epoch, tx_id);
3573 assert!(store.get_edge(edge_id).is_some());
3574 }
3575
3576 #[test]
3577 fn test_node_with_props_versioned() {
3578 let store = LpgStore::new();
3579
3580 let epoch = store.new_epoch();
3581 let tx_id = TxId::new(1);
3582
3583 let id = store.create_node_with_props_versioned(
3584 &["Person"],
3585 [("name", Value::from("Alice"))],
3586 epoch,
3587 tx_id,
3588 );
3589
3590 let node = store.get_node(id).unwrap();
3591 assert_eq!(
3592 node.get_property("name").and_then(|v| v.as_str()),
3593 Some("Alice")
3594 );
3595 }
3596
3597 #[test]
3598 fn test_discard_uncommitted_versions() {
3599 let store = LpgStore::new();
3600
3601 let epoch = store.new_epoch();
3602 let tx_id = TxId::new(42);
3603
3604 let node_id = store.create_node_versioned(&["Person"], epoch, tx_id);
3606 assert!(store.get_node(node_id).is_some());
3607
3608 store.discard_uncommitted_versions(tx_id);
3610
3611 assert!(store.get_node(node_id).is_none());
3613 }
3614
3615 #[test]
3618 fn test_property_index_create_and_lookup() {
3619 let store = LpgStore::new();
3620
3621 let alice = store.create_node(&["Person"]);
3623 let bob = store.create_node(&["Person"]);
3624 let charlie = store.create_node(&["Person"]);
3625
3626 store.set_node_property(alice, "city", Value::from("NYC"));
3627 store.set_node_property(bob, "city", Value::from("NYC"));
3628 store.set_node_property(charlie, "city", Value::from("LA"));
3629
3630 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3632 assert_eq!(nyc_people.len(), 2);
3633
3634 store.create_property_index("city");
3636 assert!(store.has_property_index("city"));
3637
3638 let nyc_people = store.find_nodes_by_property("city", &Value::from("NYC"));
3640 assert_eq!(nyc_people.len(), 2);
3641 assert!(nyc_people.contains(&alice));
3642 assert!(nyc_people.contains(&bob));
3643
3644 let la_people = store.find_nodes_by_property("city", &Value::from("LA"));
3645 assert_eq!(la_people.len(), 1);
3646 assert!(la_people.contains(&charlie));
3647 }
3648
3649 #[test]
3650 fn test_property_index_maintained_on_update() {
3651 let store = LpgStore::new();
3652
3653 store.create_property_index("status");
3655
3656 let node = store.create_node(&["Task"]);
3657 store.set_node_property(node, "status", Value::from("pending"));
3658
3659 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3661 assert_eq!(pending.len(), 1);
3662 assert!(pending.contains(&node));
3663
3664 store.set_node_property(node, "status", Value::from("done"));
3666
3667 let pending = store.find_nodes_by_property("status", &Value::from("pending"));
3669 assert!(pending.is_empty());
3670
3671 let done = store.find_nodes_by_property("status", &Value::from("done"));
3673 assert_eq!(done.len(), 1);
3674 assert!(done.contains(&node));
3675 }
3676
3677 #[test]
3678 fn test_property_index_maintained_on_remove() {
3679 let store = LpgStore::new();
3680
3681 store.create_property_index("tag");
3682
3683 let node = store.create_node(&["Item"]);
3684 store.set_node_property(node, "tag", Value::from("important"));
3685
3686 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3688 assert_eq!(found.len(), 1);
3689
3690 store.remove_node_property(node, "tag");
3692
3693 let found = store.find_nodes_by_property("tag", &Value::from("important"));
3695 assert!(found.is_empty());
3696 }
3697
3698 #[test]
3699 fn test_property_index_drop() {
3700 let store = LpgStore::new();
3701
3702 store.create_property_index("key");
3703 assert!(store.has_property_index("key"));
3704
3705 assert!(store.drop_property_index("key"));
3706 assert!(!store.has_property_index("key"));
3707
3708 assert!(!store.drop_property_index("key"));
3710 }
3711
3712 #[test]
3713 fn test_property_index_multiple_values() {
3714 let store = LpgStore::new();
3715
3716 store.create_property_index("age");
3717
3718 let n1 = store.create_node(&["Person"]);
3720 let n2 = store.create_node(&["Person"]);
3721 let n3 = store.create_node(&["Person"]);
3722 let n4 = store.create_node(&["Person"]);
3723
3724 store.set_node_property(n1, "age", Value::from(25i64));
3725 store.set_node_property(n2, "age", Value::from(25i64));
3726 store.set_node_property(n3, "age", Value::from(30i64));
3727 store.set_node_property(n4, "age", Value::from(25i64));
3728
3729 let age_25 = store.find_nodes_by_property("age", &Value::from(25i64));
3730 assert_eq!(age_25.len(), 3);
3731
3732 let age_30 = store.find_nodes_by_property("age", &Value::from(30i64));
3733 assert_eq!(age_30.len(), 1);
3734
3735 let age_40 = store.find_nodes_by_property("age", &Value::from(40i64));
3736 assert!(age_40.is_empty());
3737 }
3738
3739 #[test]
3740 fn test_property_index_builds_from_existing_data() {
3741 let store = LpgStore::new();
3742
3743 let n1 = store.create_node(&["Person"]);
3745 let n2 = store.create_node(&["Person"]);
3746 store.set_node_property(n1, "email", Value::from("alice@example.com"));
3747 store.set_node_property(n2, "email", Value::from("bob@example.com"));
3748
3749 store.create_property_index("email");
3751
3752 let alice = store.find_nodes_by_property("email", &Value::from("alice@example.com"));
3754 assert_eq!(alice.len(), 1);
3755 assert!(alice.contains(&n1));
3756
3757 let bob = store.find_nodes_by_property("email", &Value::from("bob@example.com"));
3758 assert_eq!(bob.len(), 1);
3759 assert!(bob.contains(&n2));
3760 }
3761
3762 #[test]
3763 fn test_get_node_property_batch() {
3764 let store = LpgStore::new();
3765
3766 let n1 = store.create_node(&["Person"]);
3767 let n2 = store.create_node(&["Person"]);
3768 let n3 = store.create_node(&["Person"]);
3769
3770 store.set_node_property(n1, "age", Value::from(25i64));
3771 store.set_node_property(n2, "age", Value::from(30i64));
3772 let age_key = PropertyKey::new("age");
3775 let values = store.get_node_property_batch(&[n1, n2, n3], &age_key);
3776
3777 assert_eq!(values.len(), 3);
3778 assert_eq!(values[0], Some(Value::from(25i64)));
3779 assert_eq!(values[1], Some(Value::from(30i64)));
3780 assert_eq!(values[2], None);
3781 }
3782
3783 #[test]
3784 fn test_get_node_property_batch_empty() {
3785 let store = LpgStore::new();
3786 let key = PropertyKey::new("any");
3787
3788 let values = store.get_node_property_batch(&[], &key);
3789 assert!(values.is_empty());
3790 }
3791
3792 #[test]
3793 fn test_get_nodes_properties_batch() {
3794 let store = LpgStore::new();
3795
3796 let n1 = store.create_node(&["Person"]);
3797 let n2 = store.create_node(&["Person"]);
3798 let n3 = store.create_node(&["Person"]);
3799
3800 store.set_node_property(n1, "name", Value::from("Alice"));
3801 store.set_node_property(n1, "age", Value::from(25i64));
3802 store.set_node_property(n2, "name", Value::from("Bob"));
3803 let all_props = store.get_nodes_properties_batch(&[n1, n2, n3]);
3806
3807 assert_eq!(all_props.len(), 3);
3808 assert_eq!(all_props[0].len(), 2); assert_eq!(all_props[1].len(), 1); assert_eq!(all_props[2].len(), 0); assert_eq!(
3813 all_props[0].get(&PropertyKey::new("name")),
3814 Some(&Value::from("Alice"))
3815 );
3816 assert_eq!(
3817 all_props[1].get(&PropertyKey::new("name")),
3818 Some(&Value::from("Bob"))
3819 );
3820 }
3821
3822 #[test]
3823 fn test_get_nodes_properties_batch_empty() {
3824 let store = LpgStore::new();
3825
3826 let all_props = store.get_nodes_properties_batch(&[]);
3827 assert!(all_props.is_empty());
3828 }
3829}