grafeo_core/graph/lpg/store/
statistics.rs1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6impl LpgStore {
7 #[must_use]
11 pub fn statistics(&self) -> Arc<Statistics> {
12 Arc::clone(&self.statistics.read())
13 }
14
15 #[doc(hidden)]
20 pub fn ensure_statistics_fresh(&self) {
21 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
22 self.recompute_statistics_full();
23 } else {
24 self.compute_statistics();
25 }
26 }
27
28 pub(crate) fn compute_statistics(&self) {
34 let mut stats = Statistics::new();
35
36 #[allow(clippy::cast_sign_loss)]
39 {
40 stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
41 }
42 #[allow(clippy::cast_sign_loss)]
44 {
45 stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;
46 }
47
48 let registry = self.label_registry.read();
50 let label_index = self.label_index.read();
51
52 for (label_id, label_name) in registry.names().iter().enumerate() {
53 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
54
55 if node_count > 0 {
56 let avg_out_degree = if stats.total_nodes > 0 {
57 stats.total_edges as f64 / stats.total_nodes as f64
58 } else {
59 0.0
60 };
61
62 let label_stats =
63 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
64
65 stats.update_label(label_name.as_ref(), label_stats);
66 }
67 }
68
69 let id_to_edge_type = self.id_to_edge_type.read();
71 let edge_type_counts = self.edge_type_live_counts.read();
72
73 for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
74 #[allow(clippy::cast_sign_loss)]
76 let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;
77
78 if count > 0 {
79 let avg_degree = if stats.total_nodes > 0 {
80 count as f64 / stats.total_nodes as f64
81 } else {
82 0.0
83 };
84
85 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
86 stats.update_edge_type(type_name.as_ref(), edge_stats);
87 }
88 }
89
90 *self.statistics.write() = Arc::new(stats);
91 }
92
93 #[cfg(not(feature = "tiered-storage"))]
96 fn recompute_statistics_full(&self) {
97 let epoch = self.current_epoch();
98
99 let total_nodes = self
101 .nodes
102 .read()
103 .values()
104 .filter_map(|chain| chain.visible_at(epoch))
105 .filter(|r| !r.is_deleted())
106 .count();
107
108 let edges = self.edges.read();
110 let mut total_edges: i64 = 0;
111 let id_to_edge_type = self.id_to_edge_type.read();
112 let mut type_counts = vec![0i64; id_to_edge_type.len()];
113
114 for chain in edges.values() {
115 if let Some(record) = chain.visible_at(epoch)
116 && !record.is_deleted()
117 {
118 total_edges += 1;
119 if (record.type_id as usize) < type_counts.len() {
120 type_counts[record.type_id as usize] += 1;
121 }
122 }
123 }
124
125 self.live_node_count.store(
127 i64::try_from(total_nodes).unwrap_or(i64::MAX),
128 Ordering::Relaxed,
129 );
130 self.live_edge_count.store(total_edges, Ordering::Relaxed);
131 *self.edge_type_live_counts.write() = type_counts;
132
133 drop(edges);
134 drop(id_to_edge_type);
135
136 self.compute_statistics();
138 }
139
140 #[cfg(feature = "tiered-storage")]
144 fn recompute_statistics_full(&self) {
145 let epoch = self.current_epoch();
146
147 let versions = self.node_versions.read();
149 let total_nodes = versions
150 .iter()
151 .filter(|(_, index)| {
152 index.visible_at(epoch).map_or(false, |vref| {
153 self.read_node_record(&vref)
154 .map_or(false, |r| !r.is_deleted())
155 })
156 })
157 .count();
158 drop(versions);
159
160 let edge_versions = self.edge_versions.read();
162 let id_to_edge_type = self.id_to_edge_type.read();
163 let mut total_edges: i64 = 0;
164 let mut type_counts = vec![0i64; id_to_edge_type.len()];
165
166 for index in edge_versions.values() {
167 if let Some(vref) = index.visible_at(epoch)
168 && let Some(record) = self.read_edge_record(&vref)
169 && !record.is_deleted()
170 {
171 total_edges += 1;
172 if (record.type_id as usize) < type_counts.len() {
173 type_counts[record.type_id as usize] += 1;
174 }
175 }
176 }
177
178 self.live_node_count.store(
180 i64::try_from(total_nodes).unwrap_or(i64::MAX),
181 Ordering::Relaxed,
182 );
183 self.live_edge_count.store(total_edges, Ordering::Relaxed);
184 *self.edge_type_live_counts.write() = type_counts;
185
186 drop(edge_versions);
187 drop(id_to_edge_type);
188
189 self.compute_statistics();
191 }
192
193 #[must_use]
195 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
196 self.statistics.read().estimate_label_cardinality(label)
197 }
198
199 #[must_use]
201 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
202 self.statistics
203 .read()
204 .estimate_avg_degree(edge_type, outgoing)
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty store; panics if construction fails.
    fn make_store() -> LpgStore {
        LpgStore::new().unwrap()
    }

    /// A freshly created store reports zero nodes and zero edges.
    #[test]
    fn compute_statistics_empty_store() {
        let store = make_store();
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!(snapshot.total_nodes, 0);
        assert_eq!(snapshot.total_edges, 0);
    }

    /// Totals reflect the nodes and edges that were created.
    #[test]
    fn compute_statistics_with_nodes_and_edges() {
        let store = make_store();
        let first = store.create_node(&["Person"]);
        let second = store.create_node(&["Person"]);
        store.create_edge(first, second, "KNOWS");

        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!(snapshot.total_nodes, 2);
        assert_eq!(snapshot.total_edges, 1);
    }

    /// Without the stale flag set, the counter-based path still sees the new node.
    #[test]
    fn ensure_statistics_fresh_uses_incremental_path_when_not_stale() {
        let store = make_store();
        store.create_node(&["X"]);

        store.ensure_statistics_fresh();

        assert_eq!(store.statistics().total_nodes, 1);
    }

    /// With the stale flag set, the full recompute runs and clears the flag.
    #[test]
    fn ensure_statistics_fresh_does_full_recompute_when_stale() {
        let store = make_store();
        store.create_node(&["Y"]);

        let stale_flag = &store.needs_stats_recompute;
        stale_flag.store(true, std::sync::atomic::Ordering::Relaxed);

        store.ensure_statistics_fresh();

        assert_eq!(store.statistics().total_nodes, 1);
        assert!(!stale_flag.load(std::sync::atomic::Ordering::Relaxed));
    }

    /// A label with at least one node yields a positive cardinality estimate.
    #[test]
    fn estimate_label_cardinality_returns_nonzero_for_known_label() {
        let store = make_store();
        store.create_node(&["Doc"]);
        store.compute_statistics();

        let card = store.estimate_label_cardinality("Doc");
        assert!(card > 0.0, "cardinality should be positive, got {card}");
    }

    /// An unknown label must not produce a negative (or NaN) estimate.
    #[test]
    fn estimate_label_cardinality_returns_default_for_unknown_label() {
        let store = make_store();
        store.compute_statistics();

        // Only a lower bound is checked; the exact fallback value is not pinned here.
        let estimate = store.estimate_label_cardinality("NeverSeen");
        assert!(estimate >= 0.0);
    }

    /// The degree estimate for an existing edge type is non-negative.
    #[test]
    fn estimate_avg_degree_for_known_edge_type() {
        let store = make_store();
        let src = store.create_node(&[]);
        let dst = store.create_node(&[]);
        store.create_edge(src, dst, "FOLLOWS");
        store.compute_statistics();

        let degree = store.estimate_avg_degree("FOLLOWS", true);
        assert!(degree >= 0.0);
    }

    /// Computing statistics on an empty store takes the zero-node branch
    /// without panicking and leaves both totals at zero.
    #[test]
    fn compute_statistics_zero_nodes_gives_zero_degree() {
        let store = make_store();
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!(snapshot.total_nodes, 0);
        assert_eq!(snapshot.total_edges, 0);
    }
}