//! Statistics maintenance for the LPG store
//! (`grafeo_core/graph/lpg/store/statistics.rs`).

1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
impl LpgStore {
    // === Statistics ===

    /// Returns the current statistics (cheap `Arc` clone, no deep copy).
    #[must_use]
    pub fn statistics(&self) -> Arc<Statistics> {
        Arc::clone(&self.statistics.read())
    }

    /// Refreshes the statistics snapshot before reading it.
    ///
    /// Call this before reading statistics for query optimization. If the
    /// stale flag is set (counters may be out of sync, e.g. after a
    /// rollback), a full storage scan resyncs everything; otherwise the
    /// snapshot is rebuilt from the cheap incremental counters. Note that
    /// the incremental rebuild runs on every call — only the expensive
    /// full scan is gated by the flag.
    #[doc(hidden)]
    pub fn ensure_statistics_fresh(&self) {
        // `swap(false, ..)` atomically reads and clears the flag, so two
        // concurrent callers cannot both take the full-recompute path for
        // the same invalidation.
        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
            self.recompute_statistics_full();
        } else {
            self.compute_statistics();
        }
    }

    /// Recomputes statistics from incremental counters.
    ///
    /// Reads live node/edge counts from atomic counters and per-label counts
    /// from the label index. This is O(|labels| + |edge_types|) instead of
    /// O(n + m) for a full scan.
    pub(crate) fn compute_statistics(&self) {
        let mut stats = Statistics::new();

        // Read total counts from atomic counters. `.max(0)` guards against
        // a transiently negative live count before the cast to u64.
        stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
        stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;

        // Compute per-label statistics from label_index (each is O(1) via .len())
        let id_to_label = self.id_to_label.read();
        let label_index = self.label_index.read();

        for (label_id, label_name) in id_to_label.iter().enumerate() {
            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);

            if node_count > 0 {
                // Coarse estimate: the global edges-per-node ratio is used
                // for every label (per-label edge counts are not tracked
                // here).
                let avg_out_degree = if stats.total_nodes > 0 {
                    stats.total_edges as f64 / stats.total_nodes as f64
                } else {
                    0.0
                };

                let label_stats =
                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);

                stats.update_label(label_name.as_ref(), label_stats);
            }
        }

        // Compute per-edge-type statistics from incremental counts
        let id_to_edge_type = self.id_to_edge_type.read();
        let edge_type_counts = self.edge_type_live_counts.read();

        for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
            // Missing slots count as 0; `.max(0)` again guards a transiently
            // negative incremental counter.
            let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;

            if count > 0 {
                let avg_degree = if stats.total_nodes > 0 {
                    count as f64 / stats.total_nodes as f64
                } else {
                    0.0
                };

                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
                stats.update_edge_type(type_name.as_ref(), edge_stats);
            }
        }

        // Publish the new snapshot; readers holding an older Arc keep it.
        *self.statistics.write() = Arc::new(stats);
    }

    /// Full recomputation from storage: used after rollback when counters
    /// may be out of sync. Also resyncs the atomic counters.
    #[cfg(not(feature = "tiered-storage"))]
    fn recompute_statistics_full(&self) {
        let epoch = self.current_epoch();

        // Full-scan node count: only records visible at the current epoch
        // and not tombstoned are considered live.
        let total_nodes = self
            .nodes
            .read()
            .values()
            .filter_map(|chain| chain.visible_at(epoch))
            .filter(|r| !r.is_deleted())
            .count();

        // Full-scan edge count and per-type counts
        let edges = self.edges.read();
        let mut total_edges: i64 = 0;
        let id_to_edge_type = self.id_to_edge_type.read();
        let mut type_counts = vec![0i64; id_to_edge_type.len()];

        for chain in edges.values() {
            if let Some(record) = chain.visible_at(epoch)
                && !record.is_deleted()
            {
                total_edges += 1;
                // Skip type_ids beyond the snapshot of id_to_edge_type
                // (presumably minted concurrently — TODO confirm); they are
                // counted in total_edges but not per type.
                if (record.type_id as usize) < type_counts.len() {
                    type_counts[record.type_id as usize] += 1;
                }
            }
        }

        // Resync the atomic counters
        self.live_node_count
            .store(total_nodes as i64, Ordering::Relaxed);
        self.live_edge_count.store(total_edges, Ordering::Relaxed);
        *self.edge_type_live_counts.write() = type_counts;

        // Release the read guards before compute_statistics re-acquires
        // these locks.
        drop(edges);
        drop(id_to_edge_type);

        // Now use the normal incremental path to build statistics
        self.compute_statistics();
    }

    /// Full recomputation from storage: used after rollback when counters
    /// may be out of sync. Also resyncs the atomic counters.
    /// (Tiered storage version)
    #[cfg(feature = "tiered-storage")]
    fn recompute_statistics_full(&self) {
        let epoch = self.current_epoch();

        // Full-scan node count: resolve each version reference and keep
        // only records visible at this epoch and not tombstoned.
        let versions = self.node_versions.read();
        let total_nodes = versions
            .iter()
            .filter(|(_, index)| {
                index.visible_at(epoch).map_or(false, |vref| {
                    self.read_node_record(&vref)
                        .map_or(false, |r| !r.is_deleted())
                })
            })
            .count();
        drop(versions);

        // Full-scan edge count and per-type counts
        let edge_versions = self.edge_versions.read();
        let id_to_edge_type = self.id_to_edge_type.read();
        let mut total_edges: i64 = 0;
        let mut type_counts = vec![0i64; id_to_edge_type.len()];

        for index in edge_versions.values() {
            if let Some(vref) = index.visible_at(epoch)
                && let Some(record) = self.read_edge_record(&vref)
                && !record.is_deleted()
            {
                total_edges += 1;
                // Skip type_ids beyond the snapshot of id_to_edge_type;
                // such edges count toward total_edges only.
                if (record.type_id as usize) < type_counts.len() {
                    type_counts[record.type_id as usize] += 1;
                }
            }
        }

        // Resync the atomic counters
        self.live_node_count
            .store(total_nodes as i64, Ordering::Relaxed);
        self.live_edge_count.store(total_edges, Ordering::Relaxed);
        *self.edge_type_live_counts.write() = type_counts;

        // Release the read guards before compute_statistics re-acquires
        // these locks.
        drop(edge_versions);
        drop(id_to_edge_type);

        // Now use the normal incremental path to build statistics
        self.compute_statistics();
    }

    /// Estimates cardinality for a label scan.
    #[must_use]
    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
        self.statistics.read().estimate_label_cardinality(label)
    }

    /// Estimates average degree for an edge type.
    ///
    /// `outgoing` selects between the out- and in-degree estimate.
    #[must_use]
    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
        self.statistics
            .read()
            .estimate_avg_degree(edge_type, outgoing)
    }
}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience constructor for a fresh, empty store.
    fn make_store() -> LpgStore {
        LpgStore::new().unwrap()
    }

    #[test]
    fn compute_statistics_empty_store() {
        let store = make_store();
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!((snapshot.total_nodes, snapshot.total_edges), (0, 0));
    }

    #[test]
    fn compute_statistics_with_nodes_and_edges() {
        let store = make_store();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Person"]);
        store.create_edge(src, dst, "KNOWS");

        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!(snapshot.total_nodes, 2);
        assert_eq!(snapshot.total_edges, 1);
    }

    #[test]
    fn ensure_statistics_fresh_uses_incremental_path_when_not_stale() {
        let store = make_store();
        store.create_node(&["X"]);

        // The stale flag was never raised, so the cheap incremental
        // path is taken.
        store.ensure_statistics_fresh();

        assert_eq!(store.statistics().total_nodes, 1);
    }

    #[test]
    fn ensure_statistics_fresh_does_full_recompute_when_stale() {
        use std::sync::atomic::Ordering;

        let store = make_store();
        store.create_node(&["Y"]);

        // Simulate a rollback by raising the stale flag by hand.
        store.needs_stats_recompute.store(true, Ordering::Relaxed);
        store.ensure_statistics_fresh();

        assert_eq!(store.statistics().total_nodes, 1);
        // The refresh must have lowered the flag again.
        assert!(!store.needs_stats_recompute.load(Ordering::Relaxed));
    }

    #[test]
    fn estimate_label_cardinality_returns_nonzero_for_known_label() {
        let store = make_store();
        store.create_node(&["Doc"]);
        store.compute_statistics();

        let card = store.estimate_label_cardinality("Doc");
        assert!(card > 0.0, "cardinality should be positive, got {card}");
    }

    #[test]
    fn estimate_label_cardinality_returns_default_for_unknown_label() {
        let store = make_store();
        store.compute_statistics();

        // An unseen label falls back to a default estimate, which must
        // never be negative.
        assert!(store.estimate_label_cardinality("NeverSeen") >= 0.0);
    }

    #[test]
    fn estimate_avg_degree_for_known_edge_type() {
        let store = make_store();
        let src = store.create_node(&[]);
        let dst = store.create_node(&[]);
        store.create_edge(src, dst, "FOLLOWS");

        store.compute_statistics();

        assert!(store.estimate_avg_degree("FOLLOWS", true) >= 0.0);
    }

    #[test]
    fn compute_statistics_zero_nodes_gives_zero_degree() {
        let store = make_store();

        // An empty graph exercises the total_nodes == 0 branch of the
        // degree computation: no label or edge-type entries are produced.
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!((snapshot.total_nodes, snapshot.total_edges), (0, 0));
    }
}