1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use grafeo_common::mvcc::VersionChain;
19use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
20use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
21use parking_lot::RwLock;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU64, Ordering};
24
/// Construction-time options for an [`LpgStore`].
///
/// Implements `PartialEq`/`Eq` so configurations can be compared directly
/// (for example in tests or when deciding whether a store must be rebuilt).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LpgStoreConfig {
    /// When `true`, the store also maintains a backward adjacency index
    /// (destination -> source) next to the forward one.
    pub backward_edges: bool,
    /// Capacity hint for the number of nodes the store is expected to hold.
    pub initial_node_capacity: usize,
    /// Capacity hint for the number of edges the store is expected to hold.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Returns the default configuration: backward edges enabled, a node
    /// capacity hint of 1024 and an edge capacity hint of 4096.
    fn default() -> Self {
        Self {
            backward_edges: true,
            initial_node_capacity: 1024,
            initial_edge_capacity: 4096,
        }
    }
}
50
/// In-memory labeled property graph store.
///
/// Nodes and edges are kept as version chains keyed by id. Labels and edge
/// types are interned to `u32` ids. Adjacency and label membership live in
/// separate indexes. Each mutable collection sits behind its own `RwLock`,
/// and the id and epoch counters are atomics.
pub struct LpgStore {
    /// Configuration the store was built with.
    // The field is written by `with_config` but never read in this file,
    // hence the lint allowance.
    #[allow(dead_code)]
    config: LpgStoreConfig,

    /// Version chain of node records, keyed by node id.
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Version chain of edge records, keyed by edge id.
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Property storage for nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property storage for edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name -> interned label id.
    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Interned label id -> label name (the id is the index into this vector).
    id_to_label: RwLock<Vec<Arc<str>>>,

    /// Edge type name -> interned edge type id.
    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Interned edge type id -> edge type name (the id is the index).
    id_to_edge_type: RwLock<Vec<Arc<str>>>,

    /// Forward adjacency: edges are registered as `(src, dst, edge_id)`.
    forward_adj: ChunkedAdjacency,

    /// Backward adjacency: edges are registered as `(dst, src, edge_id)`.
    /// `None` when `LpgStoreConfig::backward_edges` is `false`.
    backward_adj: Option<ChunkedAdjacency>,

    /// Label index: slot `label_id` holds the ids of nodes carrying that label
    /// (a map with unit values used as a set).
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node id -> set of interned label ids attached to that node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Next node id handed out by the `create_node*` methods.
    next_node_id: AtomicU64,

    /// Next edge id handed out by the `create_edge*` methods.
    next_edge_id: AtomicU64,

    /// Current epoch counter.
    current_epoch: AtomicU64,

