grafeo_core/graph/lpg/store/
statistics.rs1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use grafeo_common::utils::hash::FxHashMap;
4use std::sync::Arc;
5use std::sync::atomic::Ordering;
6
7impl LpgStore {
8 #[must_use]
12 pub fn statistics(&self) -> Arc<Statistics> {
13 Arc::clone(&self.statistics.read())
14 }
15
16 pub fn ensure_statistics_fresh(&self) {
21 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
22 self.compute_statistics();
23 }
24 }
25
26 #[cfg(not(feature = "tiered-storage"))]
31 pub fn compute_statistics(&self) {
32 let mut stats = Statistics::new();
33
34 stats.total_nodes = self.node_count() as u64;
36 stats.total_edges = self.edge_count() as u64;
37
38 let id_to_label = self.id_to_label.read();
40 let label_index = self.label_index.read();
41
42 for (label_id, label_name) in id_to_label.iter().enumerate() {
43 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
44
45 if node_count > 0 {
46 let avg_out_degree = if stats.total_nodes > 0 {
48 stats.total_edges as f64 / stats.total_nodes as f64
49 } else {
50 0.0
51 };
52
53 let label_stats =
54 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
55
56 stats.update_label(label_name.as_ref(), label_stats);
57 }
58 }
59
60 let id_to_edge_type = self.id_to_edge_type.read();
62 let edges = self.edges.read();
63 let epoch = self.current_epoch();
64
65 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
66 for chain in edges.values() {
67 if let Some(record) = chain.visible_at(epoch)
68 && !record.is_deleted()
69 {
70 *edge_type_counts.entry(record.type_id).or_default() += 1;
71 }
72 }
73
74 for (type_id, count) in edge_type_counts {
75 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
76 let avg_degree = if stats.total_nodes > 0 {
77 count as f64 / stats.total_nodes as f64
78 } else {
79 0.0
80 };
81
82 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
83 stats.update_edge_type(type_name.as_ref(), edge_stats);
84 }
85 }
86
87 *self.statistics.write() = Arc::new(stats);
88 }
89
90 #[cfg(feature = "tiered-storage")]
93 pub fn compute_statistics(&self) {
94 let mut stats = Statistics::new();
95
96 stats.total_nodes = self.node_count() as u64;
98 stats.total_edges = self.edge_count() as u64;
99
100 let id_to_label = self.id_to_label.read();
102 let label_index = self.label_index.read();
103
104 for (label_id, label_name) in id_to_label.iter().enumerate() {
105 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
106
107 if node_count > 0 {
108 let avg_out_degree = if stats.total_nodes > 0 {
109 stats.total_edges as f64 / stats.total_nodes as f64
110 } else {
111 0.0
112 };
113
114 let label_stats =
115 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
116
117 stats.update_label(label_name.as_ref(), label_stats);
118 }
119 }
120
121 let id_to_edge_type = self.id_to_edge_type.read();
123 let versions = self.edge_versions.read();
124 let epoch = self.current_epoch();
125
126 let mut edge_type_counts: FxHashMap<u32, u64> = FxHashMap::default();
127 for index in versions.values() {
128 if let Some(vref) = index.visible_at(epoch)
129 && let Some(record) = self.read_edge_record(&vref)
130 && !record.is_deleted()
131 {
132 *edge_type_counts.entry(record.type_id).or_default() += 1;
133 }
134 }
135
136 for (type_id, count) in edge_type_counts {
137 if let Some(type_name) = id_to_edge_type.get(type_id as usize) {
138 let avg_degree = if stats.total_nodes > 0 {
139 count as f64 / stats.total_nodes as f64
140 } else {
141 0.0
142 };
143
144 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
145 stats.update_edge_type(type_name.as_ref(), edge_stats);
146 }
147 }
148
149 *self.statistics.write() = Arc::new(stats);
150 }
151
152 #[must_use]
154 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
155 self.statistics.read().estimate_label_cardinality(label)
156 }
157
158 #[must_use]
160 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
161 self.statistics
162 .read()
163 .estimate_avg_degree(edge_type, outgoing)
164 }
165}