grafeo_core/graph/lpg/store/
statistics.rs1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6impl LpgStore {
7 #[must_use]
11 pub fn statistics(&self) -> Arc<Statistics> {
12 Arc::clone(&self.statistics.read())
13 }
14
15 #[doc(hidden)]
20 pub fn ensure_statistics_fresh(&self) {
21 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
22 self.recompute_statistics_full();
23 } else {
24 self.compute_statistics();
25 }
26 }
27
28 pub(crate) fn compute_statistics(&self) {
34 let mut stats = Statistics::new();
35
36 stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
38 stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;
39
40 let id_to_label = self.id_to_label.read();
42 let label_index = self.label_index.read();
43
44 for (label_id, label_name) in id_to_label.iter().enumerate() {
45 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
46
47 if node_count > 0 {
48 let avg_out_degree = if stats.total_nodes > 0 {
49 stats.total_edges as f64 / stats.total_nodes as f64
50 } else {
51 0.0
52 };
53
54 let label_stats =
55 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
56
57 stats.update_label(label_name.as_ref(), label_stats);
58 }
59 }
60
61 let id_to_edge_type = self.id_to_edge_type.read();
63 let edge_type_counts = self.edge_type_live_counts.read();
64
65 for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
66 let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;
67
68 if count > 0 {
69 let avg_degree = if stats.total_nodes > 0 {
70 count as f64 / stats.total_nodes as f64
71 } else {
72 0.0
73 };
74
75 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
76 stats.update_edge_type(type_name.as_ref(), edge_stats);
77 }
78 }
79
80 *self.statistics.write() = Arc::new(stats);
81 }
82
83 #[cfg(not(feature = "tiered-storage"))]
86 fn recompute_statistics_full(&self) {
87 let epoch = self.current_epoch();
88
89 let total_nodes = self
91 .nodes
92 .read()
93 .values()
94 .filter_map(|chain| chain.visible_at(epoch))
95 .filter(|r| !r.is_deleted())
96 .count();
97
98 let edges = self.edges.read();
100 let mut total_edges: i64 = 0;
101 let id_to_edge_type = self.id_to_edge_type.read();
102 let mut type_counts = vec![0i64; id_to_edge_type.len()];
103
104 for chain in edges.values() {
105 if let Some(record) = chain.visible_at(epoch)
106 && !record.is_deleted()
107 {
108 total_edges += 1;
109 if (record.type_id as usize) < type_counts.len() {
110 type_counts[record.type_id as usize] += 1;
111 }
112 }
113 }
114
115 self.live_node_count
117 .store(total_nodes as i64, Ordering::Relaxed);
118 self.live_edge_count.store(total_edges, Ordering::Relaxed);
119 *self.edge_type_live_counts.write() = type_counts;
120
121 drop(edges);
122 drop(id_to_edge_type);
123
124 self.compute_statistics();
126 }
127
128 #[cfg(feature = "tiered-storage")]
132 fn recompute_statistics_full(&self) {
133 let epoch = self.current_epoch();
134
135 let versions = self.node_versions.read();
137 let total_nodes = versions
138 .iter()
139 .filter(|(_, index)| {
140 index.visible_at(epoch).map_or(false, |vref| {
141 self.read_node_record(&vref)
142 .map_or(false, |r| !r.is_deleted())
143 })
144 })
145 .count();
146 drop(versions);
147
148 let edge_versions = self.edge_versions.read();
150 let id_to_edge_type = self.id_to_edge_type.read();
151 let mut total_edges: i64 = 0;
152 let mut type_counts = vec![0i64; id_to_edge_type.len()];
153
154 for index in edge_versions.values() {
155 if let Some(vref) = index.visible_at(epoch)
156 && let Some(record) = self.read_edge_record(&vref)
157 && !record.is_deleted()
158 {
159 total_edges += 1;
160 if (record.type_id as usize) < type_counts.len() {
161 type_counts[record.type_id as usize] += 1;
162 }
163 }
164 }
165
166 self.live_node_count
168 .store(total_nodes as i64, Ordering::Relaxed);
169 self.live_edge_count.store(total_edges, Ordering::Relaxed);
170 *self.edge_type_live_counts.write() = type_counts;
171
172 drop(edge_versions);
173 drop(id_to_edge_type);
174
175 self.compute_statistics();
177 }
178
179 #[must_use]
181 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
182 self.statistics.read().estimate_label_cardinality(label)
183 }
184
185 #[must_use]
187 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
188 self.statistics
189 .read()
190 .estimate_avg_degree(edge_type, outgoing)
191 }
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197
198 fn make_store() -> LpgStore {
199 LpgStore::new().unwrap()
200 }
201
202 #[test]
203 fn compute_statistics_empty_store() {
204 let store = make_store();
205 store.compute_statistics();
206 let stats = store.statistics();
207 assert_eq!(stats.total_nodes, 0);
208 assert_eq!(stats.total_edges, 0);
209 }
210
211 #[test]
212 fn compute_statistics_with_nodes_and_edges() {
213 let store = make_store();
214 let a = store.create_node(&["Person"]);
215 let b = store.create_node(&["Person"]);
216 store.create_edge(a, b, "KNOWS");
217 store.compute_statistics();
218 let stats = store.statistics();
219 assert_eq!(stats.total_nodes, 2);
220 assert_eq!(stats.total_edges, 1);
221 }
222
223 #[test]
224 fn ensure_statistics_fresh_uses_incremental_path_when_not_stale() {
225 let store = make_store();
226 store.create_node(&["X"]);
227 store.ensure_statistics_fresh();
229 assert_eq!(store.statistics().total_nodes, 1);
230 }
231
232 #[test]
233 fn ensure_statistics_fresh_does_full_recompute_when_stale() {
234 let store = make_store();
235 store.create_node(&["Y"]);
236 store
238 .needs_stats_recompute
239 .store(true, std::sync::atomic::Ordering::Relaxed);
240 store.ensure_statistics_fresh();
241 assert_eq!(store.statistics().total_nodes, 1);
242 assert!(
244 !store
245 .needs_stats_recompute
246 .load(std::sync::atomic::Ordering::Relaxed)
247 );
248 }
249
250 #[test]
251 fn estimate_label_cardinality_returns_nonzero_for_known_label() {
252 let store = make_store();
253 store.create_node(&["Doc"]);
254 store.compute_statistics();
255 let card = store.estimate_label_cardinality("Doc");
256 assert!(card > 0.0, "cardinality should be positive, got {card}");
257 }
258
259 #[test]
260 fn estimate_label_cardinality_returns_default_for_unknown_label() {
261 let store = make_store();
262 store.compute_statistics();
263 let card = store.estimate_label_cardinality("NeverSeen");
264 assert!(card >= 0.0);
266 }
267
268 #[test]
269 fn estimate_avg_degree_for_known_edge_type() {
270 let store = make_store();
271 let a = store.create_node(&[]);
272 let b = store.create_node(&[]);
273 store.create_edge(a, b, "FOLLOWS");
274 store.compute_statistics();
275 let deg = store.estimate_avg_degree("FOLLOWS", true);
276 assert!(deg >= 0.0);
277 }
278
279 #[test]
280 fn compute_statistics_zero_nodes_gives_zero_degree() {
281 let store = make_store();
282 store.compute_statistics();
285 let stats = store.statistics();
286 assert_eq!(stats.total_nodes, 0);
288 assert_eq!(stats.total_edges, 0);
289 }
290}