1use super::property::CompareOp;
4use super::{Edge, EdgeRecord, Node, NodeRecord, PropertyStorage};
5use crate::graph::Direction;
6use crate::index::adjacency::ChunkedAdjacency;
7use crate::index::zone_map::ZoneMapEntry;
8use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
9use grafeo_common::mvcc::VersionChain;
10use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TxId, Value};
11use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
12use parking_lot::RwLock;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15
/// Construction-time options for an [`LpgStore`].
///
/// The type is comparable (`PartialEq`/`Eq`) so that callers and tests can
/// check which configuration a store was built from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LpgStoreConfig {
    /// When `true`, the store also keeps an adjacency index keyed by edge
    /// destination so incoming edges can be looked up directly.
    pub backward_edges: bool,
    /// Initial capacity hint for node storage.
    pub initial_node_capacity: usize,
    /// Initial capacity hint for edge storage.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, with capacity hints of 1024 nodes and 4096 edges.
    fn default() -> Self {
        Self {
            backward_edges: true,
            initial_node_capacity: 1024,
            initial_edge_capacity: 4096,
        }
    }
}
36
37pub struct LpgStore {
42 #[allow(dead_code)]
44 config: LpgStoreConfig,
45
46 nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
48
49 edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
51
52 node_properties: PropertyStorage<NodeId>,
54
55 edge_properties: PropertyStorage<EdgeId>,
57
58 label_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
60
61 id_to_label: RwLock<Vec<Arc<str>>>,
63
64 edge_type_to_id: RwLock<FxHashMap<Arc<str>, u32>>,
66
67 id_to_edge_type: RwLock<Vec<Arc<str>>>,
69
70 forward_adj: ChunkedAdjacency,
72
73 backward_adj: Option<ChunkedAdjacency>,
76
77 label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
79
80 node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
83
84 next_node_id: AtomicU64,
86
87 next_edge_id: AtomicU64,
89
90 current_epoch: AtomicU64,
92
93 statistics: RwLock<Statistics>,
95}
96
97impl LpgStore {
98 #[must_use]
100 pub fn new() -> Self {
101 Self::with_config(LpgStoreConfig::default())
102 }
103
104 #[must_use]
106 pub fn with_config(config: LpgStoreConfig) -> Self {
107 let backward_adj = if config.backward_edges {
108 Some(ChunkedAdjacency::new())
109 } else {
110 None
111 };
112
113 Self {
114 nodes: RwLock::new(FxHashMap::default()),
115 edges: RwLock::new(FxHashMap::default()),
116 node_properties: PropertyStorage::new(),
117 edge_properties: PropertyStorage::new(),
118 label_to_id: RwLock::new(FxHashMap::default()),
119 id_to_label: RwLock::new(Vec::new()),
120 edge_type_to_id: RwLock::new(FxHashMap::default()),
121 id_to_edge_type: RwLock::new(Vec::new()),
122 forward_adj: ChunkedAdjacency::new(),
123 backward_adj,
124 label_index: RwLock::new(Vec::new()),
125 node_labels: RwLock::new(FxHashMap::default()),
126 next_node_id: AtomicU64::new(0),
127 next_edge_id: AtomicU64::new(0),
128 current_epoch: AtomicU64::new(0),
129 statistics: RwLock::new(Statistics::new()),
130 config,
131 }
132 }
133
134 #[must_use]
136 pub fn current_epoch(&self) -> EpochId {
137 EpochId::new(self.current_epoch.load(Ordering::Acquire))
138 }
139
140 pub fn new_epoch(&self) -> EpochId {
142 let id = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
143 EpochId::new(id)
144 }
145
146 pub fn create_node(&self, labels: &[&str]) -> NodeId {
152 self.create_node_versioned(labels, self.current_epoch(), TxId::SYSTEM)
153 }
154
155 pub fn create_node_versioned(&self, labels: &[&str], epoch: EpochId, tx_id: TxId) -> NodeId {
157 let id = NodeId::new(self.next_node_id.fetch_add(1, Ordering::Relaxed));
158
159 let mut record = NodeRecord::new(id, epoch);
160 record.set_label_count(labels.len() as u16);
161
162 let mut node_label_set = FxHashSet::default();
164 for label in labels {
165 let label_id = self.get_or_create_label_id(*label);
166 node_label_set.insert(label_id);
167
168 let mut index = self.label_index.write();
170 while index.len() <= label_id as usize {
171 index.push(FxHashMap::default());
172 }
173 index[label_id as usize].insert(id, ());
174 }
175
176 self.node_labels.write().insert(id, node_label_set);
178
179 let chain = VersionChain::with_initial(record, epoch, tx_id);
181 self.nodes.write().insert(id, chain);
182 id
183 }
184
185 pub fn create_node_with_props(
187 &self,
188 labels: &[&str],
189 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
190 ) -> NodeId {
191 self.create_node_with_props_versioned(
192 labels,
193 properties,
194 self.current_epoch(),
195 TxId::SYSTEM,
196 )
197 }
198
199 pub fn create_node_with_props_versioned(
201 &self,
202 labels: &[&str],
203 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
204 epoch: EpochId,
205 tx_id: TxId,
206 ) -> NodeId {
207 let id = self.create_node_versioned(labels, epoch, tx_id);
208
209 for (key, value) in properties {
210 self.node_properties.set(id, key.into(), value.into());
211 }
212
213 let count = self.node_properties.get_all(id).len() as u16;
215 if let Some(chain) = self.nodes.write().get_mut(&id) {
216 if let Some(record) = chain.latest_mut() {
217 record.props_count = count;
218 }
219 }
220
221 id
222 }
223
224 #[must_use]
226 pub fn get_node(&self, id: NodeId) -> Option<Node> {
227 self.get_node_at_epoch(id, self.current_epoch())
228 }
229
230 #[must_use]
232 pub fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
233 let nodes = self.nodes.read();
234 let chain = nodes.get(&id)?;
235 let record = chain.visible_at(epoch)?;
236
237 if record.is_deleted() {
238 return None;
239 }
240
241 let mut node = Node::new(id);
242
243 let id_to_label = self.id_to_label.read();
245 let node_labels = self.node_labels.read();
246 if let Some(label_ids) = node_labels.get(&id) {
247 for &label_id in label_ids {
248 if let Some(label) = id_to_label.get(label_id as usize) {
249 node.labels.push(label.clone());
250 }
251 }
252 }
253
254 node.properties = self.node_properties.get_all(id).into_iter().collect();
256
257 Some(node)
258 }
259
260 #[must_use]
262 pub fn get_node_versioned(&self, id: NodeId, epoch: EpochId, tx_id: TxId) -> Option<Node> {
263 let nodes = self.nodes.read();
264 let chain = nodes.get(&id)?;
265 let record = chain.visible_to(epoch, tx_id)?;
266
267 if record.is_deleted() {
268 return None;
269 }
270
271 let mut node = Node::new(id);
272
273 let id_to_label = self.id_to_label.read();
275 let node_labels = self.node_labels.read();
276 if let Some(label_ids) = node_labels.get(&id) {
277 for &label_id in label_ids {
278 if let Some(label) = id_to_label.get(label_id as usize) {
279 node.labels.push(label.clone());
280 }
281 }
282 }
283
284 node.properties = self.node_properties.get_all(id).into_iter().collect();
286
287 Some(node)
288 }
289
290 pub fn delete_node(&self, id: NodeId) -> bool {
292 self.delete_node_at_epoch(id, self.current_epoch())
293 }
294
295 pub fn delete_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
297 let mut nodes = self.nodes.write();
298 if let Some(chain) = nodes.get_mut(&id) {
299 if let Some(record) = chain.visible_at(epoch) {
301 if record.is_deleted() {
302 return false;
303 }
304 } else {
305 return false;
307 }
308
309 chain.mark_deleted(epoch);
311
312 let mut index = self.label_index.write();
314 let mut node_labels = self.node_labels.write();
315 if let Some(label_ids) = node_labels.remove(&id) {
316 for label_id in label_ids {
317 if let Some(set) = index.get_mut(label_id as usize) {
318 set.remove(&id);
319 }
320 }
321 }
322
323 drop(nodes); drop(index);
326 drop(node_labels);
327 self.node_properties.remove_all(id);
328
329 true
332 } else {
333 false
334 }
335 }
336
337 pub fn delete_node_edges(&self, node_id: NodeId) {
339 let outgoing: Vec<EdgeId> = self
341 .forward_adj
342 .edges_from(node_id)
343 .into_iter()
344 .map(|(_, edge_id)| edge_id)
345 .collect();
346
347 let incoming: Vec<EdgeId> = if let Some(ref backward) = self.backward_adj {
349 backward
350 .edges_from(node_id)
351 .into_iter()
352 .map(|(_, edge_id)| edge_id)
353 .collect()
354 } else {
355 let epoch = self.current_epoch();
357 self.edges
358 .read()
359 .iter()
360 .filter_map(|(id, chain)| {
361 chain.visible_at(epoch).and_then(|r| {
362 if !r.is_deleted() && r.dst == node_id {
363 Some(*id)
364 } else {
365 None
366 }
367 })
368 })
369 .collect()
370 };
371
372 for edge_id in outgoing.into_iter().chain(incoming) {
374 self.delete_edge(edge_id);
375 }
376 }
377
378 pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
380 self.node_properties.set(id, key.into(), value);
381
382 let count = self.node_properties.get_all(id).len() as u16;
384 if let Some(chain) = self.nodes.write().get_mut(&id) {
385 if let Some(record) = chain.latest_mut() {
386 record.props_count = count;
387 }
388 }
389 }
390
391 pub fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
393 self.edge_properties.set(id, key.into(), value);
394 }
395
396 pub fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
400 let result = self.node_properties.remove(id, &key.into());
401
402 let count = self.node_properties.get_all(id).len() as u16;
404 if let Some(chain) = self.nodes.write().get_mut(&id) {
405 if let Some(record) = chain.latest_mut() {
406 record.props_count = count;
407 }
408 }
409
410 result
411 }
412
413 pub fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
417 self.edge_properties.remove(id, &key.into())
418 }
419
420 pub fn add_label(&self, node_id: NodeId, label: &str) -> bool {
425 let epoch = self.current_epoch();
426
427 let nodes = self.nodes.read();
429 if let Some(chain) = nodes.get(&node_id) {
430 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
431 return false;
432 }
433 } else {
434 return false;
435 }
436 drop(nodes);
437
438 let label_id = self.get_or_create_label_id(label);
440
441 let mut node_labels = self.node_labels.write();
443 let label_set = node_labels
444 .entry(node_id)
445 .or_insert_with(FxHashSet::default);
446
447 if label_set.contains(&label_id) {
448 return false; }
450
451 label_set.insert(label_id);
452 drop(node_labels);
453
454 let mut index = self.label_index.write();
456 if (label_id as usize) >= index.len() {
457 index.resize(label_id as usize + 1, FxHashMap::default());
458 }
459 index[label_id as usize].insert(node_id, ());
460
461 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
463 if let Some(record) = chain.latest_mut() {
464 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
465 record.set_label_count(count as u16);
466 }
467 }
468
469 true
470 }
471
472 pub fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
477 let epoch = self.current_epoch();
478
479 let nodes = self.nodes.read();
481 if let Some(chain) = nodes.get(&node_id) {
482 if chain.visible_at(epoch).map_or(true, |r| r.is_deleted()) {
483 return false;
484 }
485 } else {
486 return false;
487 }
488 drop(nodes);
489
490 let label_id = {
492 let label_ids = self.label_to_id.read();
493 match label_ids.get(label) {
494 Some(&id) => id,
495 None => return false, }
497 };
498
499 let mut node_labels = self.node_labels.write();
501 if let Some(label_set) = node_labels.get_mut(&node_id) {
502 if !label_set.remove(&label_id) {
503 return false; }
505 } else {
506 return false;
507 }
508 drop(node_labels);
509
510 let mut index = self.label_index.write();
512 if (label_id as usize) < index.len() {
513 index[label_id as usize].remove(&node_id);
514 }
515
516 if let Some(chain) = self.nodes.write().get_mut(&node_id) {
518 if let Some(record) = chain.latest_mut() {
519 let count = self.node_labels.read().get(&node_id).map_or(0, |s| s.len());
520 record.set_label_count(count as u16);
521 }
522 }
523
524 true
525 }
526
527 #[must_use]
529 pub fn node_count(&self) -> usize {
530 let epoch = self.current_epoch();
531 self.nodes
532 .read()
533 .values()
534 .filter_map(|chain| chain.visible_at(epoch))
535 .filter(|r| !r.is_deleted())
536 .count()
537 }
538
539 #[must_use]
544 pub fn node_ids(&self) -> Vec<NodeId> {
545 let epoch = self.current_epoch();
546 self.nodes
547 .read()
548 .iter()
549 .filter_map(|(id, chain)| {
550 chain
551 .visible_at(epoch)
552 .and_then(|r| if !r.is_deleted() { Some(*id) } else { None })
553 })
554 .collect()
555 }
556
557 pub fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
561 self.create_edge_versioned(src, dst, edge_type, self.current_epoch(), TxId::SYSTEM)
562 }
563
564 pub fn create_edge_versioned(
566 &self,
567 src: NodeId,
568 dst: NodeId,
569 edge_type: &str,
570 epoch: EpochId,
571 tx_id: TxId,
572 ) -> EdgeId {
573 let id = EdgeId::new(self.next_edge_id.fetch_add(1, Ordering::Relaxed));
574 let type_id = self.get_or_create_edge_type_id(edge_type);
575
576 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
577 let chain = VersionChain::with_initial(record, epoch, tx_id);
578 self.edges.write().insert(id, chain);
579
580 self.forward_adj.add_edge(src, dst, id);
582 if let Some(ref backward) = self.backward_adj {
583 backward.add_edge(dst, src, id);
584 }
585
586 id
587 }
588
589 pub fn create_edge_with_props(
591 &self,
592 src: NodeId,
593 dst: NodeId,
594 edge_type: &str,
595 properties: impl IntoIterator<Item = (impl Into<PropertyKey>, impl Into<Value>)>,
596 ) -> EdgeId {
597 let id = self.create_edge(src, dst, edge_type);
598
599 for (key, value) in properties {
600 self.edge_properties.set(id, key.into(), value.into());
601 }
602
603 id
604 }
605
606 #[must_use]
608 pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
609 self.get_edge_at_epoch(id, self.current_epoch())
610 }
611
612 #[must_use]
614 pub fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
615 let edges = self.edges.read();
616 let chain = edges.get(&id)?;
617 let record = chain.visible_at(epoch)?;
618
619 if record.is_deleted() {
620 return None;
621 }
622
623 let edge_type = {
624 let id_to_type = self.id_to_edge_type.read();
625 id_to_type.get(record.type_id as usize)?.clone()
626 };
627
628 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
629
630 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
632
633 Some(edge)
634 }
635
636 #[must_use]
638 pub fn get_edge_versioned(&self, id: EdgeId, epoch: EpochId, tx_id: TxId) -> Option<Edge> {
639 let edges = self.edges.read();
640 let chain = edges.get(&id)?;
641 let record = chain.visible_to(epoch, tx_id)?;
642
643 if record.is_deleted() {
644 return None;
645 }
646
647 let edge_type = {
648 let id_to_type = self.id_to_edge_type.read();
649 id_to_type.get(record.type_id as usize)?.clone()
650 };
651
652 let mut edge = Edge::new(id, record.src, record.dst, edge_type);
653
654 edge.properties = self.edge_properties.get_all(id).into_iter().collect();
656
657 Some(edge)
658 }
659
660 pub fn delete_edge(&self, id: EdgeId) -> bool {
662 self.delete_edge_at_epoch(id, self.current_epoch())
663 }
664
665 pub fn delete_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
667 let mut edges = self.edges.write();
668 if let Some(chain) = edges.get_mut(&id) {
669 let (src, dst) = {
671 match chain.visible_at(epoch) {
672 Some(record) => {
673 if record.is_deleted() {
674 return false;
675 }
676 (record.src, record.dst)
677 }
678 None => return false, }
680 };
681
682 chain.mark_deleted(epoch);
684
685 drop(edges); self.forward_adj.mark_deleted(src, id);
689 if let Some(ref backward) = self.backward_adj {
690 backward.mark_deleted(dst, id);
691 }
692
693 self.edge_properties.remove_all(id);
695
696 true
697 } else {
698 false
699 }
700 }
701
702 #[must_use]
704 pub fn edge_count(&self) -> usize {
705 let epoch = self.current_epoch();
706 self.edges
707 .read()
708 .values()
709 .filter_map(|chain| chain.visible_at(epoch))
710 .filter(|r| !r.is_deleted())
711 .count()
712 }
713
714 pub fn discard_uncommitted_versions(&self, tx_id: TxId) {
719 {
721 let mut nodes = self.nodes.write();
722 for chain in nodes.values_mut() {
723 chain.remove_versions_by(tx_id);
724 }
725 nodes.retain(|_, chain| !chain.is_empty());
727 }
728
729 {
731 let mut edges = self.edges.write();
732 for chain in edges.values_mut() {
733 chain.remove_versions_by(tx_id);
734 }
735 edges.retain(|_, chain| !chain.is_empty());
737 }
738 }
739
740 #[must_use]
742 pub fn label_count(&self) -> usize {
743 self.id_to_label.read().len()
744 }
745
746 #[must_use]
750 pub fn property_key_count(&self) -> usize {
751 let node_keys = self.node_properties.column_count();
752 let edge_keys = self.edge_properties.column_count();
753 node_keys + edge_keys
757 }
758
759 #[must_use]
761 pub fn edge_type_count(&self) -> usize {
762 self.id_to_edge_type.read().len()
763 }
764
765 pub fn neighbors(
769 &self,
770 node: NodeId,
771 direction: Direction,
772 ) -> impl Iterator<Item = NodeId> + '_ {
773 let forward: Box<dyn Iterator<Item = NodeId>> = match direction {
774 Direction::Outgoing | Direction::Both => {
775 Box::new(self.forward_adj.neighbors(node).into_iter())
776 }
777 Direction::Incoming => Box::new(std::iter::empty()),
778 };
779
780 let backward: Box<dyn Iterator<Item = NodeId>> = match direction {
781 Direction::Incoming | Direction::Both => {
782 if let Some(ref adj) = self.backward_adj {
783 Box::new(adj.neighbors(node).into_iter())
784 } else {
785 Box::new(std::iter::empty())
786 }
787 }
788 Direction::Outgoing => Box::new(std::iter::empty()),
789 };
790
791 forward.chain(backward)
792 }
793
794 pub fn edges_from(
798 &self,
799 node: NodeId,
800 direction: Direction,
801 ) -> impl Iterator<Item = (NodeId, EdgeId)> + '_ {
802 let forward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
803 Direction::Outgoing | Direction::Both => {
804 Box::new(self.forward_adj.edges_from(node).into_iter())
805 }
806 Direction::Incoming => Box::new(std::iter::empty()),
807 };
808
809 let backward: Box<dyn Iterator<Item = (NodeId, EdgeId)>> = match direction {
810 Direction::Incoming | Direction::Both => {
811 if let Some(ref adj) = self.backward_adj {
812 Box::new(adj.edges_from(node).into_iter())
813 } else {
814 Box::new(std::iter::empty())
815 }
816 }
817 Direction::Outgoing => Box::new(std::iter::empty()),
818 };
819
820 forward.chain(backward)
821 }
822
823 #[must_use]
825 pub fn edge_type(&self, id: EdgeId) -> Option<Arc<str>> {
826 let edges = self.edges.read();
827 let chain = edges.get(&id)?;
828 let epoch = self.current_epoch();
829 let record = chain.visible_at(epoch)?;
830 let id_to_type = self.id_to_edge_type.read();
831 id_to_type.get(record.type_id as usize).cloned()
832 }
833
834 pub fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
836 let label_to_id = self.label_to_id.read();
837 if let Some(&label_id) = label_to_id.get(label) {
838 let index = self.label_index.read();
839 if let Some(set) = index.get(label_id as usize) {
840 return set.keys().copied().collect();
841 }
842 }
843 Vec::new()
844 }
845
846 #[must_use]
853 pub fn node_property_might_match(
854 &self,
855 property: &PropertyKey,
856 op: CompareOp,
857 value: &Value,
858 ) -> bool {
859 self.node_properties.might_match(property, op, value)
860 }
861
862 #[must_use]
864 pub fn edge_property_might_match(
865 &self,
866 property: &PropertyKey,
867 op: CompareOp,
868 value: &Value,
869 ) -> bool {
870 self.edge_properties.might_match(property, op, value)
871 }
872
873 #[must_use]
875 pub fn node_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
876 self.node_properties.zone_map(property)
877 }
878
879 #[must_use]
881 pub fn edge_property_zone_map(&self, property: &PropertyKey) -> Option<ZoneMapEntry> {
882 self.edge_properties.zone_map(property)
883 }
884
885 pub fn rebuild_zone_maps(&self) {
887 self.node_properties.rebuild_zone_maps();
888 self.edge_properties.rebuild_zone_maps();
889 }
890
891 #[must_use]
895 pub fn statistics(&self) -> Statistics {
896 self.statistics.read().clone()
897 }
898
899 pub fn compute_statistics(&self) {
903 let mut stats = Statistics::new();
904
905 stats.total_nodes = self.node_count() as u64;
907 stats.total_edges = self.edge_count() as u64;
908
909 let id_to_label = self.id_to_label.read();
911 let label_index = self.label_index.read();
912
913 for (label_id, label_name) in id_to_label.iter().enumerate() {
914 let node_count = label_index
915 .get(label_id)
916 .map(|set| set.len() as u64)
917 .unwrap_or(0);
918
919 if node_count > 0 {
920 let avg_out_degree = if stats.total_nodes > 0 {
922 stats.total_edges as f64 / stats.total_nodes as f64
923 } else {
924 0.0
925 };
926
927 let label_stats =
928 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
929
930 stats.update_label(label_name.as_ref(), label_stats);
931 }
932 }
933
934 let id_to_edge_type = self.id_to_edge_type.read();
936 let edges = self.edges.read();
937 let epoch = self.current_epoch();
938
939 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
940 for chain in edges.values() {
941 if let Some(record) = chain.visible_at(epoch) {
942 if !record.is_deleted() {
943 *edge_type_counts.entry(record.type_id).or_default() += 1;
944 }
945 }
946 }
947
948 for (type_id, count) in edge_type_counts {
949 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
950 let avg_degree = if stats.total_nodes > 0 {
951 count as f64 / stats.total_nodes as f64
952 } else {
953 0.0
954 };
955
956 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
957 stats.update_edge_type(type_name.as_ref(), edge_stats);
958 }
959 }
960
961 *self.statistics.write() = stats;
962 }
963
964 #[must_use]
966 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
967 self.statistics.read().estimate_label_cardinality(label)
968 }
969
970 #[must_use]
972 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
973 self.statistics
974 .read()
975 .estimate_avg_degree(edge_type, outgoing)
976 }
977
978 fn get_or_create_label_id(&self, label: &str) -> u32 {
981 {
982 let label_to_id = self.label_to_id.read();
983 if let Some(&id) = label_to_id.get(label) {
984 return id;
985 }
986 }
987
988 let mut label_to_id = self.label_to_id.write();
989 let mut id_to_label = self.id_to_label.write();
990
991 if let Some(&id) = label_to_id.get(label) {
993 return id;
994 }
995
996 let id = id_to_label.len() as u32;
997
998 let label: Arc<str> = label.into();
999 label_to_id.insert(label.clone(), id);
1000 id_to_label.push(label);
1001
1002 id
1003 }
1004
1005 fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
1006 {
1007 let type_to_id = self.edge_type_to_id.read();
1008 if let Some(&id) = type_to_id.get(edge_type) {
1009 return id;
1010 }
1011 }
1012
1013 let mut type_to_id = self.edge_type_to_id.write();
1014 let mut id_to_type = self.id_to_edge_type.write();
1015
1016 if let Some(&id) = type_to_id.get(edge_type) {
1018 return id;
1019 }
1020
1021 let id = id_to_type.len() as u32;
1022 let edge_type: Arc<str> = edge_type.into();
1023 type_to_id.insert(edge_type.clone(), id);
1024 id_to_type.push(edge_type);
1025
1026 id
1027 }
1028
1029 pub fn create_node_with_id(&self, id: NodeId, labels: &[&str]) {
1036 let epoch = self.current_epoch();
1037 let mut record = NodeRecord::new(id, epoch);
1038 record.set_label_count(labels.len() as u16);
1039
1040 let mut node_label_set = FxHashSet::default();
1042 for label in labels {
1043 let label_id = self.get_or_create_label_id(*label);
1044 node_label_set.insert(label_id);
1045
1046 let mut index = self.label_index.write();
1048 while index.len() <= label_id as usize {
1049 index.push(FxHashMap::default());
1050 }
1051 index[label_id as usize].insert(id, ());
1052 }
1053
1054 self.node_labels.write().insert(id, node_label_set);
1056
1057 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1059 self.nodes.write().insert(id, chain);
1060
1061 let id_val = id.as_u64();
1063 let _ = self
1064 .next_node_id
1065 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1066 if id_val >= current {
1067 Some(id_val + 1)
1068 } else {
1069 None
1070 }
1071 });
1072 }
1073
1074 pub fn create_edge_with_id(&self, id: EdgeId, src: NodeId, dst: NodeId, edge_type: &str) {
1078 let epoch = self.current_epoch();
1079 let type_id = self.get_or_create_edge_type_id(edge_type);
1080
1081 let record = EdgeRecord::new(id, src, dst, type_id, epoch);
1082 let chain = VersionChain::with_initial(record, epoch, TxId::SYSTEM);
1083 self.edges.write().insert(id, chain);
1084
1085 self.forward_adj.add_edge(src, dst, id);
1087 if let Some(ref backward) = self.backward_adj {
1088 backward.add_edge(dst, src, id);
1089 }
1090
1091 let id_val = id.as_u64();
1093 let _ = self
1094 .next_edge_id
1095 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
1096 if id_val >= current {
1097 Some(id_val + 1)
1098 } else {
1099 None
1100 }
1101 });
1102 }
1103
1104 pub fn set_epoch(&self, epoch: EpochId) {
1106 self.current_epoch.store(epoch.as_u64(), Ordering::SeqCst);
1107 }
1108}
1109
1110impl Default for LpgStore {
1111 fn default() -> Self {
1112 Self::new()
1113 }
1114}
1115
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created node is retrievable and carries exactly its labels.
    #[test]
    fn test_create_node() {
        let store = LpgStore::new();

        let node_id = store.create_node(&["Person"]);
        assert!(node_id.is_valid());

        let fetched = store.get_node(node_id).unwrap();
        assert!(fetched.has_label("Person"));
        assert!(!fetched.has_label("Animal"));
    }

    /// Properties passed at creation time can be read back with their types.
    #[test]
    fn test_create_node_with_props() {
        let store = LpgStore::new();

        let props = [("name", Value::from("Alice")), ("age", Value::from(30i64))];
        let node_id = store.create_node_with_props(&["Person"], props);

        let fetched = store.get_node(node_id).unwrap();
        let name = fetched.get_property("name").and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));
        let age = fetched.get_property("age").and_then(|v| v.as_int64());
        assert_eq!(age, Some(30));
    }

    /// Deleting a node hides it; deleting it a second time reports failure.
    #[test]
    fn test_delete_node() {
        let store = LpgStore::new();

        let node_id = store.create_node(&["Person"]);
        assert_eq!(store.node_count(), 1);

        assert!(store.delete_node(node_id));
        assert_eq!(store.node_count(), 0);
        assert!(store.get_node(node_id).is_none());

        assert!(!store.delete_node(node_id));
    }

    /// A created edge keeps its endpoints and type.
    #[test]
    fn test_create_edge() {
        let store = LpgStore::new();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        let knows = store.create_edge(alice, bob, "KNOWS");
        assert!(knows.is_valid());

        let fetched = store.get_edge(knows).unwrap();
        assert_eq!(fetched.src, alice);
        assert_eq!(fetched.dst, bob);
        assert_eq!(fetched.edge_type.as_ref(), "KNOWS");
    }

    /// Outgoing and incoming neighbour lookups follow edge direction.
    #[test]
    fn test_neighbors() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, b, "KNOWS");
        store.create_edge(a, c, "KNOWS");

        let from_a: Vec<_> = store.neighbors(a, Direction::Outgoing).collect();
        assert_eq!(from_a.len(), 2);
        assert!(from_a.contains(&b));
        assert!(from_a.contains(&c));

        let into_b: Vec<_> = store.neighbors(b, Direction::Incoming).collect();
        assert_eq!(into_b.len(), 1);
        assert!(into_b.contains(&a));
    }

    /// The label index returns exactly the nodes carrying a label.
    #[test]
    fn test_nodes_by_label() {
        let store = LpgStore::new();

        let first_person = store.create_node(&["Person"]);
        let second_person = store.create_node(&["Person"]);
        let _animal = store.create_node(&["Animal"]);

        let persons = store.nodes_by_label("Person");
        assert_eq!(persons.len(), 2);
        assert!(persons.contains(&first_person));
        assert!(persons.contains(&second_person));

        let animals = store.nodes_by_label("Animal");
        assert_eq!(animals.len(), 1);
    }

    /// Deleting an edge removes it from counts and lookups.
    #[test]
    fn test_delete_edge() {
        let store = LpgStore::new();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let knows = store.create_edge(a, b, "KNOWS");

        assert_eq!(store.edge_count(), 1);

        assert!(store.delete_edge(knows));
        assert_eq!(store.edge_count(), 0);
        assert!(store.get_edge(knows).is_none());
    }
}
1235}