grafeo_core/graph/lpg/store/
statistics.rs1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6impl LpgStore {
7 #[must_use]
11 pub fn statistics(&self) -> Arc<Statistics> {
12 Arc::clone(&self.statistics.read())
13 }
14
15 #[doc(hidden)]
20 pub fn ensure_statistics_fresh(&self) {
21 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
22 self.recompute_statistics_full();
23 } else {
24 self.compute_statistics();
25 }
26 }
27
28 pub(crate) fn compute_statistics(&self) {
34 let mut stats = Statistics::new();
35
36 stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
38 stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;
39
40 let id_to_label = self.id_to_label.read();
42 let label_index = self.label_index.read();
43
44 for (label_id, label_name) in id_to_label.iter().enumerate() {
45 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
46
47 if node_count > 0 {
48 let avg_out_degree = if stats.total_nodes > 0 {
49 stats.total_edges as f64 / stats.total_nodes as f64
50 } else {
51 0.0
52 };
53
54 let label_stats =
55 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
56
57 stats.update_label(label_name.as_ref(), label_stats);
58 }
59 }
60
61 let id_to_edge_type = self.id_to_edge_type.read();
63 let edge_type_counts = self.edge_type_live_counts.read();
64
65 for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
66 let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;
67
68 if count > 0 {
69 let avg_degree = if stats.total_nodes > 0 {
70 count as f64 / stats.total_nodes as f64
71 } else {
72 0.0
73 };
74
75 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
76 stats.update_edge_type(type_name.as_ref(), edge_stats);
77 }
78 }
79
80 *self.statistics.write() = Arc::new(stats);
81 }
82
83 #[cfg(not(feature = "tiered-storage"))]
86 fn recompute_statistics_full(&self) {
87 let epoch = self.current_epoch();
88
89 let total_nodes = self
91 .nodes
92 .read()
93 .values()
94 .filter_map(|chain| chain.visible_at(epoch))
95 .filter(|r| !r.is_deleted())
96 .count();
97
98 let edges = self.edges.read();
100 let mut total_edges: i64 = 0;
101 let id_to_edge_type = self.id_to_edge_type.read();
102 let mut type_counts = vec![0i64; id_to_edge_type.len()];
103
104 for chain in edges.values() {
105 if let Some(record) = chain.visible_at(epoch)
106 && !record.is_deleted()
107 {
108 total_edges += 1;
109 if (record.type_id as usize) < type_counts.len() {
110 type_counts[record.type_id as usize] += 1;
111 }
112 }
113 }
114
115 self.live_node_count
117 .store(total_nodes as i64, Ordering::Relaxed);
118 self.live_edge_count.store(total_edges, Ordering::Relaxed);
119 *self.edge_type_live_counts.write() = type_counts;
120
121 drop(edges);
122 drop(id_to_edge_type);
123
124 self.compute_statistics();
126 }
127
128 #[cfg(feature = "tiered-storage")]
132 fn recompute_statistics_full(&self) {
133 let epoch = self.current_epoch();
134
135 let versions = self.node_versions.read();
137 let total_nodes = versions
138 .iter()
139 .filter(|(_, index)| {
140 index.visible_at(epoch).map_or(false, |vref| {
141 self.read_node_record(&vref)
142 .map_or(false, |r| !r.is_deleted())
143 })
144 })
145 .count();
146 drop(versions);
147
148 let edge_versions = self.edge_versions.read();
150 let id_to_edge_type = self.id_to_edge_type.read();
151 let mut total_edges: i64 = 0;
152 let mut type_counts = vec![0i64; id_to_edge_type.len()];
153
154 for index in edge_versions.values() {
155 if let Some(vref) = index.visible_at(epoch)
156 && let Some(record) = self.read_edge_record(&vref)
157 && !record.is_deleted()
158 {
159 total_edges += 1;
160 if (record.type_id as usize) < type_counts.len() {
161 type_counts[record.type_id as usize] += 1;
162 }
163 }
164 }
165
166 self.live_node_count
168 .store(total_nodes as i64, Ordering::Relaxed);
169 self.live_edge_count.store(total_edges, Ordering::Relaxed);
170 *self.edge_type_live_counts.write() = type_counts;
171
172 drop(edge_versions);
173 drop(id_to_edge_type);
174
175 self.compute_statistics();
177 }
178
179 #[must_use]
181 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
182 self.statistics.read().estimate_label_cardinality(label)
183 }
184
185 #[must_use]
187 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
188 self.statistics
189 .read()
190 .estimate_avg_degree(edge_type, outgoing)
191 }
192}