1use super::property::CompareOp;
13use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
14use crate::graph::Direction;
15use crate::index::adjacency::ChunkedAdjacency;
16use crate::index::zone_map::ZoneMapEntry;
17use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
18use grafeo_common::mvcc::VersionChain;
19use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
20use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
21use parking_lot::RwLock;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicU64, Ordering};
24
/// Construction-time options for an `LpgStore`.
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, the store allocates a second adjacency structure at
    /// construction time in addition to the forward one.
    pub backward_edges: bool,
    /// Presumably a capacity hint for node storage; no code visible in this
    /// file reads it — TODO confirm against other users of the config.
    pub initial_node_capacity: usize,
    /// Presumably a capacity hint for edge storage; no code visible in this
    /// file reads it — TODO confirm against other users of the config.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, node capacity 1024, edge capacity 4096.
    fn default() -> Self {
        let initial_node_capacity = 1024;
        let initial_edge_capacity = 4096;
        Self {
            initial_edge_capacity,
            initial_node_capacity,
            backward_edges: true,
        }
    }
}
50
/// In-memory labeled-property-graph store.
///
/// Node and edge records are kept as version chains behind `RwLock`s;
/// labels and edge types are interned to dense `u32` ids.
pub struct LpgStore {
    // Stored at construction; only `backward_edges` is read (before the
    // config is moved into the struct), hence the lint suppression.
    #[allow(dead_code)]
    /// Configuration the store was built with.
    config: LpgStoreConfig,

    /// Version chain of node records, keyed by node id.
    nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,

    /// Version chain of edge records, keyed by edge id.
    edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,

    /// Property values attached to nodes.
    node_properties: PropertyStorage<NodeId>,

    /// Property values attached to edges.
    edge_properties: PropertyStorage<EdgeId>,

    /// Label name -> interned label id.
    label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Interned label id (used as the vector position) -> label name.
    id_to_label: RwLock<Vec<Arc<str>>>,

    /// Edge type name -> interned edge type id.
    edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,

    /// Interned edge type id (used as the vector position) -> edge type name.
    id_to_edge_type: RwLock<Vec<Arc<str>>>,

    /// Adjacency populated with `add_edge(src, dst, id)` for every edge.
    forward_adj: ChunkedAdjacency,

    /// Adjacency populated with `add_edge(dst, src, id)`; `None` when
    /// `config.backward_edges` was `false` at construction.
    backward_adj: Option<ChunkedAdjacency>,

    /// Indexed by label id; the keys of each map are the nodes carrying
    /// that label (the `()` values are unused).
    label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,

    /// Node id -> set of label ids attached to that node.
    node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,

    /// Next node id handed out by `create_node_versioned`.
    next_node_id: AtomicU64,

    /// Next edge id handed out by `create_edge_versioned`.
    next_edge_id: AtomicU64,

    /// Raw value of the current epoch.
    current_epoch: AtomicU64,