    /// Last statistics snapshot; replaced wholesale by `compute_statistics`.
    statistics: RwLock<Statistics>,
}
133
134impl LpgStore {
135 #[must_use]
137 pub fn new() -> Self {
138 Self::with_config(LpgStoreConfig::default())
139 }
140
141 #[must_use]
143 pub fn with_config(config: LpgStoreConfig) -> Self {
144 let backward_adj = if config.backward_edges {
145 Some(ChunkedAdjacency::new())
146 } else {
147 None
148 };
149
150 Self {
151 nodes: RwLock::new(FxHashMap::default()),
152 edges: RwLock::new(FxHashMap::default()),
153 node_properties: PropertyStorage::new(),
154 edge_properties: PropertyStorage::new(),
155 label_to_id: RwLock::new(FxHashMap::default()),
156 id_to_label: RwLock::new(Vec::new()),
157 edge_type_to_id: RwLock::new(FxHashMap::default()),
158 id_to_edge_type: RwLock::new(Vec::new()),
159 forward_adj: ChunkedAdjacency::new(),
160 backward_adj,
161 label_index: RwLock::new(Vec::new()),
162 node_labels: RwLock::new(FxHashMap::default()),
163 next_node_id: AtomicU64::new(0),
164 next_edge_id: AtomicU64::new(0),
165 current_epoch: AtomicU64::new(0),
166 statistics: RwLock::new(Statistics::new()),
167 config,
168 }
169 }
170
171 #[must_use]
173 pub fn current_epoch(&self) -> EpochId {
174 EpochId::new(self.current_epoch.load(Ordering::Acquire))
175 }
176
177 pub fn new_epoch(&self) -> EpochId {
179 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
180 EpochId::new(id)
181 }
182
183 pub fn create_node(&self, labels: &[&str]) -> NodeId {
189 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
190 }
191
192 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
194 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
195
196 let mut record = NodeRecord::new(id, epoch);
197 record.set_label_count(labels.len() as u16);
198
199 let mut node_label_set = FxHashSet::default();
201 for label in labels {
202 let label_id = self.get_or_create_label_id(*label);
203 node_label_set.insert(label_id);
204
205 let mut index = self.label_index.write();
207 while index.len() <= label_id as usize {
208 index.push(FxHashMap::default());
209 }
210 index[label_id as usize].insert(id, ());
211 }
212
213 self.node_labels.write().insert(id, node_label_set);
215
216 let chain = VersionChain::with_initial(record, epoch, tx_id);
218 self.nodes.write().insert(id, chain);
219 id
220 }
221
222 pub fn create_node_with_props(
224 &self,
225 labels: &[&str],
226 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
227 ) -> NodeId {
228 self.create_node_with_props_versioned(
229 labels,
230 properties,
231 self.current_epoch(),
232 TxId::SYSTEM,
233 )
234 }
235
236 pub fn create_node_with_props_versioned(
238 &self,
239 labels: &[&str],
240 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
241 epoch: EpochId,
242 tx_id: TxId,
243 ) -> NodeId {
244 let id = self.create_node_versioned(labels, epoch, tx_id);
245
246 for (key, value) in properties {
247 self.node_properties.set(id, key.into(), value.into());
248 }
249
250 let count = self.node_properties.get_all(id).len() as u16;
252 if let Some(chain) = self.nodes.write().get_mut(&id) {
253 if let Some(record) = chain.latest_mut() {
254 record.props_count = count;
255 }
256 }
257
258 id
259 }
260
261 #[must_use]
263 pub fn get_node(&self, id: NodeId) -> Option<Node> {
264 self.get_node_at_epoch(id, self.current_epoch())
265 }
266
267 #[must_use]
269 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
270 let nodes = self.nodes.read();
271 let chain = nodes.get(&id)?;
272 let record = chain.visible_at(epoch)?;
273
274 if record.is_deleted() {
275 return None;
276 }
277
278 let mut node = Node::new(id);
279
280 let id_to_label = self.id_to_label.read();
282 let node_labels = self.node_labels.read();
283 if let Some(label_ids) = node_labels.get(&id) {
284 for &label_id in label_ids {
285 if let Some(label) = id_to_label.get(label_id as usize) {
286 node.labels.push(label.clone());
287 }
288 }
289 }
290
291 node.properties = self.node_properties.get_all(id).into_iter().collect();
293
294 Some(node)
295 }
296
297 #[must_use]
299 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
300 let nodes = self.nodes.read();
301 let chain = nodes.get(&id)?;
302 let record = chain.visible_to(epoch, tx_id)?;
303
304 if record.is_deleted() {
305 return None;
306 }
307
308 let mut node = Node::new(id);
309
310 let id_to_label = self.id_to_label.read();
312 let node_labels = self.node_labels.read();
313 if let Some(label_ids) = node_labels.get(&id) {
314 for &label_id in label_ids {
315 if let Some(label) = id_to_label.get(label_id as usize) {
316 node.labels.push(label.clone());
317 }
318 }
319 }
320
321 node.properties = self.node_properties.get_all(id).into_iter().collect();
323
324 Some(node)
325 }
326
327 pub fn delete_node(&self, id: NodeId) -> bool {
329 self.delete_node_at_epoch(id, self.current_epoch())
330 }
331
332 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
334 let mut nodes = self.nodes.write();
335 if let Some(chain) = nodes.get_mut(&id) {
336 if let Some(record) = chain.visible_at(epoch) {
338 if record.is_deleted() {
339 return false;
340 }
341 } else {
342 return false;
344 }
345
346 chain.mark_deleted(epoch);
348
349 let mut index = self.label_index.write();
351 let mut node_labels = self.node_labels.write();
352 if let Some(label_ids) = node_labels.remove(&id) {
353 for label_id in label_ids {
354 if let Some(set) = index.get_mut(label_id as usize) {
355 set.remove(&id);
356 }
357 }
358 }
359
360 drop(nodes); drop(index);
363 drop(node_labels);
364 self.node_properties.remove_all(id);
365
366 true
369 } else {
370 false
371 }
372 }
373
374 pub fn delete_node_edges(&self, node_id: NodeId) {
379 let outgoing: Vec<EdgeId> = self
381 .forward_adj
382 .edges_from(node_id)
383 .into_iter()
384 .map(|(_, edge_id)| edge_id)
385 .collect();
386
387 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
389 backward
390 .edges_from(node_id)
391 .into_iter()
392 .map(|(_, edge_id)| edge_id)
393 .collect()
394 } else {
395 let epoch = self.current_epoch();
397 self.edges
398 .read()
399 .iter()
400 .filter_map(|(id, chain)| {
401 chain.visible_at(epoch).and_then(|r| {
402 if !r.is_deleted() && r.dst == node_id {
403 Some(*id)
404 } else {
405 None
406 }
407 })
408 })
409 .collect()
410 };
411
412 for edge_id in outgoing.into_iter().chain(incoming) {
414 self.delete_edge(edge_id);
415 }
416 }
417
418 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
420 self.node_properties.set(id, key.into(), value);
421
422 let count = self.node_properties.get_all(id).len() as u16;
424 if let Some(chain) = self.nodes.write().get_mut(&id) {
425 if let Some(record) = chain.latest_mut() {
426 record.props_count = count;
427 }
428 }
429 }
430
431 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
433 self.edge_properties.set(id, key.into(), value);
434 }
435
436 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
440 let result = self.node_properties.remove(id, &key.into());
441
442 let count = self.node_properties.get_all(id).len() as u16;
444 if let Some(chain) = self.nodes.write().get_mut(&id) {
445 if let Some(record) = chain.latest_mut() {
446 record.props_count = count;
447 }
448 }
449
450 result
451 }
452
453 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
457 self.edge_properties.remove(id, &key.into())
458 }
459
460 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
465 let epoch = self.current_epoch();
466
467 let nodes = self.nodes.read();
469 if let Some(chain) = nodes.get(&node_id) {
470 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
471 return false;
472 }
473 } else {
474 return false;
475 }
476 drop(nodes);
477
478 let label_id = self.get_or_create_label_id(label);
480
481 let mut node_labels = self.node_labels.write();
483 let label_set = node_labels
484 .entry(node_id)
485 .or_insert_with(FxHashSet::default);
486
487 if label_set.contains(&label_id) {
488 return false; }
490
491 label_set.insert(label_id);
492 drop(node_labels);
493
494 let mut index = self.label_index.write();
496 if (label_id as usize) >= index.len() {
497 index.resize(label_id as usize + 1, FxHashMap::default());
498 }
499 index[label_id as usize].insert(node_id, ());
500
501 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
503 if let Some(record) = chain.latest_mut() {
504 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
505 record.set_label_count(count as u16);
506 }
507 }
508
509 true
510 }
511
512 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
517 let epoch = self.current_epoch();
518
519 let nodes = self.nodes.read();
521 if let Some(chain) = nodes.get(&node_id) {
522 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
523 return false;
524 }
525 } else {
526 return false;
527 }
528 drop(nodes);
529
530 let label_id = {
532 let label_ids = self.label_to_id.read();
533 match label_ids.get(label) {
534 Some(&id) => id,
535 None => return false, }
537 };
538
539 let mut node_labels = self.node_labels.write();
541 if let Some(label_set) = node_labels.get_mut(&node_id) {
542 if !label_set.remove(&label_id) {
543 return false; }
545 } else {
546 return false;
547 }
548 drop(node_labels);
549
550 let mut index = self.label_index.write();
552 if (label_id as usize) < index.len() {
553 index[label_id as usize].remove(&node_id);
554 }
555
556 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
558 if let Some(record) = chain.latest_mut() {
559 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
560 record.set_label_count(count as u16);
561 }
562 }
563
564 true
565 }
566
567 #[must_use]
569 pub fn node_count(&self) -> usize {
570 let epoch = self.current_epoch();
571 self.nodes
572 .read()
573 .values()
574 .filter_map(|chain| chain.visible_at(epoch))
575 .filter(|r| !r.is_deleted())
576 .count()
577 }
578
579 #[must_use]
584 pub fn node_ids(&self) -> Vec<NodeId> {
585 let epoch = self.current_epoch();
586 self.nodes
587 .read()
588 .iter()
589 .filter_map(|(id, chain)| {
590 chain
591 .visible_at(epoch)
592 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
593 })
594 .collect()
595 }
596
597 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
601 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
602 }
603
604 pub fn create_edge_versioned(
606 &self,
607 src: NodeId,
608 dst: NodeId,
609 edge_type: &str,
610 epoch: EpochId,
611 tx_id: TxId,
612 ) -> EdgeId {
613 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
614 let type_id = self.get_or_create_edge_type_id(edge_type);
615
616 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
617 let chain = VersionChain::with_initial(record, epoch, tx_id);
618 self.edges.write().insert(id, chain);
619
620 self.forward_adj.add_edge(src, dst, id);
622 if let Some(ref backward) = self.backward_adj {
623 backward.add_edge(dst, src, id);
624 }
625
626 id
627 }
628
629 pub fn create_edge_with_props(
631 &self,
632 src: NodeId,
633 dst: NodeId,
634 edge_type: &str,
635 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
636 ) -> EdgeId {
637 let id = self.create_edge(src, dst, edge_type);
638
639 for (key, value) in properties {
640 self.edge_properties.set(id, key.into(), value.into());
641 }
642
643 id
644 }
645
646 #[must_use]
648 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
649 self.get_edge_at_epoch(id, self.current_epoch())
650 }
651
652 #[must_use]
654 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
655 let edges = self.edges.read();
656 let chain = edges.get(&id)?;
657 let record = chain.visible_at(epoch)?;
658
659 if record.is_deleted() {
660 return None;
661 }
662
663 let edge_type = {
664 let id_to_type = self.id_to_edge_type.read();
665 id_to_type.get(record.type_id as usize)?.clone()
666 };
667
668 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
669
670 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
672
673 Some(edge)
674 }
675
676 #[must_use]
678 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
679 let edges = self.edges.read();
680 let chain = edges.get(&id)?;
681 let record = chain.visible_to(epoch, tx_id)?;
682
683 if record.is_deleted() {
684 return None;
685 }
686
687 let edge_type = {
688 let id_to_type = self.id_to_edge_type.read();
689 id_to_type.get(record.type_id as usize)?.clone()
690 };
691
692 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
693
694 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
696
697 Some(edge)
698 }
699
700 pub fn delete_edge(&self, id: EdgeId) -> bool {
702 self.delete_edge_at_epoch(id, self.current_epoch())
703 }
704
705 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
707 let mut edges = self.edges.write();
708 if let Some(chain) = edges.get_mut(&id) {
709 let (src, dst) = {
711 match chain.visible_at(epoch) {
712 Some(record) => {
713 if record.is_deleted() {
714 return false;
715 }
716 (record.src, record.dst)
717 }
718 None => return false, }
720 };
721
722 chain.mark_deleted(epoch);
724
725 drop(edges); self.forward_adj.mark_deleted(src, id);
729 if let Some(ref backward) = self.backward_adj {
730 backward.mark_deleted(dst, id);
731 }
732
733 self.edge_properties.remove_all(id);
735
736 true
737 } else {
738 false
739 }
740 }
741
742 #[must_use]
744 pub fn edge_count(&self) -> usize {
745 let epoch = self.current_epoch();
746 self.edges
747 .read()
748 .values()
749 .filter_map(|chain| chain.visible_at(epoch))
750 .filter(|r| !r.is_deleted())
751 .count()
752 }
753
754 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
759 {
761 let mut nodes = self.nodes.write();
762 for chain in nodes.values_mut() {
763 chain.remove_versions_by(tx_id);
764 }
765 nodes.retain(|_, chain| !chain.is_empty());
767 }
768
769 {
771 let mut edges = self.edges.write();
772 for chain in edges.values_mut() {
773 chain.remove_versions_by(tx_id);
774 }
775 edges.retain(|_, chain| !chain.is_empty());
777 }
778 }
779
780 #[must_use]
782 pub fn label_count(&self) -> usize {
783 self.id_to_label.read().len()
784 }
785
786 #[must_use]
790 pub fn property_key_count(&self) -> usize {
791 let node_keys = self.node_properties.column_count();
792 let edge_keys = self.edge_properties.column_count();
793 node_keys + edge_keys
797 }
798
799 #[must_use]
801 pub fn edge_type_count(&self) -> usize {
802 self.id_to_edge_type.read().len()
803 }
804
805 pub fn neighbors(
812 &self,
813 node: NodeId,
814 direction: Direction,
815 ) -> impl Iterator<Item = NodeId> + '_ {
816 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
817 Direction::Outgoing | Direction::Both => {
818 Box::new(self.forward_adj.neighbors(node).into_iter())
819 }
820 Direction::Incoming => Box::new(std::iter::empty()),
821 };
822
823 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
824 Direction::Incoming | Direction::Both => {
825 if let Some(ref adj) = self.backward_adj {
826 Box::new(adj.neighbors(node).into_iter())
827 } else {
828 Box::new(std::iter::empty())
829 }
830 }
831 Direction::Outgoing => Box::new(std::iter::empty()),
832 };
833
834 forward.chain(backward)
835 }
836
837 pub fn edges_from(
841 &self,
842 node: NodeId,
843 direction: Direction,
844 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
845 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
846 Direction::Outgoing | Direction::Both => {
847 Box::new(self.forward_adj.edges_from(node).into_iter())
848 }
849 Direction::Incoming => Box::new(std::iter::empty()),
850 };
851
852 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
853 Direction::Incoming | Direction::Both => {
854 if let Some(ref adj) = self.backward_adj {
855 Box::new(adj.edges_from(node).into_iter())
856 } else {
857 Box::new(std::iter::empty())
858 }
859 }
860 Direction::Outgoing => Box::new(std::iter::empty()),
861 };
862
863 forward.chain(backward)
864 }
865
866 #[must_use]
868 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
869 let edges = self.edges.read();
870 let chain = edges.get(&id)?;
871 let epoch = self.current_epoch();
872 let record = chain.visible_at(epoch)?;
873 let id_to_type = self.id_to_edge_type.read();
874 id_to_type.get(record.type_id as usize).cloned()
875 }
876
877 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
882 let label_to_id = self.label_to_id.read();
883 if let Some(&label_id) = label_to_id.get(label) {
884 let index = self.label_index.read();
885 if let Some(set) = index.get(label_id as usize) {
886 return set.keys().copied().collect();
887 }
888 }
889 Vec::new()
890 }
891
892 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
899 let epoch = self.current_epoch();
900 let node_ids: Vec<NodeId> = self
901 .nodes
902 .read()
903 .iter()
904 .filter_map(|(id, chain)| {
905 chain
906 .visible_at(epoch)
907 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
908 })
909 .collect();
910
911 node_ids.into_iter().filter_map(move |id| self.get_node(id))
912 }
913
914 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
919 let epoch = self.current_epoch();
920 let edge_ids: Vec<EdgeId> = self
921 .edges
922 .read()
923 .iter()
924 .filter_map(|(id, chain)| {
925 chain
926 .visible_at(epoch)
927 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
928 })
929 .collect();
930
931 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
932 }
933
934 pub fn all_labels(&self) -> Vec<String> {
936 self.id_to_label
937 .read()
938 .iter()
939 .map(|s| s.to_string())
940 .collect()
941 }
942
943 pub fn all_edge_types(&self) -> Vec<String> {
945 self.id_to_edge_type
946 .read()
947 .iter()
948 .map(|s| s.to_string())
949 .collect()
950 }
951
952 pub fn all_property_keys(&self) -> Vec<String> {
954 let mut keys = std::collections::HashSet::new();
955 for key in self.node_properties.keys() {
956 keys.insert(key.to_string());
957 }
958 for key in self.edge_properties.keys() {
959 keys.insert(key.to_string());
960 }
961 keys.into_iter().collect()
962 }
963
964 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
966 let node_ids = self.nodes_by_label(label);
967 node_ids.into_iter().filter_map(move |id| self.get_node(id))
968 }
969
970 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
972 let epoch = self.current_epoch();
973 let type_to_id = self.edge_type_to_id.read();
974
975 if let Some(&type_id) = type_to_id.get(edge_type) {
976 let edge_ids: Vec<EdgeId> = self
977 .edges
978 .read()
979 .iter()
980 .filter_map(|(id, chain)| {
981 chain.visible_at(epoch).and_then(|r| {
982 if !r.is_deleted() && r.type_id == type_id {
983 Some(*id)
984 } else {
985 None
986 }
987 })
988 })
989 .collect();
990
991 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
993 as Box<dyn Iterator<Item = Edge> + 'a>
994 } else {
995 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
997 }
998 }
999
1000 #[must_use]
1007 pub fn node_property_might_match(
1008 &self,
1009 property: &PropertyKey,
1010 op: CompareOp,
1011 value: &Value,
1012 ) -> bool {
1013 self.node_properties.might_match(property, op, value)
1014 }
1015
1016 #[must_use]
1018 pub fn edge_property_might_match(
1019 &self,
1020 property: &PropertyKey,
1021 op: CompareOp,
1022 value: &Value,
1023 ) -> bool {
1024 self.edge_properties.might_match(property, op, value)
1025 }
1026
1027 #[must_use]
1029 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1030 self.node_properties.zone_map(property)
1031 }
1032
1033 #[must_use]
1035 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1036 self.edge_properties.zone_map(property)
1037 }
1038
1039 pub fn rebuild_zone_maps(&self) {
1041 self.node_properties.rebuild_zone_maps();
1042 self.edge_properties.rebuild_zone_maps();
1043 }
1044
1045 #[must_use]
1049 pub fn statistics(&self) -> Statistics {
1050 self.statistics.read().clone()
1051 }
1052
1053 pub fn compute_statistics(&self) {
1058 let mut stats = Statistics::new();
1059
1060 stats.total_nodes = self.node_count() as u64;
1062 stats.total_edges = self.edge_count() as u64;
1063
1064 let id_to_label = self.id_to_label.read();
1066 let label_index = self.label_index.read();
1067
1068 for (label_id, label_name) in id_to_label.iter().enumerate() {
1069 let node_count = label_index
1070 .get(label_id)
1071 .map(|set| set.len() as u64)
1072 .unwrap_or(0);
1073
1074 if node_count > 0 {
1075 let avg_out_degree = if stats.total_nodes > 0 {
1077 stats.total_edges as f64 / stats.total_nodes as f64
1078 } else {
1079 0.0
1080 };
1081
1082 let label_stats =
1083 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
1084
1085 stats.update_label(label_name.as_ref(), label_stats);
1086 }
1087 }
1088
1089 let id_to_edge_type = self.id_to_edge_type.read();
1091 let edges = self.edges.read();
1092 let epoch = self.current_epoch();
1093
1094 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
1095 for chain in edges.values() {
1096 if let Some(record) = chain.visible_at(epoch) {
1097 if !record.is_deleted() {
1098 *edge_type_counts.entry(record.type_id).or_default() += 1;
1099 }
1100 }
1101 }
1102
1103 for (type_id, count) in edge_type_counts {
1104 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
1105 let avg_degree = if stats.total_nodes > 0 {
1106 count as f64 / stats.total_nodes as f64
1107 } else {
1108 0.0
1109 };
1110
1111 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
1112 stats.update_edge_type(type_name.as_ref(), edge_stats);
1113 }
1114 }
1115
1116 *self.statistics.write() = stats;
1117 }
1118
1119 #[must_use]
1121 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
1122 self.statistics.read().estimate_label_cardinality(label)
1123 }
1124
1125 #[must_use]
1127 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
1128 self.statistics
1129 .read()
1130 .estimate_avg_degree(edge_type, outgoing)
1131 }
1132
1133 fn get_or_create_label_id(&self, label: &str) -> u32 {
1136 {
1137 let label_to_id = self.label_to_id.read();
1138 if let Some(&id) = label_to_id.get(label) {
1139 return id;
1140 }
1141 }
1142
1143 let mut label_to_id = self.label_to_id.write();
1144 let mut id_to_label = self.id_to_label.write();
1145
1146 if let Some(&id) = label_to_id.get(label) {
1148 return id;
1149 }
1150
1151 let id = id_to_label.len() as u32;
1152
1153 let label: Arc<str> = label.into();
1154 label_to_id.insert(label.clone(), id);
1155 id_to_label.push(label);
1156
1157 id
1158 }
1159
1160 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
1161 {
1162 let type_to_id = self.edge_type_to_id.read();
1163 if let Some(&id) = type_to_id.get(edge_type) {
1164 return id;
1165 }
1166 }
1167
1168 let mut type_to_id = self.edge_type_to_id.write();
1169 let mut id_to_type = self.id_to_edge_type.write();
1170
1171 if let Some(&id) = type_to_id.get(edge_type) {
1173 return id;
1174 }
1175
1176 let id = id_to_type.len() as u32;
1177 let edge_type: Arc<str> = edge_type.into();
1178 type_to_id.insert(edge_type.clone(), id);
1179 id_to_type.push(edge_type);
1180
1181 id
1182 }
1183
1184 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
1191 let epoch = self.current_epoch();
1192 let mut record = NodeRecord::new(id, epoch);
1193 record.set_label_count(labels.len() as u16);
1194
1195 let mut node_label_set = FxHashSet::default();
1197 for label in labels {
1198 let label_id = self.get_or_create_label_id(*label);
1199 node_label_set.insert(label_id);
1200
1201 let mut index = self.label_index.write();
1203 while index.len() <= label_id as usize {
1204 index.push(FxHashMap::default());
1205 }
1206 index[label_id as usize].insert(id, ());
1207 }
1208
1209 self.node_labels.write().insert(id, node_label_set);
1211
1212 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1214 self.nodes.write().insert(id, chain);
1215
1216 let id_val = id.as_u64();
1218 let _ = self
1219 .next_node_id
1220 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1221 if id_val >= current {
1222 Some(id_val + 1)
1223 } else {
1224 None
1225 }
1226 });
1227 }
1228
1229 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
1233 let epoch = self.current_epoch();
1234 let type_id = self.get_or_create_edge_type_id(edge_type);
1235
1236 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1237 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1238 self.edges.write().insert(id, chain);
1239
1240 self.forward_adj.add_edge(src, dst, id);
1242 if let Some(ref backward) = self.backward_adj {
1243 backward.add_edge(dst, src, id);
1244 }
1245
1246 let id_val = id.as_u64();
1248 let _ = self
1249 .next_edge_id
1250 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1251 if id_val >= current {
1252 Some(id_val + 1)
1253 } else {
1254 None
1255 }
1256 });
1257 }
1258
1259 pub fn set_epoch(&self, epoch: EpochId) {
1261 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
1262 }
1263}
1264
1265impl Default for LpgStore {
1266 fn default() -> Self {
1267 Self::new()
1268 }
1269}
1270
#[cfg(test)]
mod tests {
    use super::*;

