1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18#[cfg(not(feature = "tiered-storage"))]
19use grafeo_common::mvcc::VersionChain;
20use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
21use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
22use parking_lot::RwLock;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25
26#[cfg(feature = "tiered-storage")]
28use crate::storage::EpochStore;
29#[cfg(feature = "tiered-storage")]
30use grafeo_common::memory::arena::ArenaAllocator;
31#[cfg(feature = "tiered-storage")]
32use grafeo_common::mvcc::{ColdVersionRef, HotVersionRef, VersionIndex, VersionRef};
33
/// Construction-time options for an LPG store.
///
/// Derives `PartialEq`/`Eq` so two configurations can be compared directly
/// (for example against [`LpgStoreConfig::default`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LpgStoreConfig {
    /// When `true`, the store also keeps a reverse (destination -> source)
    /// adjacency index next to the forward one.
    pub backward_edges: bool,
    /// Capacity hint for node storage.
    ///
    /// NOTE(review): the constructor visible in this file does not read this
    /// value — confirm whether it is honoured elsewhere.
    pub initial_node_capacity: usize,
    /// Capacity hint for edge storage.
    ///
    /// NOTE(review): the constructor visible in this file does not read this
    /// value — confirm whether it is honoured elsewhere.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, 1024 nodes and 4096 edges as capacity hints.
    fn default() -> Self {
        Self {
            backward_edges: true,
            initial_node_capacity: 1024,
            initial_edge_capacity: 4096,
        }
    }
}
59
60pub struct LpgStore {
88 #[allow(dead_code)]
90 config: LpgStoreConfig,
91
92 #[cfg(not(feature = "tiered-storage"))]
95 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
96
97 #[cfg(not(feature = "tiered-storage"))]
100 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
101
102 #[cfg(feature = "tiered-storage")]
106 arena_allocator: Arc<ArenaAllocator>,
107
108 #[cfg(feature = "tiered-storage")]
111 node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
112
113 #[cfg(feature = "tiered-storage")]
116 edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
117
118 #[cfg(feature = "tiered-storage")]
121 epoch_store: Arc<EpochStore>,
122
123 node_properties: PropertyStorage<NodeId>,
125
126 edge_properties: PropertyStorage<EdgeId>,
128
129 label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
131
132 id_to_label: RwLock<Vec<Arc<str>>>,
134
135 edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
137
138 id_to_edge_type: RwLock<Vec<Arc<str>>>,
140
141 forward_adj: ChunkedAdjacency,
143
144 backward_adj: Option<ChunkedAdjacency>,
147
148 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
150
151 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
154
155 next_node_id: AtomicU64,
157
158 next_edge_id: AtomicU64,
160
161 current_epoch: AtomicU64,
163
164 statistics: RwLock<Statistics>,
166}
167
168impl LpgStore {
169 #[must_use]
171 pub fn new() -> Self {
172 Self::with_config(LpgStoreConfig::default())
173 }
174
175 #[must_use]
177 pub fn with_config(config: LpgStoreConfig) -> Self {
178 let backward_adj = if config.backward_edges {
179 Some(ChunkedAdjacency::new())
180 } else {
181 None
182 };
183
184 Self {
185 #[cfg(not(feature = "tiered-storage"))]
186 nodes: RwLock::new(FxHashMap::default()),
187 #[cfg(not(feature = "tiered-storage"))]
188 edges: RwLock::new(FxHashMap::default()),
189 #[cfg(feature = "tiered-storage")]
190 arena_allocator: Arc::new(ArenaAllocator::new()),
191 #[cfg(feature = "tiered-storage")]
192 node_versions: RwLock::new(FxHashMap::default()),
193 #[cfg(feature = "tiered-storage")]
194 edge_versions: RwLock::new(FxHashMap::default()),
195 #[cfg(feature = "tiered-storage")]
196 epoch_store: Arc::new(EpochStore::new()),
197 node_properties: PropertyStorage::new(),
198 edge_properties: PropertyStorage::new(),
199 label_to_id: RwLock::new(FxHashMap::default()),
200 id_to_label: RwLock::new(Vec::new()),
201 edge_type_to_id: RwLock::new(FxHashMap::default()),
202 id_to_edge_type: RwLock::new(Vec::new()),
203 forward_adj: ChunkedAdjacency::new(),
204 backward_adj,
205 label_index: RwLock::new(Vec::new()),
206 node_labels: RwLock::new(FxHashMap::default()),
207 next_node_id: AtomicU64::new(0),
208 next_edge_id: AtomicU64::new(0),
209 current_epoch: AtomicU64::new(0),
210 statistics: RwLock::new(Statistics::new()),
211 config,
212 }
213 }
214
215 #[must_use]
217 pub fn current_epoch(&self) -> EpochId {
218 EpochId::new(self.current_epoch.load(Ordering::Acquire))
219 }
220
221 pub fn new_epoch(&self) -> EpochId {
223 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
224 EpochId::new(id)
225 }
226
227 pub fn create_node(&self, labels: &[&str]) -> NodeId {
233 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
234 }
235
236 #[cfg(not(feature = "tiered-storage"))]
238 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
239 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
240
241 let mut record = NodeRecord::new(id, epoch);
242 record.set_label_count(labels.len() as u16);
243
244 let mut node_label_set = FxHashSet::default();
246 for label in labels {
247 let label_id = self.get_or_create_label_id(*label);
248 node_label_set.insert(label_id);
249
250 let mut index = self.label_index.write();
252 while index.len() <= label_id as usize {
253 index.push(FxHashMap::default());
254 }
255 index[label_id as usize].insert(id, ());
256 }
257
258 self.node_labels.write().insert(id, node_label_set);
260
261 let chain = VersionChain::with_initial(record, epoch, tx_id);
263 self.nodes.write().insert(id, chain);
264 id
265 }
266
267 #[cfg(feature = "tiered-storage")]
270 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
271 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
272
273 let mut record = NodeRecord::new(id, epoch);
274 record.set_label_count(labels.len() as u16);
275
276 let mut node_label_set = FxHashSet::default();
278 for label in labels {
279 let label_id = self.get_or_create_label_id(*label);
280 node_label_set.insert(label_id);
281
282 let mut index = self.label_index.write();
284 while index.len() <= label_id as usize {
285 index.push(FxHashMap::default());
286 }
287 index[label_id as usize].insert(id, ());
288 }
289
290 self.node_labels.write().insert(id, node_label_set);
292
293 let arena = self.arena_allocator.arena_or_create(epoch);
295 let (offset, _stored) = arena.alloc_value_with_offset(record);
296
297 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
299
300 let mut versions = self.node_versions.write();
302 if let Some(index) = versions.get_mut(&id) {
303 index.add_hot(hot_ref);
304 } else {
305 versions.insert(id, VersionIndex::with_initial(hot_ref));
306 }
307
308 id
309 }
310
311 pub fn create_node_with_props(
313 &self,
314 labels: &[&str],
315 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
316 ) -> NodeId {
317 self.create_node_with_props_versioned(
318 labels,
319 properties,
320 self.current_epoch(),
321 TxId::SYSTEM,
322 )
323 }
324
325 #[cfg(not(feature = "tiered-storage"))]
327 pub fn create_node_with_props_versioned(
328 &self,
329 labels: &[&str],
330 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
331 epoch: EpochId,
332 tx_id: TxId,
333 ) -> NodeId {
334 let id = self.create_node_versioned(labels, epoch, tx_id);
335
336 for (key, value) in properties {
337 self.node_properties.set(id, key.into(), value.into());
338 }
339
340 let count = self.node_properties.get_all(id).len() as u16;
342 if let Some(chain) = self.nodes.write().get_mut(&id) {
343 if let Some(record) = chain.latest_mut() {
344 record.props_count = count;
345 }
346 }
347
348 id
349 }
350
351 #[cfg(feature = "tiered-storage")]
354 pub fn create_node_with_props_versioned(
355 &self,
356 labels: &[&str],
357 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
358 epoch: EpochId,
359 tx_id: TxId,
360 ) -> NodeId {
361 let id = self.create_node_versioned(labels, epoch, tx_id);
362
363 for (key, value) in properties {
364 self.node_properties.set(id, key.into(), value.into());
365 }
366
367 id
371 }
372
373 #[must_use]
375 pub fn get_node(&self, id: NodeId) -> Option<Node> {
376 self.get_node_at_epoch(id, self.current_epoch())
377 }
378
379 #[must_use]
381 #[cfg(not(feature = "tiered-storage"))]
382 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
383 let nodes = self.nodes.read();
384 let chain = nodes.get(&id)?;
385 let record = chain.visible_at(epoch)?;
386
387 if record.is_deleted() {
388 return None;
389 }
390
391 let mut node = Node::new(id);
392
393 let id_to_label = self.id_to_label.read();
395 let node_labels = self.node_labels.read();
396 if let Some(label_ids) = node_labels.get(&id) {
397 for &label_id in label_ids {
398 if let Some(label) = id_to_label.get(label_id as usize) {
399 node.labels.push(label.clone());
400 }
401 }
402 }
403
404 node.properties = self.node_properties.get_all(id).into_iter().collect();
406
407 Some(node)
408 }
409
410 #[must_use]
413 #[cfg(feature = "tiered-storage")]
414 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
415 let versions = self.node_versions.read();
416 let index = versions.get(&id)?;
417 let version_ref = index.visible_at(epoch)?;
418
419 let record = self.read_node_record(&version_ref)?;
421
422 if record.is_deleted() {
423 return None;
424 }
425
426 let mut node = Node::new(id);
427
428 let id_to_label = self.id_to_label.read();
430 let node_labels = self.node_labels.read();
431 if let Some(label_ids) = node_labels.get(&id) {
432 for &label_id in label_ids {
433 if let Some(label) = id_to_label.get(label_id as usize) {
434 node.labels.push(label.clone());
435 }
436 }
437 }
438
439 node.properties = self.node_properties.get_all(id).into_iter().collect();
441
442 Some(node)
443 }
444
445 #[must_use]
447 #[cfg(not(feature = "tiered-storage"))]
448 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
449 let nodes = self.nodes.read();
450 let chain = nodes.get(&id)?;
451 let record = chain.visible_to(epoch, tx_id)?;
452
453 if record.is_deleted() {
454 return None;
455 }
456
457 let mut node = Node::new(id);
458
459 let id_to_label = self.id_to_label.read();
461 let node_labels = self.node_labels.read();
462 if let Some(label_ids) = node_labels.get(&id) {
463 for &label_id in label_ids {
464 if let Some(label) = id_to_label.get(label_id as usize) {
465 node.labels.push(label.clone());
466 }
467 }
468 }
469
470 node.properties = self.node_properties.get_all(id).into_iter().collect();
472
473 Some(node)
474 }
475
476 #[must_use]
479 #[cfg(feature = "tiered-storage")]
480 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
481 let versions = self.node_versions.read();
482 let index = versions.get(&id)?;
483 let version_ref = index.visible_to(epoch, tx_id)?;
484
485 let record = self.read_node_record(&version_ref)?;
487
488 if record.is_deleted() {
489 return None;
490 }
491
492 let mut node = Node::new(id);
493
494 let id_to_label = self.id_to_label.read();
496 let node_labels = self.node_labels.read();
497 if let Some(label_ids) = node_labels.get(&id) {
498 for &label_id in label_ids {
499 if let Some(label) = id_to_label.get(label_id as usize) {
500 node.labels.push(label.clone());
501 }
502 }
503 }
504
505 node.properties = self.node_properties.get_all(id).into_iter().collect();
507
508 Some(node)
509 }
510
511 #[cfg(feature = "tiered-storage")]
513 #[allow(unsafe_code)]
514 fn read_node_record(&self, version_ref: &VersionRef) -> Option<NodeRecord> {
515 match version_ref {
516 VersionRef::Hot(hot_ref) => {
517 let arena = self.arena_allocator.arena(hot_ref.epoch);
518 let record: &NodeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
520 Some(record.clone())
521 }
522 VersionRef::Cold(cold_ref) => {
523 self.epoch_store
525 .get_node(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
526 }
527 }
528 }
529
530 pub fn delete_node(&self, id: NodeId) -> bool {
532 self.delete_node_at_epoch(id, self.current_epoch())
533 }
534
535 #[cfg(not(feature = "tiered-storage"))]
537 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
538 let mut nodes = self.nodes.write();
539 if let Some(chain) = nodes.get_mut(&id) {
540 if let Some(record) = chain.visible_at(epoch) {
542 if record.is_deleted() {
543 return false;
544 }
545 } else {
546 return false;
548 }
549
550 chain.mark_deleted(epoch);
552
553 let mut index = self.label_index.write();
555 let mut node_labels = self.node_labels.write();
556 if let Some(label_ids) = node_labels.remove(&id) {
557 for label_id in label_ids {
558 if let Some(set) = index.get_mut(label_id as usize) {
559 set.remove(&id);
560 }
561 }
562 }
563
564 drop(nodes); drop(index);
567 drop(node_labels);
568 self.node_properties.remove_all(id);
569
570 true
573 } else {
574 false
575 }
576 }
577
578 #[cfg(feature = "tiered-storage")]
581 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
582 let mut versions = self.node_versions.write();
583 if let Some(index) = versions.get_mut(&id) {
584 if let Some(version_ref) = index.visible_at(epoch) {
586 if let Some(record) = self.read_node_record(&version_ref) {
587 if record.is_deleted() {
588 return false;
589 }
590 } else {
591 return false;
592 }
593 } else {
594 return false;
595 }
596
597 index.mark_deleted(epoch);
599
600 let mut label_index = self.label_index.write();
602 let mut node_labels = self.node_labels.write();
603 if let Some(label_ids) = node_labels.remove(&id) {
604 for label_id in label_ids {
605 if let Some(set) = label_index.get_mut(label_id as usize) {
606 set.remove(&id);
607 }
608 }
609 }
610
611 drop(versions);
613 drop(label_index);
614 drop(node_labels);
615 self.node_properties.remove_all(id);
616
617 true
618 } else {
619 false
620 }
621 }
622
623 #[cfg(not(feature = "tiered-storage"))]
628 pub fn delete_node_edges(&self, node_id: NodeId) {
629 let outgoing: Vec<EdgeId> = self
631 .forward_adj
632 .edges_from(node_id)
633 .into_iter()
634 .map(|(_, edge_id)| edge_id)
635 .collect();
636
637 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
639 backward
640 .edges_from(node_id)
641 .into_iter()
642 .map(|(_, edge_id)| edge_id)
643 .collect()
644 } else {
645 let epoch = self.current_epoch();
647 self.edges
648 .read()
649 .iter()
650 .filter_map(|(id, chain)| {
651 chain.visible_at(epoch).and_then(|r| {
652 if !r.is_deleted() && r.dst == node_id {
653 Some(*id)
654 } else {
655 None
656 }
657 })
658 })
659 .collect()
660 };
661
662 for edge_id in outgoing.into_iter().chain(incoming) {
664 self.delete_edge(edge_id);
665 }
666 }
667
668 #[cfg(feature = "tiered-storage")]
671 pub fn delete_node_edges(&self, node_id: NodeId) {
672 let outgoing: Vec<EdgeId> = self
674 .forward_adj
675 .edges_from(node_id)
676 .into_iter()
677 .map(|(_, edge_id)| edge_id)
678 .collect();
679
680 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
682 backward
683 .edges_from(node_id)
684 .into_iter()
685 .map(|(_, edge_id)| edge_id)
686 .collect()
687 } else {
688 let epoch = self.current_epoch();
690 let versions = self.edge_versions.read();
691 versions
692 .iter()
693 .filter_map(|(id, index)| {
694 index.visible_at(epoch).and_then(|vref| {
695 self.read_edge_record(&vref).and_then(|r| {
696 if !r.is_deleted() && r.dst == node_id {
697 Some(*id)
698 } else {
699 None
700 }
701 })
702 })
703 })
704 .collect()
705 };
706
707 for edge_id in outgoing.into_iter().chain(incoming) {
709 self.delete_edge(edge_id);
710 }
711 }
712
713 #[cfg(not(feature = "tiered-storage"))]
715 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
716 self.node_properties.set(id, key.into(), value);
717
718 let count = self.node_properties.get_all(id).len() as u16;
720 if let Some(chain) = self.nodes.write().get_mut(&id) {
721 if let Some(record) = chain.latest_mut() {
722 record.props_count = count;
723 }
724 }
725 }
726
727 #[cfg(feature = "tiered-storage")]
730 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
731 self.node_properties.set(id, key.into(), value);
732 }
736
737 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
739 self.edge_properties.set(id, key.into(), value);
740 }
741
742 #[cfg(not(feature = "tiered-storage"))]
746 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
747 let result = self.node_properties.remove(id, &key.into());
748
749 let count = self.node_properties.get_all(id).len() as u16;
751 if let Some(chain) = self.nodes.write().get_mut(&id) {
752 if let Some(record) = chain.latest_mut() {
753 record.props_count = count;
754 }
755 }
756
757 result
758 }
759
760 #[cfg(feature = "tiered-storage")]
763 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
764 self.node_properties.remove(id, &key.into())
765 }
767
768 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
772 self.edge_properties.remove(id, &key.into())
773 }
774
775 #[must_use]
790 pub fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
791 self.node_properties.get(id, key)
792 }
793
794 #[must_use]
798 pub fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
799 self.edge_properties.get(id, key)
800 }
801
802 #[cfg(not(feature = "tiered-storage"))]
807 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
808 let epoch = self.current_epoch();
809
810 let nodes = self.nodes.read();
812 if let Some(chain) = nodes.get(&node_id) {
813 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
814 return false;
815 }
816 } else {
817 return false;
818 }
819 drop(nodes);
820
821 let label_id = self.get_or_create_label_id(label);
823
824 let mut node_labels = self.node_labels.write();
826 let label_set = node_labels
827 .entry(node_id)
828 .or_insert_with(FxHashSet::default);
829
830 if label_set.contains(&label_id) {
831 return false; }
833
834 label_set.insert(label_id);
835 drop(node_labels);
836
837 let mut index = self.label_index.write();
839 if (label_id as usize) >= index.len() {
840 index.resize(label_id as usize + 1, FxHashMap::default());
841 }
842 index[label_id as usize].insert(node_id, ());
843
844 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
846 if let Some(record) = chain.latest_mut() {
847 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
848 record.set_label_count(count as u16);
849 }
850 }
851
852 true
853 }
854
855 #[cfg(feature = "tiered-storage")]
858 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
859 let epoch = self.current_epoch();
860
861 let versions = self.node_versions.read();
863 if let Some(index) = versions.get(&node_id) {
864 if let Some(vref) = index.visible_at(epoch) {
865 if let Some(record) = self.read_node_record(&vref) {
866 if record.is_deleted() {
867 return false;
868 }
869 } else {
870 return false;
871 }
872 } else {
873 return false;
874 }
875 } else {
876 return false;
877 }
878 drop(versions);
879
880 let label_id = self.get_or_create_label_id(label);
882
883 let mut node_labels = self.node_labels.write();
885 let label_set = node_labels
886 .entry(node_id)
887 .or_insert_with(FxHashSet::default);
888
889 if label_set.contains(&label_id) {
890 return false; }
892
893 label_set.insert(label_id);
894 drop(node_labels);
895
896 let mut index = self.label_index.write();
898 if (label_id as usize) >= index.len() {
899 index.resize(label_id as usize + 1, FxHashMap::default());
900 }
901 index[label_id as usize].insert(node_id, ());
902
903 true
907 }
908
909 #[cfg(not(feature = "tiered-storage"))]
914 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
915 let epoch = self.current_epoch();
916
917 let nodes = self.nodes.read();
919 if let Some(chain) = nodes.get(&node_id) {
920 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
921 return false;
922 }
923 } else {
924 return false;
925 }
926 drop(nodes);
927
928 let label_id = {
930 let label_ids = self.label_to_id.read();
931 match label_ids.get(label) {
932 Some(&id) => id,
933 None => return false, }
935 };
936
937 let mut node_labels = self.node_labels.write();
939 if let Some(label_set) = node_labels.get_mut(&node_id) {
940 if !label_set.remove(&label_id) {
941 return false; }
943 } else {
944 return false;
945 }
946 drop(node_labels);
947
948 let mut index = self.label_index.write();
950 if (label_id as usize) < index.len() {
951 index[label_id as usize].remove(&node_id);
952 }
953
954 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
956 if let Some(record) = chain.latest_mut() {
957 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
958 record.set_label_count(count as u16);
959 }
960 }
961
962 true
963 }
964
965 #[cfg(feature = "tiered-storage")]
968 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
969 let epoch = self.current_epoch();
970
971 let versions = self.node_versions.read();
973 if let Some(index) = versions.get(&node_id) {
974 if let Some(vref) = index.visible_at(epoch) {
975 if let Some(record) = self.read_node_record(&vref) {
976 if record.is_deleted() {
977 return false;
978 }
979 } else {
980 return false;
981 }
982 } else {
983 return false;
984 }
985 } else {
986 return false;
987 }
988 drop(versions);
989
990 let label_id = {
992 let label_ids = self.label_to_id.read();
993 match label_ids.get(label) {
994 Some(&id) => id,
995 None => return false, }
997 };
998
999 let mut node_labels = self.node_labels.write();
1001 if let Some(label_set) = node_labels.get_mut(&node_id) {
1002 if !label_set.remove(&label_id) {
1003 return false; }
1005 } else {
1006 return false;
1007 }
1008 drop(node_labels);
1009
1010 let mut index = self.label_index.write();
1012 if (label_id as usize) < index.len() {
1013 index[label_id as usize].remove(&node_id);
1014 }
1015
1016 true
1019 }
1020
1021 #[must_use]
1023 #[cfg(not(feature = "tiered-storage"))]
1024 pub fn node_count(&self) -> usize {
1025 let epoch = self.current_epoch();
1026 self.nodes
1027 .read()
1028 .values()
1029 .filter_map(|chain| chain.visible_at(epoch))
1030 .filter(|r| !r.is_deleted())
1031 .count()
1032 }
1033
1034 #[must_use]
1037 #[cfg(feature = "tiered-storage")]
1038 pub fn node_count(&self) -> usize {
1039 let epoch = self.current_epoch();
1040 let versions = self.node_versions.read();
1041 versions
1042 .iter()
1043 .filter(|(_, index)| {
1044 index.visible_at(epoch).map_or(false, |vref| {
1045 self.read_node_record(&vref)
1046 .map_or(false, |r| !r.is_deleted())
1047 })
1048 })
1049 .count()
1050 }
1051
1052 #[must_use]
1058 #[cfg(not(feature = "tiered-storage"))]
1059 pub fn node_ids(&self) -> Vec<NodeId> {
1060 let epoch = self.current_epoch();
1061 let mut ids: Vec<NodeId> = self
1062 .nodes
1063 .read()
1064 .iter()
1065 .filter_map(|(id, chain)| {
1066 chain
1067 .visible_at(epoch)
1068 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1069 })
1070 .collect();
1071 ids.sort_unstable();
1072 ids
1073 }
1074
1075 #[must_use]
1078 #[cfg(feature = "tiered-storage")]
1079 pub fn node_ids(&self) -> Vec<NodeId> {
1080 let epoch = self.current_epoch();
1081 let versions = self.node_versions.read();
1082 let mut ids: Vec<NodeId> = versions
1083 .iter()
1084 .filter_map(|(id, index)| {
1085 index.visible_at(epoch).and_then(|vref| {
1086 self.read_node_record(&vref)
1087 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1088 })
1089 })
1090 .collect();
1091 ids.sort_unstable();
1092 ids
1093 }
1094
1095 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
1099 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
1100 }
1101
1102 #[cfg(not(feature = "tiered-storage"))]
1104 pub fn create_edge_versioned(
1105 &self,
1106 src: NodeId,
1107 dst: NodeId,
1108 edge_type: &str,
1109 epoch: EpochId,
1110 tx_id: TxId,
1111 ) -> EdgeId {
1112 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1113 let type_id = self.get_or_create_edge_type_id(edge_type);
1114
1115 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1116 let chain = VersionChain::with_initial(record, epoch, tx_id);
1117 self.edges.write().insert(id, chain);
1118
1119 self.forward_adj.add_edge(src, dst, id);
1121 if let Some(ref backward) = self.backward_adj {
1122 backward.add_edge(dst, src, id);
1123 }
1124
1125 id
1126 }
1127
1128 #[cfg(feature = "tiered-storage")]
1131 pub fn create_edge_versioned(
1132 &self,
1133 src: NodeId,
1134 dst: NodeId,
1135 edge_type: &str,
1136 epoch: EpochId,
1137 tx_id: TxId,
1138 ) -> EdgeId {
1139 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
1140 let type_id = self.get_or_create_edge_type_id(edge_type);
1141
1142 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1143
1144 let arena = self.arena_allocator.arena_or_create(epoch);
1146 let (offset, _stored) = arena.alloc_value_with_offset(record);
1147
1148 let hot_ref = HotVersionRef::new(epoch, offset, tx_id);
1150
1151 let mut versions = self.edge_versions.write();
1153 if let Some(index) = versions.get_mut(&id) {
1154 index.add_hot(hot_ref);
1155 } else {
1156 versions.insert(id, VersionIndex::with_initial(hot_ref));
1157 }
1158
1159 self.forward_adj.add_edge(src, dst, id);
1161 if let Some(ref backward) = self.backward_adj {
1162 backward.add_edge(dst, src, id);
1163 }
1164
1165 id
1166 }
1167
1168 pub fn create_edge_with_props(
1170 &self,
1171 src: NodeId,
1172 dst: NodeId,
1173 edge_type: &str,
1174 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
1175 ) -> EdgeId {
1176 let id = self.create_edge(src, dst, edge_type);
1177
1178 for (key, value) in properties {
1179 self.edge_properties.set(id, key.into(), value.into());
1180 }
1181
1182 id
1183 }
1184
1185 #[must_use]
1187 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
1188 self.get_edge_at_epoch(id, self.current_epoch())
1189 }
1190
1191 #[must_use]
1193 #[cfg(not(feature = "tiered-storage"))]
1194 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1195 let edges = self.edges.read();
1196 let chain = edges.get(&id)?;
1197 let record = chain.visible_at(epoch)?;
1198
1199 if record.is_deleted() {
1200 return None;
1201 }
1202
1203 let edge_type = {
1204 let id_to_type = self.id_to_edge_type.read();
1205 id_to_type.get(record.type_id as usize)?.clone()
1206 };
1207
1208 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1209
1210 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1212
1213 Some(edge)
1214 }
1215
1216 #[must_use]
1219 #[cfg(feature = "tiered-storage")]
1220 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
1221 let versions = self.edge_versions.read();
1222 let index = versions.get(&id)?;
1223 let version_ref = index.visible_at(epoch)?;
1224
1225 let record = self.read_edge_record(&version_ref)?;
1226
1227 if record.is_deleted() {
1228 return None;
1229 }
1230
1231 let edge_type = {
1232 let id_to_type = self.id_to_edge_type.read();
1233 id_to_type.get(record.type_id as usize)?.clone()
1234 };
1235
1236 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1237
1238 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1240
1241 Some(edge)
1242 }
1243
1244 #[must_use]
1246 #[cfg(not(feature = "tiered-storage"))]
1247 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1248 let edges = self.edges.read();
1249 let chain = edges.get(&id)?;
1250 let record = chain.visible_to(epoch, tx_id)?;
1251
1252 if record.is_deleted() {
1253 return None;
1254 }
1255
1256 let edge_type = {
1257 let id_to_type = self.id_to_edge_type.read();
1258 id_to_type.get(record.type_id as usize)?.clone()
1259 };
1260
1261 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1262
1263 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1265
1266 Some(edge)
1267 }
1268
1269 #[must_use]
1272 #[cfg(feature = "tiered-storage")]
1273 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
1274 let versions = self.edge_versions.read();
1275 let index = versions.get(&id)?;
1276 let version_ref = index.visible_to(epoch, tx_id)?;
1277
1278 let record = self.read_edge_record(&version_ref)?;
1279
1280 if record.is_deleted() {
1281 return None;
1282 }
1283
1284 let edge_type = {
1285 let id_to_type = self.id_to_edge_type.read();
1286 id_to_type.get(record.type_id as usize)?.clone()
1287 };
1288
1289 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
1290
1291 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
1293
1294 Some(edge)
1295 }
1296
1297 #[cfg(feature = "tiered-storage")]
1299 #[allow(unsafe_code)]
1300 fn read_edge_record(&self, version_ref: &VersionRef) -> Option<EdgeRecord> {
1301 match version_ref {
1302 VersionRef::Hot(hot_ref) => {
1303 let arena = self.arena_allocator.arena(hot_ref.epoch);
1304 let record: &EdgeRecord = unsafe { arena.read_at(hot_ref.arena_offset) };
1306 Some(record.clone())
1307 }
1308 VersionRef::Cold(cold_ref) => {
1309 self.epoch_store
1311 .get_edge(cold_ref.epoch, cold_ref.block_offset, cold_ref.length)
1312 }
1313 }
1314 }
1315
1316 pub fn delete_edge(&self, id: EdgeId) -> bool {
1318 self.delete_edge_at_epoch(id, self.current_epoch())
1319 }
1320
1321 #[cfg(not(feature = "tiered-storage"))]
1323 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1324 let mut edges = self.edges.write();
1325 if let Some(chain) = edges.get_mut(&id) {
1326 let (src, dst) = {
1328 match chain.visible_at(epoch) {
1329 Some(record) => {
1330 if record.is_deleted() {
1331 return false;
1332 }
1333 (record.src, record.dst)
1334 }
1335 None => return false, }
1337 };
1338
1339 chain.mark_deleted(epoch);
1341
1342 drop(edges); self.forward_adj.mark_deleted(src, id);
1346 if let Some(ref backward) = self.backward_adj {
1347 backward.mark_deleted(dst, id);
1348 }
1349
1350 self.edge_properties.remove_all(id);
1352
1353 true
1354 } else {
1355 false
1356 }
1357 }
1358
1359 #[cfg(feature = "tiered-storage")]
1362 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
1363 let mut versions = self.edge_versions.write();
1364 if let Some(index) = versions.get_mut(&id) {
1365 let (src, dst) = {
1367 match index.visible_at(epoch) {
1368 Some(version_ref) => {
1369 if let Some(record) = self.read_edge_record(&version_ref) {
1370 if record.is_deleted() {
1371 return false;
1372 }
1373 (record.src, record.dst)
1374 } else {
1375 return false;
1376 }
1377 }
1378 None => return false,
1379 }
1380 };
1381
1382 index.mark_deleted(epoch);
1384
1385 drop(versions); self.forward_adj.mark_deleted(src, id);
1389 if let Some(ref backward) = self.backward_adj {
1390 backward.mark_deleted(dst, id);
1391 }
1392
1393 self.edge_properties.remove_all(id);
1395
1396 true
1397 } else {
1398 false
1399 }
1400 }
1401
1402 #[must_use]
1404 #[cfg(not(feature = "tiered-storage"))]
1405 pub fn edge_count(&self) -> usize {
1406 let epoch = self.current_epoch();
1407 self.edges
1408 .read()
1409 .values()
1410 .filter_map(|chain| chain.visible_at(epoch))
1411 .filter(|r| !r.is_deleted())
1412 .count()
1413 }
1414
1415 #[must_use]
1418 #[cfg(feature = "tiered-storage")]
1419 pub fn edge_count(&self) -> usize {
1420 let epoch = self.current_epoch();
1421 let versions = self.edge_versions.read();
1422 versions
1423 .iter()
1424 .filter(|(_, index)| {
1425 index.visible_at(epoch).map_or(false, |vref| {
1426 self.read_edge_record(&vref)
1427 .map_or(false, |r| !r.is_deleted())
1428 })
1429 })
1430 .count()
1431 }
1432
1433 #[cfg(not(feature = "tiered-storage"))]
1438 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
1439 {
1441 let mut nodes = self.nodes.write();
1442 for chain in nodes.values_mut() {
1443 chain.remove_versions_by(tx_id);
1444 }
1445 nodes.retain(|_, chain| !chain.is_empty());
1447 }
1448
1449 {
1451 let mut edges = self.edges.write();
1452 for chain in edges.values_mut() {
1453 chain.remove_versions_by(tx_id);
1454 }
1455 edges.retain(|_, chain| !chain.is_empty());
1457 }
1458 }
1459
/// Removes every node and edge version created by transaction `tx_id` and
/// drops the version indexes that become empty as a result (tiered-storage
/// build).
///
/// The node map is processed first and its write lock is released before
/// the edge map is locked.
#[cfg(feature = "tiered-storage")]
pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
    // Strip the transaction's versions and discard emptied indexes in one pass.
    self.node_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });

    self.edge_versions.write().retain(|_, index| {
        index.remove_versions_by(tx_id);
        !index.is_empty()
    });
}
1484
/// Moves the hot (arena-backed) versions of `epoch` into the epoch store.
///
/// Steps, in order:
/// 1. Under read locks, clone every node and edge record referenced by a
///    hot version ref of `epoch`, remembering the ref alongside its id.
/// 2. Return `0` immediately if nothing was collected.
/// 3. Hand the cloned records to `EpochStore::freeze_epoch`, which returns
///    per-entity entries carrying `entity_id`, `offset` and `length`.
/// 4. Under write locks, call `VersionIndex::freeze_epoch` once per
///    collected hot ref with a `ColdVersionRef` built from the matching
///    entry and the hot ref's `created_by` / `deleted_epoch`.
///
/// Returns the number of records collected in step 1.
#[cfg(feature = "tiered-storage")]
#[allow(unsafe_code)]
pub fn freeze_epoch(&self, epoch: EpochId) -> usize {
    // --- Step 1a: snapshot node records -----------------------------------
    let mut node_records: Vec<(u64, NodeRecord)> = Vec::new();
    let mut node_hot_refs: Vec<(NodeId, HotVersionRef)> = Vec::new();

    {
        let node_indexes = self.node_versions.read();
        for (node_id, index) in node_indexes.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: NOTE(review) — assumes `hot.arena_offset` was produced
                // by this arena for a `NodeRecord`; confirm against the arena API.
                let record: &NodeRecord = unsafe { arena.read_at(hot.arena_offset) };
                node_records.push((node_id.as_u64(), record.clone()));
                node_hot_refs.push((*node_id, *hot));
            }
        }
    }

    // --- Step 1b: snapshot edge records -----------------------------------
    let mut edge_records: Vec<(u64, EdgeRecord)> = Vec::new();
    let mut edge_hot_refs: Vec<(EdgeId, HotVersionRef)> = Vec::new();

    {
        let edge_indexes = self.edge_versions.read();
        for (edge_id, index) in edge_indexes.iter() {
            for hot in index.hot_refs_for_epoch(epoch) {
                let arena = self.arena_allocator.arena(hot.epoch);
                // SAFETY: NOTE(review) — assumes `hot.arena_offset` was produced
                // by this arena for an `EdgeRecord`; confirm against the arena API.
                let record: &EdgeRecord = unsafe { arena.read_at(hot.arena_offset) };
                edge_records.push((edge_id.as_u64(), record.clone()));
                edge_hot_refs.push((*edge_id, *hot));
            }
        }
    }

    // --- Step 2: nothing to do? -------------------------------------------
    // Must be computed before the record vectors are moved into the store.
    let total_frozen = node_records.len() + edge_records.len();
    if total_frozen == 0 {
        return 0;
    }

    // --- Step 3: write the records to the epoch store ----------------------
    let (node_entries, edge_entries) =
        self.epoch_store
            .freeze_epoch(epoch, node_records, edge_records);

    // Entity id -> (offset, length) lookup tables for the rewrite below.
    let node_entry_map: FxHashMap<u64, _> = node_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();
    let edge_entry_map: FxHashMap<u64, _> = edge_entries
        .iter()
        .map(|entry| (entry.entity_id, (entry.offset, entry.length)))
        .collect();

    // --- Step 4a: register cold refs for nodes -----------------------------
    {
        let mut node_indexes = self.node_versions.write();
        for (node_id, hot) in &node_hot_refs {
            let location = node_entry_map.get(&node_id.as_u64()).copied();
            if let (Some(index), Some((block_offset, length))) =
                (node_indexes.get_mut(node_id), location)
            {
                let cold_ref = ColdVersionRef {
                    epoch,
                    block_offset,
                    length,
                    created_by: hot.created_by,
                    deleted_epoch: hot.deleted_epoch,
                };
                index.freeze_epoch(epoch, std::iter::once(cold_ref));
            }
        }
    }

    // --- Step 4b: register cold refs for edges -----------------------------
    {
        let mut edge_indexes = self.edge_versions.write();
        for (edge_id, hot) in &edge_hot_refs {
            let location = edge_entry_map.get(&edge_id.as_u64()).copied();
            if let (Some(index), Some((block_offset, length))) =
                (edge_indexes.get_mut(edge_id), location)
            {
                let cold_ref = ColdVersionRef {
                    epoch,
                    block_offset,
                    length,
                    created_by: hot.created_by,
                    deleted_epoch: hot.deleted_epoch,
                };
                index.freeze_epoch(epoch, std::iter::once(cold_ref));
            }
        }
    }

    total_frozen
}
1600
/// Borrows the epoch store owned by this graph store (tiered-storage build).
#[cfg(feature = "tiered-storage")]
#[must_use]
pub fn epoch_store(&self) -> &EpochStore {
    Arc::as_ref(&self.epoch_store)
}
1607
1608 #[must_use]
1610 pub fn label_count(&self) -> usize {
1611 self.id_to_label.read().len()
1612 }
1613
1614 #[must_use]
1618 pub fn property_key_count(&self) -> usize {
1619 let node_keys = self.node_properties.column_count();
1620 let edge_keys = self.edge_properties.column_count();
1621 node_keys + edge_keys
1625 }
1626
1627 #[must_use]
1629 pub fn edge_type_count(&self) -> usize {
1630 self.id_to_edge_type.read().len()
1631 }
1632
1633 pub fn neighbors(
1640 &self,
1641 node: NodeId,
1642 direction: Direction,
1643 ) -> impl Iterator<Item = NodeId> + '_ {
1644 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
1645 Direction::Outgoing | Direction::Both => {
1646 Box::new(self.forward_adj.neighbors(node).into_iter())
1647 }
1648 Direction::Incoming => Box::new(std::iter::empty()),
1649 };
1650
1651 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
1652 Direction::Incoming | Direction::Both => {
1653 if let Some(ref adj) = self.backward_adj {
1654 Box::new(adj.neighbors(node).into_iter())
1655 } else {
1656 Box::new(std::iter::empty())
1657 }
1658 }
1659 Direction::Outgoing => Box::new(std::iter::empty()),
1660 };
1661
1662 forward.chain(backward)
1663 }
1664
1665 pub fn edges_from(
1669 &self,
1670 node: NodeId,
1671 direction: Direction,
1672 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
1673 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
1674 Direction::Outgoing | Direction::Both => {
1675 Box::new(self.forward_adj.edges_from(node).into_iter())
1676 }
1677 Direction::Incoming => Box::new(std::iter::empty()),
1678 };
1679
1680 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
1681 Direction::Incoming | Direction::Both => {
1682 if let Some(ref adj) = self.backward_adj {
1683 Box::new(adj.edges_from(node).into_iter())
1684 } else {
1685 Box::new(std::iter::empty())
1686 }
1687 }
1688 Direction::Outgoing => Box::new(std::iter::empty()),
1689 };
1690
1691 forward.chain(backward)
1692 }
1693
1694 pub fn edges_to(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
1707 if let Some(ref backward) = self.backward_adj {
1708 backward.edges_from(node)
1709 } else {
1710 self.all_edges()
1712 .filter_map(|edge| {
1713 if edge.dst == node {
1714 Some((edge.src, edge.id))
1715 } else {
1716 None
1717 }
1718 })
1719 .collect()
1720 }
1721 }
1722
1723 #[must_use]
1727 pub fn out_degree(&self, node: NodeId) -> usize {
1728 self.forward_adj.out_degree(node)
1729 }
1730
1731 #[must_use]
1736 pub fn in_degree(&self, node: NodeId) -> usize {
1737 if let Some(ref backward) = self.backward_adj {
1738 backward.in_degree(node)
1739 } else {
1740 self.all_edges().filter(|edge| edge.dst == node).count()
1742 }
1743 }
1744
1745 #[must_use]
1747 #[cfg(not(feature = "tiered-storage"))]
1748 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
1749 let edges = self.edges.read();
1750 let chain = edges.get(&id)?;
1751 let epoch = self.current_epoch();
1752 let record = chain.visible_at(epoch)?;
1753 let id_to_type = self.id_to_edge_type.read();
1754 id_to_type.get(record.type_id as usize).cloned()
1755 }
1756
/// Looks up the type name of edge `id` (tiered-storage build).
///
/// Returns `None` when the edge is unknown, has no version visible at the
/// current epoch, its record cannot be read, or its type id has no entry in
/// the edge-type table.
#[must_use]
#[cfg(feature = "tiered-storage")]
pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
    let indexes = self.edge_versions.read();
    let vref = indexes.get(&id)?.visible_at(self.current_epoch())?;
    let type_id = self.read_edge_record(&vref)?.type_id;

    let type_names = self.id_to_edge_type.read();
    type_names.get(type_id as usize).cloned()
}
1770
1771 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
1777 let label_to_id = self.label_to_id.read();
1778 if let Some(&label_id) = label_to_id.get(label) {
1779 let index = self.label_index.read();
1780 if let Some(set) = index.get(label_id as usize) {
1781 let mut ids: Vec<NodeId> = set.keys().copied().collect();
1782 ids.sort_unstable();
1783 return ids;
1784 }
1785 }
1786 Vec::new()
1787 }
1788
1789 #[cfg(not(feature = "tiered-storage"))]
1796 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
1797 let epoch = self.current_epoch();
1798 let node_ids: Vec<NodeId> = self
1799 .nodes
1800 .read()
1801 .iter()
1802 .filter_map(|(id, chain)| {
1803 chain
1804 .visible_at(epoch)
1805 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1806 })
1807 .collect();
1808
1809 node_ids.into_iter().filter_map(move |id| self.get_node(id))
1810 }
1811
/// Iterates over the nodes listed by `node_ids()`, materialising each one
/// lazily through `get_node` and skipping ids for which it returns `None`
/// (tiered-storage build).
#[cfg(feature = "tiered-storage")]
pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
    self.node_ids()
        .into_iter()
        .filter_map(move |id| self.get_node(id))
}
1819
1820 #[cfg(not(feature = "tiered-storage"))]
1825 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
1826 let epoch = self.current_epoch();
1827 let edge_ids: Vec<EdgeId> = self
1828 .edges
1829 .read()
1830 .iter()
1831 .filter_map(|(id, chain)| {
1832 chain
1833 .visible_at(epoch)
1834 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
1835 })
1836 .collect();
1837
1838 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
1839 }
1840
/// Iterates over all edges whose version visible at the current epoch can
/// be read and is not flagged as deleted (tiered-storage build).
///
/// The matching ids are collected up front under the read lock; each edge
/// is then materialised lazily through `get_edge`, and ids for which
/// `get_edge` returns `None` are skipped.
#[cfg(feature = "tiered-storage")]
pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
    let epoch = self.current_epoch();
    let indexes = self.edge_versions.read();

    let mut live_ids: Vec<EdgeId> = Vec::new();
    for (id, index) in indexes.iter() {
        if let Some(vref) = index.visible_at(epoch) {
            if let Some(record) = self.read_edge_record(&vref) {
                if !record.is_deleted() {
                    live_ids.push(*id);
                }
            }
        }
    }

    live_ids.into_iter().filter_map(move |id| self.get_edge(id))
}
1859
1860 pub fn all_labels(&self) -> Vec<String> {
1862 self.id_to_label
1863 .read()
1864 .iter()
1865 .map(|s| s.to_string())
1866 .collect()
1867 }
1868
1869 pub fn all_edge_types(&self) -> Vec<String> {
1871 self.id_to_edge_type
1872 .read()
1873 .iter()
1874 .map(|s| s.to_string())
1875 .collect()
1876 }
1877
1878 pub fn all_property_keys(&self) -> Vec<String> {
1880 let mut keys = std::collections::HashSet::new();
1881 for key in self.node_properties.keys() {
1882 keys.insert(key.to_string());
1883 }
1884 for key in self.edge_properties.keys() {
1885 keys.insert(key.to_string());
1886 }
1887 keys.into_iter().collect()
1888 }
1889
1890 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
1892 let node_ids = self.nodes_by_label(label);
1893 node_ids.into_iter().filter_map(move |id| self.get_node(id))
1894 }
1895
1896 #[cfg(not(feature = "tiered-storage"))]
1898 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
1899 let epoch = self.current_epoch();
1900 let type_to_id = self.edge_type_to_id.read();
1901
1902 if let Some(&type_id) = type_to_id.get(edge_type) {
1903 let edge_ids: Vec<EdgeId> = self
1904 .edges
1905 .read()
1906 .iter()
1907 .filter_map(|(id, chain)| {
1908 chain.visible_at(epoch).and_then(|r| {
1909 if !r.is_deleted() && r.type_id == type_id {
1910 Some(*id)
1911 } else {
1912 None
1913 }
1914 })
1915 })
1916 .collect();
1917
1918 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
1920 as Box<dyn Iterator<Item = Edge> + 'a>
1921 } else {
1922 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
1924 }
1925 }
1926
/// Iterates over the edges of type `edge_type` whose version visible at the
/// current epoch can be read and is not flagged as deleted (tiered-storage
/// build).
///
/// An unregistered edge type yields an empty iterator. Matching ids are
/// collected up front; each edge is then materialised lazily through
/// `get_edge`, and ids for which it returns `None` are skipped.
#[cfg(feature = "tiered-storage")]
pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
    let epoch = self.current_epoch();
    let type_ids = self.edge_type_to_id.read();

    let mut matching: Vec<EdgeId> = Vec::new();
    if let Some(&type_id) = type_ids.get(edge_type) {
        let indexes = self.edge_versions.read();
        for (id, index) in indexes.iter() {
            if let Some(vref) = index.visible_at(epoch) {
                if let Some(record) = self.read_edge_record(&vref) {
                    if !record.is_deleted() && record.type_id == type_id {
                        matching.push(*id);
                    }
                }
            }
        }
    }

    matching.into_iter().filter_map(move |id| self.get_edge(id))
}
1957
1958 #[must_use]
1965 pub fn node_property_might_match(
1966 &self,
1967 property: &PropertyKey,
1968 op: CompareOp,
1969 value: &Value,
1970 ) -> bool {
1971 self.node_properties.might_match(property, op, value)
1972 }
1973
1974 #[must_use]
1976 pub fn edge_property_might_match(
1977 &self,
1978 property: &PropertyKey,
1979 op: CompareOp,
1980 value: &Value,
1981 ) -> bool {
1982 self.edge_properties.might_match(property, op, value)
1983 }
1984
1985 #[must_use]
1987 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1988 self.node_properties.zone_map(property)
1989 }
1990
1991 #[must_use]
1993 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1994 self.edge_properties.zone_map(property)
1995 }
1996
1997 pub fn rebuild_zone_maps(&self) {
1999 self.node_properties.rebuild_zone_maps();
2000 self.edge_properties.rebuild_zone_maps();
2001 }
2002
2003 #[must_use]
2007 pub fn statistics(&self) -> Statistics {
2008 self.statistics.read().clone()
2009 }
2010
2011 #[cfg(not(feature = "tiered-storage"))]
2016 pub fn compute_statistics(&self) {
2017 let mut stats = Statistics::new();
2018
2019 stats.total_nodes = self.node_count() as u64;
2021 stats.total_edges = self.edge_count() as u64;
2022
2023 let id_to_label = self.id_to_label.read();
2025 let label_index = self.label_index.read();
2026
2027 for (label_id, label_name) in id_to_label.iter().enumerate() {
2028 let node_count = label_index
2029 .get(label_id)
2030 .map(|set| set.len() as u64)
2031 .unwrap_or(0);
2032
2033 if node_count > 0 {
2034 let avg_out_degree = if stats.total_nodes > 0 {
2036 stats.total_edges as f64 / stats.total_nodes as f64
2037 } else {
2038 0.0
2039 };
2040
2041 let label_stats =
2042 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
2043
2044 stats.update_label(label_name.as_ref(), label_stats);
2045 }
2046 }
2047
2048 let id_to_edge_type = self.id_to_edge_type.read();
2050 let edges = self.edges.read();
2051 let epoch = self.current_epoch();
2052
2053 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
2054 for chain in edges.values() {
2055 if let Some(record) = chain.visible_at(epoch) {
2056 if !record.is_deleted() {
2057 *edge_type_counts.entry(record.type_id).or_default() += 1;
2058 }
2059 }
2060 }
2061
2062 for (type_id, count) in edge_type_counts {
2063 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
2064 let avg_degree = if stats.total_nodes > 0 {
2065 count as f64 / stats.total_nodes as f64
2066 } else {
2067 0.0
2068 };
2069
2070 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
2071 stats.update_edge_type(type_name.as_ref(), edge_stats);
2072 }
2073 }
2074
2075 *self.statistics.write() = stats;
2076 }
2077
/// Recomputes the stored statistics from the current contents of the store
/// (tiered-storage build).
///
/// * Totals come from `node_count()` and `edge_count()`.
/// * Each label with a non-empty index slot gets a `LabelStatistics` whose
///   degrees are both set to `total_edges / total_nodes` (0.0 when there
///   are no nodes).
/// * Each edge type with at least one readable, non-deleted edge at the
///   current epoch gets an `EdgeTypeStatistics` whose degrees are both
///   `count / total_nodes` (0.0 when there are no nodes).
///
/// The freshly built value replaces the stored statistics at the end.
#[cfg(feature = "tiered-storage")]
pub fn compute_statistics(&self) {
    let mut stats = Statistics::new();

    stats.total_nodes = self.node_count() as u64;
    stats.total_edges = self.edge_count() as u64;

    // ---- per-label statistics ---------------------------------------------
    let label_names = self.id_to_label.read();
    let label_index = self.label_index.read();

    for (label_id, label_name) in label_names.iter().enumerate() {
        let members = label_index
            .get(label_id)
            .map_or(0, |set| set.len() as u64);
        if members == 0 {
            continue;
        }

        // The same graph-wide average is used for every label.
        let avg_degree = match stats.total_nodes {
            0 => 0.0,
            nodes => stats.total_edges as f64 / nodes as f64,
        };

        let label_stats = LabelStatistics::new(members).with_degrees(avg_degree, avg_degree);
        stats.update_label(label_name.as_ref(), label_stats);
    }

    // ---- per-edge-type statistics -----------------------------------------
    let type_names = self.id_to_edge_type.read();
    let indexes = self.edge_versions.read();
    let epoch = self.current_epoch();

    let mut per_type: FxHashMap<u32, u64> = FxHashMap::default();
    for vref in indexes.values().filter_map(|index| index.visible_at(epoch)) {
        if let Some(record) = self.read_edge_record(&vref) {
            if !record.is_deleted() {
                *per_type.entry(record.type_id).or_insert(0) += 1;
            }
        }
    }

    for (type_id, count) in per_type {
        let type_name = match type_names.get(type_id as usize) {
            Some(name) => name,
            // Type ids without a registered name are skipped.
            None => continue,
        };

        let avg_degree = match stats.total_nodes {
            0 => 0.0,
            nodes => count as f64 / nodes as f64,
        };

        let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
        stats.update_edge_type(type_name.as_ref(), edge_stats);
    }

    *self.statistics.write() = stats;
}
2143
2144 #[must_use]
2146 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
2147 self.statistics.read().estimate_label_cardinality(label)
2148 }
2149
2150 #[must_use]
2152 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
2153 self.statistics
2154 .read()
2155 .estimate_avg_degree(edge_type, outgoing)
2156 }
2157
2158 fn get_or_create_label_id(&self, label: &str) -> u32 {
2161 {
2162 let label_to_id = self.label_to_id.read();
2163 if let Some(&id) = label_to_id.get(label) {
2164 return id;
2165 }
2166 }
2167
2168 let mut label_to_id = self.label_to_id.write();
2169 let mut id_to_label = self.id_to_label.write();
2170
2171 if let Some(&id) = label_to_id.get(label) {
2173 return id;
2174 }
2175
2176 let id = id_to_label.len() as u32;
2177
2178 let label: Arc<str> = label.into();
2179 label_to_id.insert(label.clone(), id);
2180 id_to_label.push(label);
2181
2182 id
2183 }
2184
2185 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
2186 {
2187 let type_to_id = self.edge_type_to_id.read();
2188 if let Some(&id) = type_to_id.get(edge_type) {
2189 return id;
2190 }
2191 }
2192
2193 let mut type_to_id = self.edge_type_to_id.write();
2194 let mut id_to_type = self.id_to_edge_type.write();
2195
2196 if let Some(&id) = type_to_id.get(edge_type) {
2198 return id;
2199 }
2200
2201 let id = id_to_type.len() as u32;
2202 let edge_type: Arc<str> = edge_type.into();
2203 type_to_id.insert(edge_type.clone(), id);
2204 id_to_type.push(edge_type);
2205
2206 id
2207 }
2208
2209 #[cfg(not(feature = "tiered-storage"))]
2216 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
2217 let epoch = self.current_epoch();
2218 let mut record = NodeRecord::new(id, epoch);
2219 record.set_label_count(labels.len() as u16);
2220
2221 let mut node_label_set = FxHashSet::default();
2223 for label in labels {
2224 let label_id = self.get_or_create_label_id(*label);
2225 node_label_set.insert(label_id);
2226
2227 let mut index = self.label_index.write();
2229 while index.len() <= label_id as usize {
2230 index.push(FxHashMap::default());
2231 }
2232 index[label_id as usize].insert(id, ());
2233 }
2234
2235 self.node_labels.write().insert(id, node_label_set);
2237
2238 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2240 self.nodes.write().insert(id, chain);
2241
2242 let id_val = id.as_u64();
2244 let _ = self
2245 .next_node_id
2246 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2247 if id_val >= current {
2248 Some(id_val + 1)
2249 } else {
2250 None
2251 }
2252 });
2253 }
2254
/// Creates a node with a caller-chosen `id` and the given `labels`
/// (tiered-storage build).
///
/// The record is allocated in the arena of the current epoch and indexed
/// through a hot version ref created with `TxId::SYSTEM`. Every label is
/// registered (if new) and the node is added to that label's index slot.
/// Any existing entry for `id` in the version map or the node-label map is
/// replaced. Finally `next_node_id` is raised to `id + 1` when it is
/// currently not greater than `id`.
#[cfg(feature = "tiered-storage")]
pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
    let epoch = self.current_epoch();
    let mut record = NodeRecord::new(id, epoch);
    record.set_label_count(labels.len() as u16);

    let mut label_ids = FxHashSet::default();
    for &label in labels {
        let label_id = self.get_or_create_label_id(label);
        label_ids.insert(label_id);

        // The index lock is taken per label, after the label id has been
        // resolved, and released at the end of each iteration.
        let slot = label_id as usize;
        let mut index = self.label_index.write();
        if index.len() <= slot {
            index.resize_with(slot + 1, Default::default);
        }
        index[slot].insert(id, ());
    }

    self.node_labels.write().insert(id, label_ids);

    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) = arena.alloc_value_with_offset(record);

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    // This write guard stays alive until the function returns.
    let mut node_indexes = self.node_versions.write();
    node_indexes.insert(id, VersionIndex::with_initial(hot_ref));

    // Keep the id allocator ahead of explicitly assigned ids.
    let id_val = id.as_u64();
    let _ = self
        .next_node_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            (id_val >= current).then(|| id_val + 1)
        });
}
2301
2302 #[cfg(not(feature = "tiered-storage"))]
2306 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
2307 let epoch = self.current_epoch();
2308 let type_id = self.get_or_create_edge_type_id(edge_type);
2309
2310 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
2311 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
2312 self.edges.write().insert(id, chain);
2313
2314 self.forward_adj.add_edge(src, dst, id);
2316 if let Some(ref backward) = self.backward_adj {
2317 backward.add_edge(dst, src, id);
2318 }
2319
2320 let id_val = id.as_u64();
2322 let _ = self
2323 .next_edge_id
2324 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
2325 if id_val >= current {
2326 Some(id_val + 1)
2327 } else {
2328 None
2329 }
2330 });
2331 }
2332
/// Creates an edge `src -> dst` of type `edge_type` with a caller-chosen
/// `id` (tiered-storage build).
///
/// The record is allocated in the arena of the current epoch and indexed
/// through a hot version ref created with `TxId::SYSTEM`, replacing any
/// existing entry for `id`. The edge is then added to the forward adjacency
/// and, when present, to the backward adjacency. Finally `next_edge_id` is
/// raised to `id + 1` when it is currently not greater than `id`.
#[cfg(feature = "tiered-storage")]
pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
    let epoch = self.current_epoch();
    let type_id = self.get_or_create_edge_type_id(edge_type);

    let arena = self.arena_allocator.arena_or_create(epoch);
    let (offset, _stored) =
        arena.alloc_value_with_offset(EdgeRecord::new(id, src, dst, type_id, epoch));

    let hot_ref = HotVersionRef::new(epoch, offset, TxId::SYSTEM);
    // This write guard stays alive until the function returns, so the
    // adjacency updates below happen while the version map is still locked.
    let mut edge_indexes = self.edge_versions.write();
    edge_indexes.insert(id, VersionIndex::with_initial(hot_ref));

    self.forward_adj.add_edge(src, dst, id);
    if let Some(ref backward) = self.backward_adj {
        // The backward adjacency is keyed by destination.
        backward.add_edge(dst, src, id);
    }

    // Keep the id allocator ahead of explicitly assigned ids.
    let id_val = id.as_u64();
    let _ = self
        .next_edge_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            (id_val >= current).then(|| id_val + 1)
        });
}
2369
2370 pub fn set_epoch(&self, epoch: EpochId) {
2372 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
2373 }
2374}
2375
2376impl Default for LpgStore {
2377 fn default() -> Self {
2378 Self::new()
2379 }
2380}
2381
2382#[cfg(test)]
2383mod tests {
2384 use super::*;
2385
// A created node has a valid id and carries exactly the requested label.
#[test]
fn test_create_node() {
    let store = LpgStore::new();

    let person = store.create_node(&["Person"]);
    assert!(person.is_valid());

    let fetched = store.get_node(person).unwrap();
    assert!(fetched.has_label("Person"));
    assert!(!fetched.has_label("Animal"));
}
2397
// Properties passed at creation time are readable from the fetched node.
#[test]
fn test_create_node_with_props() {
    let store = LpgStore::new();

    let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
    let id = store.create_node_with_props(&["Person"], props);

    let node = store.get_node(id).unwrap();

    let name = node.get_property("name").and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));

    let age = node.get_property("age").and_then(|v| v.as_int64());
    assert_eq!(age, Some(30));
}
2417
// Deleting a node removes it from counts and lookups; a second delete fails.
#[test]
fn test_delete_node() {
    let store = LpgStore::new();

    let person = store.create_node(&["Person"]);
    assert_eq!(store.node_count(), 1);

    let first_delete = store.delete_node(person);
    assert!(first_delete);
    assert_eq!(store.node_count(), 0);
    assert!(store.get_node(person).is_none());

    let second_delete = store.delete_node(person);
    assert!(!second_delete);
}
2432
// A created edge is retrievable with the endpoints and type it was given.
#[test]
fn test_create_edge() {
    let store = LpgStore::new();

    let alice = store.create_node(&["Person"]);
    let bob = store.create_node(&["Person"]);

    let knows = store.create_edge(alice, bob, "KNOWS");
    assert!(knows.is_valid());

    let fetched = store.get_edge(knows).unwrap();
    assert_eq!(fetched.src, alice);
    assert_eq!(fetched.dst, bob);
    assert_eq!(fetched.edge_type.as_ref(), "KNOWS");
}
2448
// Outgoing and incoming neighbor lookups reflect the created edges.
#[test]
fn test_neighbors() {
    let store = LpgStore::new();

    let hub = store.create_node(&["Person"]);
    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);

    store.create_edge(hub, first, "KNOWS");
    store.create_edge(hub, second, "KNOWS");

    let from_hub: Vec<_> = store.neighbors(hub, Direction::Outgoing).collect();
    assert_eq!(from_hub.len(), 2);
    assert!(from_hub.contains(&first));
    assert!(from_hub.contains(&second));

    let into_first: Vec<_> = store.neighbors(first, Direction::Incoming).collect();
    assert_eq!(into_first.len(), 1);
    assert!(into_first.contains(&hub));
}
2469
// Label lookups return exactly the nodes created with that label.
#[test]
fn test_nodes_by_label() {
    let store = LpgStore::new();

    let first_person = store.create_node(&["Person"]);
    let second_person = store.create_node(&["Person"]);
    let _animal = store.create_node(&["Animal"]);

    let persons = store.nodes_by_label("Person");
    assert_eq!(persons.len(), 2);
    assert!(persons.contains(&first_person));
    assert!(persons.contains(&second_person));

    assert_eq!(store.nodes_by_label("Animal").len(), 1);
}
2486
// Deleting an edge removes it from the count and from lookups.
#[test]
fn test_delete_edge() {
    let store = LpgStore::new();

    let from = store.create_node(&["Person"]);
    let to = store.create_node(&["Person"]);
    let knows = store.create_edge(from, to, "KNOWS");

    assert_eq!(store.edge_count(), 1);

    let deleted = store.delete_edge(knows);
    assert!(deleted);
    assert_eq!(store.edge_count(), 0);
    assert!(store.get_edge(knows).is_none());
}
2501
// With backward edges disabled, outgoing traversal works but incoming
// neighbor lookups come back empty.
#[test]
fn test_lpg_store_config() {
    let store = LpgStore::with_config(LpgStoreConfig {
        backward_edges: false,
        initial_node_capacity: 100,
        initial_edge_capacity: 200,
    });

    let from = store.create_node(&["Person"]);
    let to = store.create_node(&["Person"]);
    store.create_edge(from, to, "KNOWS");

    let outgoing_count = store.neighbors(from, Direction::Outgoing).count();
    assert_eq!(outgoing_count, 1);

    let incoming_count = store.neighbors(to, Direction::Incoming).count();
    assert_eq!(incoming_count, 0);
}
2527
// A fresh store starts at epoch 0; `new_epoch` advances to 1 and the
// current epoch follows.
#[test]
fn test_epoch_management() {
    let store = LpgStore::new();

    assert_eq!(store.current_epoch().as_u64(), 0);

    let advanced = store.new_epoch();
    assert_eq!(advanced.as_u64(), 1);

    assert_eq!(store.current_epoch().as_u64(), 1);
}
2541
// Set, overwrite, remove and re-read a node property.
#[test]
fn test_node_properties() {
    let store = LpgStore::new();
    let id = store.create_node(&["Person"]);

    // Reads the current value of the "name" property.
    let read_name = || store.get_node_property(id, &"name".into());

    store.set_node_property(id, "name", Value::from("Alice"));
    assert!(matches!(read_name(), Some(Value::String(s)) if s.as_ref() == "Alice"));

    store.set_node_property(id, "name", Value::from("Bob"));
    assert!(matches!(read_name(), Some(Value::String(s)) if s.as_ref() == "Bob"));

    let removed = store.remove_node_property(id, "name");
    assert!(matches!(removed, Some(Value::String(s)) if s.as_ref() == "Bob"));

    assert!(read_name().is_none());

    let missing = store.remove_node_property(id, "nonexistent");
    assert!(missing.is_none());
}
2569
// Set, remove and re-read an edge property.
#[test]
fn test_edge_properties() {
    let store = LpgStore::new();
    let from = store.create_node(&["Person"]);
    let to = store.create_node(&["Person"]);
    let knows = store.create_edge(from, to, "KNOWS");

    // Reads the current value of the "since" property.
    let read_since = || store.get_edge_property(knows, &"since".into());

    store.set_edge_property(knows, "since", Value::from(2020i64));
    assert_eq!(read_since().and_then(|v| v.as_int64()), Some(2020));

    let removed = store.remove_edge_property(knows, "since");
    assert_eq!(removed.and_then(|v| v.as_int64()), Some(2020));

    assert!(read_since().is_none());
}
2589
// Adding a label succeeds once, removing it succeeds once, and the
// original label is unaffected throughout.
#[test]
fn test_add_remove_label() {
    let store = LpgStore::new();
    let person = store.create_node(&["Person"]);

    assert!(store.add_label(person, "Employee"));

    let with_employee = store.get_node(person).unwrap();
    assert!(with_employee.has_label("Person"));
    assert!(with_employee.has_label("Employee"));

    // Adding the same label again reports no change.
    assert!(!store.add_label(person, "Employee"));

    assert!(store.remove_label(person, "Employee"));

    let without_employee = store.get_node(person).unwrap();
    assert!(without_employee.has_label("Person"));
    assert!(!without_employee.has_label("Employee"));

    // Removing an absent or unknown label reports no change.
    assert!(!store.remove_label(person, "Employee"));
    assert!(!store.remove_label(person, "NonExistent"));
}
2616
// Adding a label to an id that was never created fails.
#[test]
fn test_add_label_to_nonexistent_node() {
    let store = LpgStore::new();
    let missing = NodeId::new(999);
    let added = store.add_label(missing, "Label");
    assert!(!added);
}
2623
// Removing a label from an id that was never created fails.
#[test]
fn test_remove_label_from_nonexistent_node() {
    let store = LpgStore::new();
    let missing = NodeId::new(999);
    let removed = store.remove_label(missing, "Label");
    assert!(!removed);
}
2630
// `node_ids` lists every created node and drops deleted ones.
#[test]
fn test_node_ids() {
    let store = LpgStore::new();

    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);
    let third = store.create_node(&["Person"]);

    let before = store.node_ids();
    assert_eq!(before.len(), 3);
    assert!(before.contains(&first));
    assert!(before.contains(&second));
    assert!(before.contains(&third));

    store.delete_node(second);

    let after = store.node_ids();
    assert_eq!(after.len(), 2);
    assert!(!after.contains(&second));
}
2651
// Deleting a node id that was never created fails.
#[test]
fn test_delete_node_nonexistent() {
    let store = LpgStore::new();
    let missing = NodeId::new(999);
    let deleted = store.delete_node(missing);
    assert!(!deleted);
}
2658
// Deleting an edge id that was never created fails.
#[test]
fn test_delete_edge_nonexistent() {
    let store = LpgStore::new();
    let missing = EdgeId::new(999);
    let deleted = store.delete_edge(missing);
    assert!(!deleted);
}
2665
// The first delete of an edge succeeds, the second one fails.
#[test]
fn test_delete_edge_double() {
    let store = LpgStore::new();
    let from = store.create_node(&["Person"]);
    let to = store.create_node(&["Person"]);
    let knows = store.create_edge(from, to, "KNOWS");

    let first_delete = store.delete_edge(knows);
    assert!(first_delete);

    let second_delete = store.delete_edge(knows);
    assert!(!second_delete);
}
2676
// Properties passed at edge creation time are readable from the fetched edge.
#[test]
fn test_create_edge_with_props() {
    let store = LpgStore::new();
    let from = store.create_node(&["Person"]);
    let to = store.create_node(&["Person"]);

    let props = [
        ("since", Value::from(2020i64)),
        ("weight", Value::from(1.0)),
    ];
    let knows = store.create_edge_with_props(from, to, "KNOWS", props);

    let fetched = store.get_edge(knows).unwrap();

    let since = fetched.get_property("since").and_then(|v| v.as_int64());
    assert_eq!(since, Some(2020));

    let weight = fetched.get_property("weight").and_then(|v| v.as_float64());
    assert_eq!(weight, Some(1.0));
}
2703
// `delete_node_edges` removes both the outgoing and the incoming edge of
// the node.
#[test]
fn test_delete_node_edges() {
    let store = LpgStore::new();

    let center = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    let source = store.create_node(&["Person"]);

    // One edge leaving `center`, one edge entering it.
    store.create_edge(center, target, "KNOWS");
    store.create_edge(source, center, "KNOWS");
    assert_eq!(store.edge_count(), 2);

    store.delete_node_edges(center);

    assert_eq!(store.edge_count(), 0);
}
2722
// `Direction::Both` reports the outgoing and the incoming neighbor.
#[test]
fn test_neighbors_both_directions() {
    let store = LpgStore::new();

    let center = store.create_node(&["Person"]);
    let target = store.create_node(&["Person"]);
    let source = store.create_node(&["Person"]);

    // One edge leaving `center`, one edge entering it.
    store.create_edge(center, target, "KNOWS");
    store.create_edge(source, center, "KNOWS");

    let around: Vec<_> = store.neighbors(center, Direction::Both).collect();
    assert_eq!(around.len(), 2);
    assert!(around.contains(&target));
    assert!(around.contains(&source));
}
2740
// `edges_from` reports edge ids for outgoing and incoming traversal.
#[test]
fn test_edges_from() {
    let store = LpgStore::new();

    let hub = store.create_node(&["Person"]);
    let first = store.create_node(&["Person"]);
    let second = store.create_node(&["Person"]);

    let to_first = store.create_edge(hub, first, "KNOWS");
    let to_second = store.create_edge(hub, second, "KNOWS");

    let outgoing: Vec<_> = store.edges_from(hub, Direction::Outgoing).collect();
    assert_eq!(outgoing.len(), 2);
    assert!(outgoing.iter().any(|(_, edge)| *edge == to_first));
    assert!(outgoing.iter().any(|(_, edge)| *edge == to_second));

    let into_first: Vec<_> = store.edges_from(first, Direction::Incoming).collect();
    assert_eq!(into_first.len(), 1);
    assert_eq!(into_first[0].1, to_first);
}
2762
/// `edges_to(target)` must return one `(source, edge)` pair for each of the
/// two edges pointing at `target`.
#[test]
fn test_edges_to() {
    let graph = LpgStore::new();

    let left = graph.create_node(&["Person"]);
    let target = graph.create_node(&["Person"]);
    let right = graph.create_node(&["Person"]);

    let from_left = graph.create_edge(left, target, "KNOWS");
    let from_right = graph.create_edge(right, target, "KNOWS");

    let inbound = graph.edges_to(target);
    assert_eq!(inbound.len(), 2);
    for &(source, edge) in &[(left, from_left), (right, from_right)] {
        assert!(inbound.iter().any(|(src, e)| *src == source && *e == edge));
    }
}
2780
/// Checks `out_degree` and `in_degree` for every node of the three-edge
/// graph a -> b, a -> c, c -> b.
#[test]
fn test_out_degree_in_degree() {
    let graph = LpgStore::new();

    let a = graph.create_node(&["Person"]);
    let b = graph.create_node(&["Person"]);
    let c = graph.create_node(&["Person"]);

    graph.create_edge(a, b, "KNOWS");
    graph.create_edge(a, c, "KNOWS");
    graph.create_edge(c, b, "KNOWS");

    // Expected degrees, listed in the same order as `nodes`.
    let nodes = [a, b, c];
    let expected_out = [2, 0, 1];
    let expected_in = [0, 2, 1];

    for (node, want) in nodes.iter().zip(expected_out.iter()) {
        assert_eq!(graph.out_degree(*node), *want);
    }

    for (node, want) in nodes.iter().zip(expected_in.iter()) {
        assert_eq!(graph.in_degree(*node), *want);
    }
}
2801
/// `edge_type` must return the type name of an existing edge and `None`
/// for an edge id that was never created.
#[test]
fn test_edge_type() {
    let graph = LpgStore::new();

    let src = graph.create_node(&["Person"]);
    let dst = graph.create_node(&["Person"]);
    let knows = graph.create_edge(src, dst, "KNOWS");

    let found = graph.edge_type(knows);
    assert_eq!(found.as_deref(), Some("KNOWS"));

    let missing = graph.edge_type(EdgeId::new(999));
    assert!(missing.is_none());
}
2817
/// `label_count`, `edge_type_count` and `property_key_count` must all be
/// zero on a fresh store, then 2 / 1 / 2 after inserting two differently
/// labelled nodes, one edge, and the property keys `age` and `since`.
#[test]
fn test_count_methods() {
    let graph = LpgStore::new();

    let labels_before = graph.label_count();
    let edge_types_before = graph.edge_type_count();
    let keys_before = graph.property_key_count();
    assert_eq!(labels_before, 0);
    assert_eq!(edge_types_before, 0);
    assert_eq!(keys_before, 0);

    let person = graph.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
    let company = graph.create_node(&["Company"]);
    graph.create_edge_with_props(
        person,
        company,
        "WORKS_AT",
        [("since", Value::from(2020i64))],
    );

    let labels_after = graph.label_count();
    assert_eq!(labels_after, 2);
    let edge_types_after = graph.edge_type_count();
    assert_eq!(edge_types_after, 1);
    let keys_after = graph.property_key_count();
    assert_eq!(keys_after, 2);
}
2834
/// `all_nodes` must yield two items and `all_edges` one item for a store
/// holding two nodes joined by a single edge.
#[test]
fn test_all_nodes_and_edges() {
    let graph = LpgStore::new();

    let src = graph.create_node(&["Person"]);
    let dst = graph.create_node(&["Person"]);
    graph.create_edge(src, dst, "KNOWS");

    let node_total = graph.all_nodes().count();
    assert_eq!(node_total, 2);

    let edge_total = graph.all_edges().count();
    assert_eq!(edge_total, 1);
}
2849
/// `all_labels` must list each distinct label once (three here, although
/// `Animal` is used by two nodes) and `all_edge_types` the single edge type.
#[test]
fn test_all_labels_and_edge_types() {
    let graph = LpgStore::new();

    graph.create_node(&["Person"]);
    graph.create_node(&["Company"]);
    let predator = graph.create_node(&["Animal"]);
    let prey = graph.create_node(&["Animal"]);
    graph.create_edge(predator, prey, "EATS");

    let labels = graph.all_labels();
    assert_eq!(labels.len(), 3);
    for &expected in &["Person", "Company", "Animal"] {
        assert!(labels.contains(&String::from(expected)));
    }

    let edge_types = graph.all_edge_types();
    assert_eq!(edge_types.len(), 1);
    assert!(edge_types.contains(&String::from("EATS")));
}
2870
/// `all_property_keys` must contain the keys used on nodes (`name`, `age`)
/// as well as the key used on an edge (`since`).
#[test]
fn test_all_property_keys() {
    let graph = LpgStore::new();

    let alice = graph.create_node_with_props(&["Person"], [("name", Value::from("Alice"))]);
    let other = graph.create_node_with_props(&["Person"], [("age", Value::from(30i64))]);
    graph.create_edge_with_props(alice, other, "KNOWS", [("since", Value::from(2020i64))]);

    let keys = graph.all_property_keys();
    for &expected in &["name", "age", "since"] {
        assert!(keys.contains(&String::from(expected)));
    }
}
2884
/// `nodes_with_label` must yield exactly the nodes carrying the requested
/// label, and nothing for a label no node carries.
#[test]
fn test_nodes_with_label() {
    let graph = LpgStore::new();

    graph.create_node(&["Person"]);
    graph.create_node(&["Person"]);
    graph.create_node(&["Company"]);

    // (label, number of nodes expected to carry it)
    let cases = [("Person", 2), ("Company", 1), ("NonExistent", 0)];
    for &(label, expected) in &cases {
        assert_eq!(graph.nodes_with_label(label).count(), expected);
    }
}
2902
/// `edges_with_type` must yield exactly the edges of the requested type,
/// and nothing for a type no edge has.
#[test]
fn test_edges_with_type() {
    let graph = LpgStore::new();

    let alice = graph.create_node(&["Person"]);
    let bob = graph.create_node(&["Person"]);
    let employer = graph.create_node(&["Company"]);

    graph.create_edge(alice, bob, "KNOWS");
    graph.create_edge(alice, employer, "WORKS_AT");

    // (edge type, number of edges expected to have it)
    let cases = [("KNOWS", 1), ("WORKS_AT", 1), ("NonExistent", 0)];
    for &(edge_type, expected) in &cases {
        assert_eq!(graph.edges_with_type(edge_type).count(), expected);
    }
}
2923
/// `nodes_by_label` must return an empty collection for a label that no
/// node in the store carries.
#[test]
fn test_nodes_by_label_nonexistent() {
    let graph = LpgStore::new();
    graph.create_node(&["Person"]);

    assert!(graph.nodes_by_label("NonExistent").is_empty());
}
2932
/// After `compute_statistics`, the statistics must report the inserted
/// totals (3 nodes, 2 edges); the label-cardinality estimate for `Person`
/// must be positive and the average-degree estimate for `KNOWS`
/// non-negative.
#[test]
fn test_statistics() {
    let graph = LpgStore::new();

    let alice = graph.create_node(&["Person"]);
    let bob = graph.create_node(&["Person"]);
    let employer = graph.create_node(&["Company"]);

    graph.create_edge(alice, bob, "KNOWS");
    graph.create_edge(alice, employer, "WORKS_AT");

    graph.compute_statistics();
    let snapshot = graph.statistics();

    assert_eq!(snapshot.total_nodes, 3);
    assert_eq!(snapshot.total_edges, 2);

    assert!(graph.estimate_label_cardinality("Person") > 0.0);

    assert!(graph.estimate_avg_degree("KNOWS", true) >= 0.0);
}
2957
/// Exercises the zone-map accessors: an equality probe for 30 on `age`
/// (stored values 25 and 35) must be reported as a possible match, a zone
/// map must exist for `age` but not for an unused key, and an edge
/// property (`weight`) must get an edge zone map.
#[test]
fn test_zone_maps() {
    let graph = LpgStore::new();

    graph.create_node_with_props(&["Person"], [("age", Value::from(25i64))]);
    graph.create_node_with_props(&["Person"], [("age", Value::from(35i64))]);

    let probe = Value::from(30i64);
    assert!(graph.node_property_might_match(&"age".into(), CompareOp::Eq, &probe));

    assert!(graph.node_property_zone_map(&"age".into()).is_some());

    assert!(graph.node_property_zone_map(&"nonexistent".into()).is_none());

    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);
    graph.create_edge_with_props(src, dst, "REL", [("weight", Value::from(1.0))]);

    assert!(graph.edge_property_zone_map(&"weight".into()).is_some());
}
2986
/// Smoke test: calls `rebuild_zone_maps` on a store holding one node with
/// a property. There are no assertions; the test only fails if the call
/// panics.
#[test]
fn test_rebuild_zone_maps() {
    let graph = LpgStore::new();
    let props = [("age", Value::from(25i64))];
    graph.create_node_with_props(&["Person"], props);

    graph.rebuild_zone_maps();
}
2995
/// A node created under an explicit id must be retrievable by that id with
/// all its labels, and the next automatically assigned id must be greater
/// than the explicit one.
#[test]
fn test_create_node_with_id() {
    let graph = LpgStore::new();

    let pinned = NodeId::new(100);
    graph.create_node_with_id(pinned, &["Person", "Employee"]);

    let stored = graph.get_node(pinned).unwrap();
    for &label in &["Person", "Employee"] {
        assert!(stored.has_label(label));
    }

    let follower = graph.create_node(&["Other"]);
    assert!(follower.as_u64() > 100);
}
3011
/// An edge created under an explicit id must be retrievable by that id with
/// the given endpoints and type, and the next automatically assigned edge
/// id must be greater than the explicit one.
#[test]
fn test_create_edge_with_id() {
    let graph = LpgStore::new();

    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);

    let pinned = EdgeId::new(500);
    graph.create_edge_with_id(pinned, src, dst, "REL");

    let stored = graph.get_edge(pinned).unwrap();
    assert_eq!(stored.src, src);
    assert_eq!(stored.dst, dst);
    assert_eq!(stored.edge_type.as_ref(), "REL");

    let follower = graph.create_edge(src, dst, "OTHER");
    assert!(follower.as_u64() > 500);
}
3031
/// A fresh store must start at epoch 0, and `set_epoch` must make
/// `current_epoch` return the value that was set.
#[test]
fn test_set_epoch() {
    let graph = LpgStore::new();

    let initial = graph.current_epoch().as_u64();
    assert_eq!(initial, 0);

    graph.set_epoch(EpochId::new(42));
    let updated = graph.current_epoch().as_u64();
    assert_eq!(updated, 42);
}
3041
/// `get_node` must return `None` for an id that was never inserted.
#[test]
fn test_get_node_nonexistent() {
    let graph = LpgStore::new();
    let lookup = graph.get_node(NodeId::new(999));
    assert!(lookup.is_none());
}
3048
/// `get_edge` must return `None` for an id that was never inserted.
#[test]
fn test_get_edge_nonexistent() {
    let graph = LpgStore::new();
    let lookup = graph.get_edge(EdgeId::new(999));
    assert!(lookup.is_none());
}
3055
/// A node created with three labels must report each of them through
/// `has_label`, and must not report a label it was not given.
#[test]
fn test_multiple_labels() {
    let graph = LpgStore::new();

    let id = graph.create_node(&["Person", "Employee", "Manager"]);
    let stored = graph.get_node(id).unwrap();

    for &label in &["Person", "Employee", "Manager"] {
        assert!(stored.has_label(label));
    }
    assert!(!stored.has_label("Other"));
}
3068
/// A store obtained through its `Default` implementation must be empty.
#[test]
fn test_default_impl() {
    let graph = LpgStore::default();
    let nodes = graph.node_count();
    let edges = graph.edge_count();
    assert_eq!(nodes, 0);
    assert_eq!(edges, 0);
}
3075
/// `edges_from(.., Direction::Both)` must report both the outgoing and the
/// incoming edge of the queried node.
#[test]
fn test_edges_from_both_directions() {
    let graph = LpgStore::new();

    let center = graph.create_node(&["A"]);
    let successor = graph.create_node(&["B"]);
    let predecessor = graph.create_node(&["C"]);

    // center -> successor, predecessor -> center
    let outgoing = graph.create_edge(center, successor, "R1");
    let incoming = graph.create_edge(predecessor, center, "R2");

    let touching: Vec<_> = graph.edges_from(center, Direction::Both).collect();
    assert_eq!(touching.len(), 2);
    for wanted in &[outgoing, incoming] {
        assert!(touching.iter().any(|(_, edge)| edge == wanted));
    }
}
3093
/// With a store configured with `backward_edges: false`, `in_degree` of an
/// edge's destination is still expected to be 1.
#[test]
fn test_no_backward_adj_in_degree() {
    let graph = LpgStore::with_config(LpgStoreConfig {
        initial_node_capacity: 10,
        initial_edge_capacity: 10,
        backward_edges: false,
    });

    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);
    graph.create_edge(src, dst, "R");

    assert_eq!(graph.in_degree(dst), 1);
}
3111
/// With a store configured with `backward_edges: false`, `edges_to` is
/// still expected to return the single edge pointing at the destination.
#[test]
fn test_no_backward_adj_edges_to() {
    let graph = LpgStore::with_config(LpgStoreConfig {
        initial_node_capacity: 10,
        initial_edge_capacity: 10,
        backward_edges: false,
    });

    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);
    let only_edge = graph.create_edge(src, dst, "R");

    let inbound = graph.edges_to(dst);
    assert_eq!(inbound.len(), 1);
    assert_eq!(inbound[0].1, only_edge);
}
3130
/// A node created with `create_node_versioned` under a fresh epoch and a
/// transaction id must be visible through `get_node`.
#[test]
fn test_node_versioned_creation() {
    let graph = LpgStore::new();

    let epoch = graph.new_epoch();
    let id = graph.create_node_versioned(&["Person"], epoch, TxId::new(1));

    let lookup = graph.get_node(id);
    assert!(lookup.is_some());
}
3141
/// An edge created with `create_edge_versioned` under a fresh epoch and a
/// transaction id must be visible through `get_edge`.
#[test]
fn test_edge_versioned_creation() {
    let graph = LpgStore::new();

    let src = graph.create_node(&["A"]);
    let dst = graph.create_node(&["B"]);

    let epoch = graph.new_epoch();
    let rel = graph.create_edge_versioned(src, dst, "REL", epoch, TxId::new(1));

    let lookup = graph.get_edge(rel);
    assert!(lookup.is_some());
}
3155
/// A node created with `create_node_with_props_versioned` must expose the
/// property it was given when read back through `get_node`.
#[test]
fn test_node_with_props_versioned() {
    let graph = LpgStore::new();

    let epoch = graph.new_epoch();
    let props = [("name", Value::from("Alice"))];
    let id = graph.create_node_with_props_versioned(&["Person"], props, epoch, TxId::new(1));

    let stored = graph.get_node(id).unwrap();
    let name = stored.get_property("name").and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));
}
3176
/// A node created under a transaction must be visible right after
/// creation and must no longer be returned by `get_node` once
/// `discard_uncommitted_versions` has been called for that transaction.
#[test]
fn test_discard_uncommitted_versions() {
    let graph = LpgStore::new();

    let epoch = graph.new_epoch();
    let tx = TxId::new(42);

    let pending = graph.create_node_versioned(&["Person"], epoch, tx);
    let visible_before = graph.get_node(pending).is_some();
    assert!(visible_before);

    graph.discard_uncommitted_versions(tx);

    let gone_after = graph.get_node(pending).is_none();
    assert!(gone_after);
}
3194}