    /// Last statistics snapshot written by `compute_statistics`.
    statistics: RwLock<Statistics>,
}
133
134impl LpgStore {
135 #[must_use]
137 pub fn new() -> Self {
138 Self::with_config(LpgStoreConfig::default())
139 }
140
141 #[must_use]
143 pub fn with_config(config: LpgStoreConfig) -> Self {
144 let backward_adj = if config.backward_edges {
145 Some(ChunkedAdjacency::new())
146 } else {
147 None
148 };
149
150 Self {
151 nodes: RwLock::new(FxHashMap::default()),
152 edges: RwLock::new(FxHashMap::default()),
153 node_properties: PropertyStorage::new(),
154 edge_properties: PropertyStorage::new(),
155 label_to_id: RwLock::new(FxHashMap::default()),
156 id_to_label: RwLock::new(Vec::new()),
157 edge_type_to_id: RwLock::new(FxHashMap::default()),
158 id_to_edge_type: RwLock::new(Vec::new()),
159 forward_adj: ChunkedAdjacency::new(),
160 backward_adj,
161 label_index: RwLock::new(Vec::new()),
162 node_labels: RwLock::new(FxHashMap::default()),
163 next_node_id: AtomicU64::new(0),
164 next_edge_id: AtomicU64::new(0),
165 current_epoch: AtomicU64::new(0),
166 statistics: RwLock::new(Statistics::new()),
167 config,
168 }
169 }
170
171 #[must_use]
173 pub fn current_epoch(&self) -> EpochId {
174 EpochId::new(self.current_epoch.load(Ordering::Acquire))
175 }
176
177 pub fn new_epoch(&self) -> EpochId {
179 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
180 EpochId::new(id)
181 }
182
183 pub fn create_node(&self, labels: &[&str]) -> NodeId {
189 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
190 }
191
192 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
194 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
195
196 let mut record = NodeRecord::new(id, epoch);
197 record.set_label_count(labels.len() as u16);
198
199 let mut node_label_set = FxHashSet::default();
201 for label in labels {
202 let label_id = self.get_or_create_label_id(*label);
203 node_label_set.insert(label_id);
204
205 let mut index = self.label_index.write();
207 while index.len() <= label_id as usize {
208 index.push(FxHashMap::default());
209 }
210 index[label_id as usize].insert(id, ());
211 }
212
213 self.node_labels.write().insert(id, node_label_set);
215
216 let chain = VersionChain::with_initial(record, epoch, tx_id);
218 self.nodes.write().insert(id, chain);
219 id
220 }
221
222 pub fn create_node_with_props(
224 &self,
225 labels: &[&str],
226 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
227 ) -> NodeId {
228 self.create_node_with_props_versioned(
229 labels,
230 properties,
231 self.current_epoch(),
232 TxId::SYSTEM,
233 )
234 }
235
236 pub fn create_node_with_props_versioned(
238 &self,
239 labels: &[&str],
240 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
241 epoch: EpochId,
242 tx_id: TxId,
243 ) -> NodeId {
244 let id = self.create_node_versioned(labels, epoch, tx_id);
245
246 for (key, value) in properties {
247 self.node_properties.set(id, key.into(), value.into());
248 }
249
250 let count = self.node_properties.get_all(id).len() as u16;
252 if let Some(chain) = self.nodes.write().get_mut(&id) {
253 if let Some(record) = chain.latest_mut() {
254 record.props_count = count;
255 }
256 }
257
258 id
259 }
260
261 #[must_use]
263 pub fn get_node(&self, id: NodeId) -> Option<Node> {
264 self.get_node_at_epoch(id, self.current_epoch())
265 }
266
267 #[must_use]
269 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
270 let nodes = self.nodes.read();
271 let chain = nodes.get(&id)?;
272 let record = chain.visible_at(epoch)?;
273
274 if record.is_deleted() {
275 return None;
276 }
277
278 let mut node = Node::new(id);
279
280 let id_to_label = self.id_to_label.read();
282 let node_labels = self.node_labels.read();
283 if let Some(label_ids) = node_labels.get(&id) {
284 for &label_id in label_ids {
285 if let Some(label) = id_to_label.get(label_id as usize) {
286 node.labels.push(label.clone());
287 }
288 }
289 }
290
291 node.properties = self.node_properties.get_all(id).into_iter().collect();
293
294 Some(node)
295 }
296
297 #[must_use]
299 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
300 let nodes = self.nodes.read();
301 let chain = nodes.get(&id)?;
302 let record = chain.visible_to(epoch, tx_id)?;
303
304 if record.is_deleted() {
305 return None;
306 }
307
308 let mut node = Node::new(id);
309
310 let id_to_label = self.id_to_label.read();
312 let node_labels = self.node_labels.read();
313 if let Some(label_ids) = node_labels.get(&id) {
314 for &label_id in label_ids {
315 if let Some(label) = id_to_label.get(label_id as usize) {
316 node.labels.push(label.clone());
317 }
318 }
319 }
320
321 node.properties = self.node_properties.get_all(id).into_iter().collect();
323
324 Some(node)
325 }
326
327 pub fn delete_node(&self, id: NodeId) -> bool {
329 self.delete_node_at_epoch(id, self.current_epoch())
330 }
331
332 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
334 let mut nodes = self.nodes.write();
335 if let Some(chain) = nodes.get_mut(&id) {
336 if let Some(record) = chain.visible_at(epoch) {
338 if record.is_deleted() {
339 return false;
340 }
341 } else {
342 return false;
344 }
345
346 chain.mark_deleted(epoch);
348
349 let mut index = self.label_index.write();
351 let mut node_labels = self.node_labels.write();
352 if let Some(label_ids) = node_labels.remove(&id) {
353 for label_id in label_ids {
354 if let Some(set) = index.get_mut(label_id as usize) {
355 set.remove(&id);
356 }
357 }
358 }
359
360 drop(nodes); drop(index);
363 drop(node_labels);
364 self.node_properties.remove_all(id);
365
366 true
369 } else {
370 false
371 }
372 }
373
374 pub fn delete_node_edges(&self, node_id: NodeId) {
379 let outgoing: Vec<EdgeId> = self
381 .forward_adj
382 .edges_from(node_id)
383 .into_iter()
384 .map(|(_, edge_id)| edge_id)
385 .collect();
386
387 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
389 backward
390 .edges_from(node_id)
391 .into_iter()
392 .map(|(_, edge_id)| edge_id)
393 .collect()
394 } else {
395 let epoch = self.current_epoch();
397 self.edges
398 .read()
399 .iter()
400 .filter_map(|(id, chain)| {
401 chain.visible_at(epoch).and_then(|r| {
402 if !r.is_deleted() && r.dst == node_id {
403 Some(*id)
404 } else {
405 None
406 }
407 })
408 })
409 .collect()
410 };
411
412 for edge_id in outgoing.into_iter().chain(incoming) {
414 self.delete_edge(edge_id);
415 }
416 }
417
418 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
420 self.node_properties.set(id, key.into(), value);
421
422 let count = self.node_properties.get_all(id).len() as u16;
424 if let Some(chain) = self.nodes.write().get_mut(&id) {
425 if let Some(record) = chain.latest_mut() {
426 record.props_count = count;
427 }
428 }
429 }
430
431 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
433 self.edge_properties.set(id, key.into(), value);
434 }
435
436 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
440 let result = self.node_properties.remove(id, &key.into());
441
442 let count = self.node_properties.get_all(id).len() as u16;
444 if let Some(chain) = self.nodes.write().get_mut(&id) {
445 if let Some(record) = chain.latest_mut() {
446 record.props_count = count;
447 }
448 }
449
450 result
451 }
452
453 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
457 self.edge_properties.remove(id, &key.into())
458 }
459
460 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
465 let epoch = self.current_epoch();
466
467 let nodes = self.nodes.read();
469 if let Some(chain) = nodes.get(&node_id) {
470 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
471 return false;
472 }
473 } else {
474 return false;
475 }
476 drop(nodes);
477
478 let label_id = self.get_or_create_label_id(label);
480
481 let mut node_labels = self.node_labels.write();
483 let label_set = node_labels
484 .entry(node_id)
485 .or_insert_with(FxHashSet::default);
486
487 if label_set.contains(&label_id) {
488 return false; }
490
491 label_set.insert(label_id);
492 drop(node_labels);
493
494 let mut index = self.label_index.write();
496 if (label_id as usize) >= index.len() {
497 index.resize(label_id as usize + 1, FxHashMap::default());
498 }
499 index[label_id as usize].insert(node_id, ());
500
501 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
503 if let Some(record) = chain.latest_mut() {
504 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
505 record.set_label_count(count as u16);
506 }
507 }
508
509 true
510 }
511
512 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
517 let epoch = self.current_epoch();
518
519 let nodes = self.nodes.read();
521 if let Some(chain) = nodes.get(&node_id) {
522 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
523 return false;
524 }
525 } else {
526 return false;
527 }
528 drop(nodes);
529
530 let label_id = {
532 let label_ids = self.label_to_id.read();
533 match label_ids.get(label) {
534 Some(&id) => id,
535 None => return false, }
537 };
538
539 let mut node_labels = self.node_labels.write();
541 if let Some(label_set) = node_labels.get_mut(&node_id) {
542 if !label_set.remove(&label_id) {
543 return false; }
545 } else {
546 return false;
547 }
548 drop(node_labels);
549
550 let mut index = self.label_index.write();
552 if (label_id as usize) < index.len() {
553 index[label_id as usize].remove(&node_id);
554 }
555
556 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
558 if let Some(record) = chain.latest_mut() {
559 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
560 record.set_label_count(count as u16);
561 }
562 }
563
564 true
565 }
566
567 #[must_use]
569 pub fn node_count(&self) -> usize {
570 let epoch = self.current_epoch();
571 self.nodes
572 .read()
573 .values()
574 .filter_map(|chain| chain.visible_at(epoch))
575 .filter(|r| !r.is_deleted())
576 .count()
577 }
578
579 #[must_use]
585 pub fn node_ids(&self) -> Vec<NodeId> {
586 let epoch = self.current_epoch();
587 let mut ids: Vec<NodeId> = self
588 .nodes
589 .read()
590 .iter()
591 .filter_map(|(id, chain)| {
592 chain
593 .visible_at(epoch)
594 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
595 })
596 .collect();
597 ids.sort_unstable();
598 ids
599 }
600
601 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
605 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
606 }
607
608 pub fn create_edge_versioned(
610 &self,
611 src: NodeId,
612 dst: NodeId,
613 edge_type: &str,
614 epoch: EpochId,
615 tx_id: TxId,
616 ) -> EdgeId {
617 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
618 let type_id = self.get_or_create_edge_type_id(edge_type);
619
620 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
621 let chain = VersionChain::with_initial(record, epoch, tx_id);
622 self.edges.write().insert(id, chain);
623
624 self.forward_adj.add_edge(src, dst, id);
626 if let Some(ref backward) = self.backward_adj {
627 backward.add_edge(dst, src, id);
628 }
629
630 id
631 }
632
633 pub fn create_edge_with_props(
635 &self,
636 src: NodeId,
637 dst: NodeId,
638 edge_type: &str,
639 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
640 ) -> EdgeId {
641 let id = self.create_edge(src, dst, edge_type);
642
643 for (key, value) in properties {
644 self.edge_properties.set(id, key.into(), value.into());
645 }
646
647 id
648 }
649
650 #[must_use]
652 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
653 self.get_edge_at_epoch(id, self.current_epoch())
654 }
655
656 #[must_use]
658 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
659 let edges = self.edges.read();
660 let chain = edges.get(&id)?;
661 let record = chain.visible_at(epoch)?;
662
663 if record.is_deleted() {
664 return None;
665 }
666
667 let edge_type = {
668 let id_to_type = self.id_to_edge_type.read();
669 id_to_type.get(record.type_id as usize)?.clone()
670 };
671
672 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
673
674 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
676
677 Some(edge)
678 }
679
680 #[must_use]
682 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
683 let edges = self.edges.read();
684 let chain = edges.get(&id)?;
685 let record = chain.visible_to(epoch, tx_id)?;
686
687 if record.is_deleted() {
688 return None;
689 }
690
691 let edge_type = {
692 let id_to_type = self.id_to_edge_type.read();
693 id_to_type.get(record.type_id as usize)?.clone()
694 };
695
696 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
697
698 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
700
701 Some(edge)
702 }
703
704 pub fn delete_edge(&self, id: EdgeId) -> bool {
706 self.delete_edge_at_epoch(id, self.current_epoch())
707 }
708
709 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
711 let mut edges = self.edges.write();
712 if let Some(chain) = edges.get_mut(&id) {
713 let (src, dst) = {
715 match chain.visible_at(epoch) {
716 Some(record) => {
717 if record.is_deleted() {
718 return false;
719 }
720 (record.src, record.dst)
721 }
722 None => return false, }
724 };
725
726 chain.mark_deleted(epoch);
728
729 drop(edges); self.forward_adj.mark_deleted(src, id);
733 if let Some(ref backward) = self.backward_adj {
734 backward.mark_deleted(dst, id);
735 }
736
737 self.edge_properties.remove_all(id);
739
740 true
741 } else {
742 false
743 }
744 }
745
746 #[must_use]
748 pub fn edge_count(&self) -> usize {
749 let epoch = self.current_epoch();
750 self.edges
751 .read()
752 .values()
753 .filter_map(|chain| chain.visible_at(epoch))
754 .filter(|r| !r.is_deleted())
755 .count()
756 }
757
758 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
763 {
765 let mut nodes = self.nodes.write();
766 for chain in nodes.values_mut() {
767 chain.remove_versions_by(tx_id);
768 }
769 nodes.retain(|_, chain| !chain.is_empty());
771 }
772
773 {
775 let mut edges = self.edges.write();
776 for chain in edges.values_mut() {
777 chain.remove_versions_by(tx_id);
778 }
779 edges.retain(|_, chain| !chain.is_empty());
781 }
782 }
783
784 #[must_use]
786 pub fn label_count(&self) -> usize {
787 self.id_to_label.read().len()
788 }
789
790 #[must_use]
794 pub fn property_key_count(&self) -> usize {
795 let node_keys = self.node_properties.column_count();
796 let edge_keys = self.edge_properties.column_count();
797 node_keys + edge_keys
801 }
802
803 #[must_use]
805 pub fn edge_type_count(&self) -> usize {
806 self.id_to_edge_type.read().len()
807 }
808
809 pub fn neighbors(
816 &self,
817 node: NodeId,
818 direction: Direction,
819 ) -> impl Iterator<Item = NodeId> + '_ {
820 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
821 Direction::Outgoing | Direction::Both => {
822 Box::new(self.forward_adj.neighbors(node).into_iter())
823 }
824 Direction::Incoming => Box::new(std::iter::empty()),
825 };
826
827 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
828 Direction::Incoming | Direction::Both => {
829 if let Some(ref adj) = self.backward_adj {
830 Box::new(adj.neighbors(node).into_iter())
831 } else {
832 Box::new(std::iter::empty())
833 }
834 }
835 Direction::Outgoing => Box::new(std::iter::empty()),
836 };
837
838 forward.chain(backward)
839 }
840
841 pub fn edges_from(
845 &self,
846 node: NodeId,
847 direction: Direction,
848 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
849 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
850 Direction::Outgoing | Direction::Both => {
851 Box::new(self.forward_adj.edges_from(node).into_iter())
852 }
853 Direction::Incoming => Box::new(std::iter::empty()),
854 };
855
856 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
857 Direction::Incoming | Direction::Both => {
858 if let Some(ref adj) = self.backward_adj {
859 Box::new(adj.edges_from(node).into_iter())
860 } else {
861 Box::new(std::iter::empty())
862 }
863 }
864 Direction::Outgoing => Box::new(std::iter::empty()),
865 };
866
867 forward.chain(backward)
868 }
869
870 #[must_use]
872 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
873 let edges = self.edges.read();
874 let chain = edges.get(&id)?;
875 let epoch = self.current_epoch();
876 let record = chain.visible_at(epoch)?;
877 let id_to_type = self.id_to_edge_type.read();
878 id_to_type.get(record.type_id as usize).cloned()
879 }
880
881 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
887 let label_to_id = self.label_to_id.read();
888 if let Some(&label_id) = label_to_id.get(label) {
889 let index = self.label_index.read();
890 if let Some(set) = index.get(label_id as usize) {
891 let mut ids: Vec<NodeId> = set.keys().copied().collect();
892 ids.sort_unstable();
893 return ids;
894 }
895 }
896 Vec::new()
897 }
898
899 pub fn all_nodes(&self) -> impl Iterator<Item = Node> + '_ {
906 let epoch = self.current_epoch();
907 let node_ids: Vec<NodeId> = self
908 .nodes
909 .read()
910 .iter()
911 .filter_map(|(id, chain)| {
912 chain
913 .visible_at(epoch)
914 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
915 })
916 .collect();
917
918 node_ids.into_iter().filter_map(move |id| self.get_node(id))
919 }
920
921 pub fn all_edges(&self) -> impl Iterator<Item = Edge> + '_ {
926 let epoch = self.current_epoch();
927 let edge_ids: Vec<EdgeId> = self
928 .edges
929 .read()
930 .iter()
931 .filter_map(|(id, chain)| {
932 chain
933 .visible_at(epoch)
934 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
935 })
936 .collect();
937
938 edge_ids.into_iter().filter_map(move |id| self.get_edge(id))
939 }
940
941 pub fn all_labels(&self) -> Vec<String> {
943 self.id_to_label
944 .read()
945 .iter()
946 .map(|s| s.to_string())
947 .collect()
948 }
949
950 pub fn all_edge_types(&self) -> Vec<String> {
952 self.id_to_edge_type
953 .read()
954 .iter()
955 .map(|s| s.to_string())
956 .collect()
957 }
958
959 pub fn all_property_keys(&self) -> Vec<String> {
961 let mut keys = std::collections::HashSet::new();
962 for key in self.node_properties.keys() {
963 keys.insert(key.to_string());
964 }
965 for key in self.edge_properties.keys() {
966 keys.insert(key.to_string());
967 }
968 keys.into_iter().collect()
969 }
970
971 pub fn nodes_with_label<'a>(&'a self, label: &str) -> impl Iterator<Item = Node> + 'a {
973 let node_ids = self.nodes_by_label(label);
974 node_ids.into_iter().filter_map(move |id| self.get_node(id))
975 }
976
977 pub fn edges_with_type<'a>(&'a self, edge_type: &str) -> impl Iterator<Item = Edge> + 'a {
979 let epoch = self.current_epoch();
980 let type_to_id = self.edge_type_to_id.read();
981
982 if let Some(&type_id) = type_to_id.get(edge_type) {
983 let edge_ids: Vec<EdgeId> = self
984 .edges
985 .read()
986 .iter()
987 .filter_map(|(id, chain)| {
988 chain.visible_at(epoch).and_then(|r| {
989 if !r.is_deleted() && r.type_id == type_id {
990 Some(*id)
991 } else {
992 None
993 }
994 })
995 })
996 .collect();
997
998 Box::new(edge_ids.into_iter().filter_map(move |id| self.get_edge(id)))
1000 as Box<dyn Iterator<Item = Edge> + 'a>
1001 } else {
1002 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + 'a>
1004 }
1005 }
1006
1007 #[must_use]
1014 pub fn node_property_might_match(
1015 &self,
1016 property: &PropertyKey,
1017 op: CompareOp,
1018 value: &Value,
1019 ) -> bool {
1020 self.node_properties.might_match(property, op, value)
1021 }
1022
1023 #[must_use]
1025 pub fn edge_property_might_match(
1026 &self,
1027 property: &PropertyKey,
1028 op: CompareOp,
1029 value: &Value,
1030 ) -> bool {
1031 self.edge_properties.might_match(property, op, value)
1032 }
1033
1034 #[must_use]
1036 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1037 self.node_properties.zone_map(property)
1038 }
1039
1040 #[must_use]
1042 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
1043 self.edge_properties.zone_map(property)
1044 }
1045
1046 pub fn rebuild_zone_maps(&self) {
1048 self.node_properties.rebuild_zone_maps();
1049 self.edge_properties.rebuild_zone_maps();
1050 }
1051
1052 #[must_use]
1056 pub fn statistics(&self) -> Statistics {
1057 self.statistics.read().clone()
1058 }
1059
1060 pub fn compute_statistics(&self) {
1065 let mut stats = Statistics::new();
1066
1067 stats.total_nodes = self.node_count() as u64;
1069 stats.total_edges = self.edge_count() as u64;
1070
1071 let id_to_label = self.id_to_label.read();
1073 let label_index = self.label_index.read();
1074
1075 for (label_id, label_name) in id_to_label.iter().enumerate() {
1076 let node_count = label_index
1077 .get(label_id)
1078 .map(|set| set.len() as u64)
1079 .unwrap_or(0);
1080
1081 if node_count > 0 {
1082 let avg_out_degree = if stats.total_nodes > 0 {
1084 stats.total_edges as f64 / stats.total_nodes as f64
1085 } else {
1086 0.0
1087 };
1088
1089 let label_stats =
1090 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
1091
1092 stats.update_label(label_name.as_ref(), label_stats);
1093 }
1094 }
1095
1096 let id_to_edge_type = self.id_to_edge_type.read();
1098 let edges = self.edges.read();
1099 let epoch = self.current_epoch();
1100
1101 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
1102 for chain in edges.values() {
1103 if let Some(record) = chain.visible_at(epoch) {
1104 if !record.is_deleted() {
1105 *edge_type_counts.entry(record.type_id).or_default() += 1;
1106 }
1107 }
1108 }
1109
1110 for (type_id, count) in edge_type_counts {
1111 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
1112 let avg_degree = if stats.total_nodes > 0 {
1113 count as f64 / stats.total_nodes as f64
1114 } else {
1115 0.0
1116 };
1117
1118 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
1119 stats.update_edge_type(type_name.as_ref(), edge_stats);
1120 }
1121 }
1122
1123 *self.statistics.write() = stats;
1124 }
1125
1126 #[must_use]
1128 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
1129 self.statistics.read().estimate_label_cardinality(label)
1130 }
1131
1132 #[must_use]
1134 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
1135 self.statistics
1136 .read()
1137 .estimate_avg_degree(edge_type, outgoing)
1138 }
1139
1140 fn get_or_create_label_id(&self, label: &str) -> u32 {
1143 {
1144 let label_to_id = self.label_to_id.read();
1145 if let Some(&id) = label_to_id.get(label) {
1146 return id;
1147 }
1148 }
1149
1150 let mut label_to_id = self.label_to_id.write();
1151 let mut id_to_label = self.id_to_label.write();
1152
1153 if let Some(&id) = label_to_id.get(label) {
1155 return id;
1156 }
1157
1158 let id = id_to_label.len() as u32;
1159
1160 let label: Arc<str> = label.into();
1161 label_to_id.insert(label.clone(), id);
1162 id_to_label.push(label);
1163
1164 id
1165 }
1166
1167 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
1168 {
1169 let type_to_id = self.edge_type_to_id.read();
1170 if let Some(&id) = type_to_id.get(edge_type) {
1171 return id;
1172 }
1173 }
1174
1175 let mut type_to_id = self.edge_type_to_id.write();
1176 let mut id_to_type = self.id_to_edge_type.write();
1177
1178 if let Some(&id) = type_to_id.get(edge_type) {
1180 return id;
1181 }
1182
1183 let id = id_to_type.len() as u32;
1184 let edge_type: Arc<str> = edge_type.into();
1185 type_to_id.insert(edge_type.clone(), id);
1186 id_to_type.push(edge_type);
1187
1188 id
1189 }
1190
1191 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
1198 let epoch = self.current_epoch();
1199 let mut record = NodeRecord::new(id, epoch);
1200 record.set_label_count(labels.len() as u16);
1201
1202 let mut node_label_set = FxHashSet::default();
1204 for label in labels {
1205 let label_id = self.get_or_create_label_id(*label);
1206 node_label_set.insert(label_id);
1207
1208 let mut index = self.label_index.write();
1210 while index.len() <= label_id as usize {
1211 index.push(FxHashMap::default());
1212 }
1213 index[label_id as usize].insert(id, ());
1214 }
1215
1216 self.node_labels.write().insert(id, node_label_set);
1218
1219 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1221 self.nodes.write().insert(id, chain);
1222
1223 let id_val = id.as_u64();
1225 let _ = self
1226 .next_node_id
1227 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1228 if id_val >= current {
1229 Some(id_val + 1)
1230 } else {
1231 None
1232 }
1233 });
1234 }
1235
1236 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
1240 let epoch = self.current_epoch();
1241 let type_id = self.get_or_create_edge_type_id(edge_type);
1242
1243 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1244 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1245 self.edges.write().insert(id, chain);
1246
1247 self.forward_adj.add_edge(src, dst, id);
1249 if let Some(ref backward) = self.backward_adj {
1250 backward.add_edge(dst, src, id);
1251 }
1252
1253 let id_val = id.as_u64();
1255 let _ = self
1256 .next_edge_id
1257 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1258 if id_val >= current {
1259 Some(id_val + 1)
1260 } else {
1261 None
1262 }
1263 });
1264 }
1265
1266 pub fn set_epoch(&self, epoch: EpochId) {
1268 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
1269 }
1270}
1271
1272impl Default for LpgStore {
1273 fn default() -> Self {
1274 Self::new()
1275 }
1276}
1277
#[cfg(test)]
mod tests {
    use super::*;

