grafeo_core/graph/lpg/store/
statistics.rs1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6impl LpgStore {
7 #[must_use]
11 pub fn statistics(&self) -> Arc<Statistics> {
12 Arc::clone(&self.statistics.read())
13 }
14
15 pub fn ensure_statistics_fresh(&self) {
20 if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
21 self.recompute_statistics_full();
22 } else {
23 self.compute_statistics();
24 }
25 }
26
27 pub fn compute_statistics(&self) {
33 let mut stats = Statistics::new();
34
35 stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
37 stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;
38
39 let id_to_label = self.id_to_label.read();
41 let label_index = self.label_index.read();
42
43 for (label_id, label_name) in id_to_label.iter().enumerate() {
44 let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
45
46 if node_count > 0 {
47 let avg_out_degree = if stats.total_nodes > 0 {
48 stats.total_edges as f64 / stats.total_nodes as f64
49 } else {
50 0.0
51 };
52
53 let label_stats =
54 LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
55
56 stats.update_label(label_name.as_ref(), label_stats);
57 }
58 }
59
60 let id_to_edge_type = self.id_to_edge_type.read();
62 let edge_type_counts = self.edge_type_live_counts.read();
63
64 for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
65 let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;
66
67 if count > 0 {
68 let avg_degree = if stats.total_nodes > 0 {
69 count as f64 / stats.total_nodes as f64
70 } else {
71 0.0
72 };
73
74 let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
75 stats.update_edge_type(type_name.as_ref(), edge_stats);
76 }
77 }
78
79 *self.statistics.write() = Arc::new(stats);
80 }
81
82 #[cfg(not(feature = "tiered-storage"))]
85 fn recompute_statistics_full(&self) {
86 let epoch = self.current_epoch();
87
88 let total_nodes = self
90 .nodes
91 .read()
92 .values()
93 .filter_map(|chain| chain.visible_at(epoch))
94 .filter(|r| !r.is_deleted())
95 .count();
96
97 let edges = self.edges.read();
99 let mut total_edges: i64 = 0;
100 let id_to_edge_type = self.id_to_edge_type.read();
101 let mut type_counts = vec![0i64; id_to_edge_type.len()];
102
103 for chain in edges.values() {
104 if let Some(record) = chain.visible_at(epoch)
105 && !record.is_deleted()
106 {
107 total_edges += 1;
108 if (record.type_id as usize) < type_counts.len() {
109 type_counts[record.type_id as usize] += 1;
110 }
111 }
112 }
113
114 self.live_node_count
116 .store(total_nodes as i64, Ordering::Relaxed);
117 self.live_edge_count.store(total_edges, Ordering::Relaxed);
118 *self.edge_type_live_counts.write() = type_counts;
119
120 drop(edges);
121 drop(id_to_edge_type);
122
123 self.compute_statistics();
125 }
126
127 #[cfg(feature = "tiered-storage")]
131 fn recompute_statistics_full(&self) {
132 let epoch = self.current_epoch();
133
134 let versions = self.node_versions.read();
136 let total_nodes = versions
137 .iter()
138 .filter(|(_, index)| {
139 index.visible_at(epoch).map_or(false, |vref| {
140 self.read_node_record(&vref)
141 .map_or(false, |r| !r.is_deleted())
142 })
143 })
144 .count();
145 drop(versions);
146
147 let edge_versions = self.edge_versions.read();
149 let id_to_edge_type = self.id_to_edge_type.read();
150 let mut total_edges: i64 = 0;
151 let mut type_counts = vec![0i64; id_to_edge_type.len()];
152
153 for index in edge_versions.values() {
154 if let Some(vref) = index.visible_at(epoch)
155 && let Some(record) = self.read_edge_record(&vref)
156 && !record.is_deleted()
157 {
158 total_edges += 1;
159 if (record.type_id as usize) < type_counts.len() {
160 type_counts[record.type_id as usize] += 1;
161 }
162 }
163 }
164
165 self.live_node_count
167 .store(total_nodes as i64, Ordering::Relaxed);
168 self.live_edge_count.store(total_edges, Ordering::Relaxed);
169 *self.edge_type_live_counts.write() = type_counts;
170
171 drop(edge_versions);
172 drop(id_to_edge_type);
173
174 self.compute_statistics();
176 }
177
178 #[must_use]
180 pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
181 self.statistics.read().estimate_label_cardinality(label)
182 }
183
184 #[must_use]
186 pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
187 self.statistics
188 .read()
189 .estimate_avg_degree(edge_type, outgoing)
190 }
191}