    /// A created node is retrievable and carries exactly the requested label.
    #[test]
    fn test_create_node() {
        let store = LpgStore::new();

        let person = store.create_node(&["Person"]);
        assert!(person.is_valid());

        let fetched = store.get_node(person).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    /// Properties supplied at creation time are readable with their types.
    #[test]
    fn test_create_node_with_props() {
        let store = LpgStore::new();

        let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let person = store.create_node_with_props(&["Person"], props);

        let fetched = store.get_node(person).unwrap();
        let name = fetched.get_property("name").and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));
        let age = fetched.get_property("age").and_then(|v| v.as_int64());
        assert_eq!(age, Some(30));
    }

    /// Deleting hides the node; deleting it a second time reports failure.
    #[test]
    fn test_delete_node() {
        let store = LpgStore::new();

        let person = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        assert!(store.delete_node(person));
        assert_eq!(store.node_count(), 0);
        assert!(store.get_node(person).is_none());

        assert!(!store.delete_node(person));
    }

    /// A created edge keeps its endpoints and type.
    #[test]
    fn test_create_edge() {
        let store = LpgStore::new();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        let knows = store.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let fetched = store.get_edge(knows).unwrap();
        assert_eq!(fetched.src, alice);
        assert_eq!(fetched.dst, bob);
        assert_eq!(fetched.edge_type.as_ref(), "KNOWS");
    }

    /// Outgoing and incoming neighbor lookups see the expected nodes.
    #[test]
    fn test_neighbors() {
        let store = LpgStore::new();

        let hub = store.create_node(&["Person"]);
        let first = store.create_node(&["Person"]);
        let second = store.create_node(&["Person"]);

        store.create_edge(hub, first, "KNOWS");
        store.create_edge(hub, second, "KNOWS");

        let from_hub: Vec<_> = store.neighbors(hub, Direction::Outgoing).collect();
        assert_eq!(from_hub.len(), 2);
        assert!(from_hub.contains(&first));
        assert!(from_hub.contains(&second));

        let into_first: Vec<_> = store.neighbors(first, Direction::Incoming).collect();
        assert_eq!(into_first.len(), 1);
        assert!(into_first.contains(&hub));
    }

    /// The label index returns exactly the nodes created with each label.
    #[test]
    fn test_nodes_by_label() {
        let store = LpgStore::new();

        let first_person = store.create_node(&["Person"]);
        let second_person = store.create_node(&["Person"]);
        let _animal = store.create_node(&["Animal"]);

        let persons = store.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);
        assert!(persons.contains(&first_person));
        assert!(persons.contains(&second_person));

        let animals = store.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    /// Deleting an edge removes it from counts and lookups.
    #[test]
    fn test_delete_edge() {
        let store = LpgStore::new();

        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Person"]);
        let knows = store.create_edge(src, dst, "KNOWS");

        assert_eq!(store.edge_count(), 1);

        assert!(store.delete_edge(knows));
        assert_eq!(store.edge_count(), 0);
        assert!(store.get_edge(knows).is_none());
    }
}