    /// A created node is retrievable and reports exactly its own labels.
    #[test]
    fn test_create_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert!(person.is_valid());

        let fetched = graph.get_node(person).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    /// Properties given at creation time are readable with their types.
    #[test]
    fn test_create_node_with_props() {
        let graph = LpgStore::new();

        let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let person = graph.create_node_with_props(&["Person"], props);

        let fetched = graph.get_node(person).unwrap();
        let name = fetched.get_property("name").and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));
        let age = fetched.get_property("age").and_then(|v| v.as_int64());
        assert_eq!(age, Some(30));
    }

    /// Deleting hides the node, and a second delete reports failure.
    #[test]
    fn test_delete_node() {
        let graph = LpgStore::new();

        let person = graph.create_node(&["Person"]);
        assert_eq!(graph.node_count(), 1);

        assert!(graph.delete_node(person));
        assert_eq!(graph.node_count(), 0);
        assert!(graph.get_node(person).is_none());

        assert!(!graph.delete_node(person));
    }

    /// A created edge keeps its endpoints and type.
    #[test]
    fn test_create_edge() {
        let graph = LpgStore::new();

        let alice = graph.create_node(&["Person"]);
        let bob = graph.create_node(&["Person"]);

        let knows = graph.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let fetched = graph.get_edge(knows).unwrap();
        assert_eq!(fetched.src, alice);
        assert_eq!(fetched.dst, bob);
        assert_eq!(fetched.edge_type.as_ref(), "KNOWS");
    }

    /// Outgoing and incoming neighbour lookups follow edge direction.
    #[test]
    fn test_neighbors() {
        let graph = LpgStore::new();

        let hub = graph.create_node(&["Person"]);
        let first = graph.create_node(&["Person"]);
        let second = graph.create_node(&["Person"]);

        graph.create_edge(hub, first, "KNOWS");
        graph.create_edge(hub, second, "KNOWS");

        let from_hub: Vec<_> = graph.neighbors(hub, Direction::Outgoing).collect();
        assert_eq!(from_hub.len(), 2);
        assert!(from_hub.contains(&first));
        assert!(from_hub.contains(&second));

        let into_first: Vec<_> = graph.neighbors(first, Direction::Incoming).collect();
        assert_eq!(into_first.len(), 1);
        assert!(into_first.contains(&hub));
    }

    /// The label index returns exactly the nodes carrying each label.
    #[test]
    fn test_nodes_by_label() {
        let graph = LpgStore::new();

        let first_person = graph.create_node(&["Person"]);
        let second_person = graph.create_node(&["Person"]);
        let _animal = graph.create_node(&["Animal"]);

        let persons = graph.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);
        assert!(persons.contains(&first_person));
        assert!(persons.contains(&second_person));

        let animals = graph.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    /// Deleting an edge removes it from counts and lookups.
    #[test]
    fn test_delete_edge() {
        let graph = LpgStore::new();

        let from = graph.create_node(&["Person"]);
        let to = graph.create_node(&["Person"]);
        let knows = graph.create_edge(from, to, "KNOWS");

        assert_eq!(graph.edge_count(), 1);

        assert!(graph.delete_edge(knows));
        assert_eq!(graph.edge_count(), 0);
        assert!(graph.get_edge(knows).is_none());
    }
}