Skip to main content

grafeo_core/graph/lpg/store/
statistics.rs

1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6impl LpgStore {
7    // === Statistics ===
8
9    /// Returns the current statistics (cheap `Arc` clone, no deep copy).
10    #[must_use]
11    pub fn statistics(&self) -> Arc<Statistics> {
12        Arc::clone(&self.statistics.read())
13    }
14
15    /// Recomputes statistics if they are stale (i.e., after mutations).
16    ///
17    /// Call this before reading statistics for query optimization.
18    /// Avoids redundant recomputation if no mutations occurred.
19    #[doc(hidden)]
20    pub fn ensure_statistics_fresh(&self) {
21        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
22            self.recompute_statistics_full();
23        } else {
24            self.compute_statistics();
25        }
26    }
27
28    /// Recomputes statistics from incremental counters.
29    ///
30    /// Reads live node/edge counts from atomic counters and per-label counts
31    /// from the label index. This is O(|labels| + |edge_types|) instead of
32    /// O(n + m) for a full scan.
33    pub(crate) fn compute_statistics(&self) {
34        let mut stats = Statistics::new();
35
36        // Read total counts from atomic counters
37        // reason: clamped to >= 0 by max(0), safe to cast to u64
38        #[allow(clippy::cast_sign_loss)]
39        {
40            stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
41        }
42        // reason: clamped to >= 0 by max(0), safe to cast to u64
43        #[allow(clippy::cast_sign_loss)]
44        {
45            stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;
46        }
47
48        // Compute per-label statistics from label_index (each is O(1) via .len())
49        let registry = self.label_registry.read();
50        let label_index = self.label_index.read();
51
52        for (label_id, label_name) in registry.names().iter().enumerate() {
53            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
54
55            if node_count > 0 {
56                let avg_out_degree = if stats.total_nodes > 0 {
57                    stats.total_edges as f64 / stats.total_nodes as f64
58                } else {
59                    0.0
60                };
61
62                let label_stats =
63                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
64
65                stats.update_label(label_name.as_ref(), label_stats);
66            }
67        }
68
69        // Compute per-edge-type statistics from incremental counts
70        let id_to_edge_type = self.id_to_edge_type.read();
71        let edge_type_counts = self.edge_type_live_counts.read();
72
73        for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
74            // reason: clamped to >= 0 by max(0), safe to cast to u64
75            #[allow(clippy::cast_sign_loss)]
76            let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;
77
78            if count > 0 {
79                let avg_degree = if stats.total_nodes > 0 {
80                    count as f64 / stats.total_nodes as f64
81                } else {
82                    0.0
83                };
84
85                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
86                stats.update_edge_type(type_name.as_ref(), edge_stats);
87            }
88        }
89
90        *self.statistics.write() = Arc::new(stats);
91    }
92
93    /// Full recomputation from storage: used after rollback when counters
94    /// may be out of sync. Also resyncs the atomic counters.
95    #[cfg(not(feature = "tiered-storage"))]
96    fn recompute_statistics_full(&self) {
97        let epoch = self.current_epoch();
98
99        // Full-scan node count
100        let total_nodes = self
101            .nodes
102            .read()
103            .values()
104            .filter_map(|chain| chain.visible_at(epoch))
105            .filter(|r| !r.is_deleted())
106            .count();
107
108        // Full-scan edge count and per-type counts
109        let edges = self.edges.read();
110        let mut total_edges: i64 = 0;
111        let id_to_edge_type = self.id_to_edge_type.read();
112        let mut type_counts = vec![0i64; id_to_edge_type.len()];
113
114        for chain in edges.values() {
115            if let Some(record) = chain.visible_at(epoch)
116                && !record.is_deleted()
117            {
118                total_edges += 1;
119                if (record.type_id as usize) < type_counts.len() {
120                    type_counts[record.type_id as usize] += 1;
121                }
122            }
123        }
124
125        // Resync the atomic counters
126        self.live_node_count.store(
127            i64::try_from(total_nodes).unwrap_or(i64::MAX),
128            Ordering::Relaxed,
129        );
130        self.live_edge_count.store(total_edges, Ordering::Relaxed);
131        *self.edge_type_live_counts.write() = type_counts;
132
133        drop(edges);
134        drop(id_to_edge_type);
135
136        // Now use the normal incremental path to build statistics
137        self.compute_statistics();
138    }
139
140    /// Full recomputation from storage: used after rollback when counters
141    /// may be out of sync. Also resyncs the atomic counters.
142    /// (Tiered storage version)
143    #[cfg(feature = "tiered-storage")]
144    fn recompute_statistics_full(&self) {
145        let epoch = self.current_epoch();
146
147        // Full-scan node count
148        let versions = self.node_versions.read();
149        let total_nodes = versions
150            .iter()
151            .filter(|(_, index)| {
152                index.visible_at(epoch).map_or(false, |vref| {
153                    self.read_node_record(&vref)
154                        .map_or(false, |r| !r.is_deleted())
155                })
156            })
157            .count();
158        drop(versions);
159
160        // Full-scan edge count and per-type counts
161        let edge_versions = self.edge_versions.read();
162        let id_to_edge_type = self.id_to_edge_type.read();
163        let mut total_edges: i64 = 0;
164        let mut type_counts = vec![0i64; id_to_edge_type.len()];
165
166        for index in edge_versions.values() {
167            if let Some(vref) = index.visible_at(epoch)
168                && let Some(record) = self.read_edge_record(&vref)
169                && !record.is_deleted()
170            {
171                total_edges += 1;
172                if (record.type_id as usize) < type_counts.len() {
173                    type_counts[record.type_id as usize] += 1;
174                }
175            }
176        }
177
178        // Resync the atomic counters
179        self.live_node_count.store(
180            i64::try_from(total_nodes).unwrap_or(i64::MAX),
181            Ordering::Relaxed,
182        );
183        self.live_edge_count.store(total_edges, Ordering::Relaxed);
184        *self.edge_type_live_counts.write() = type_counts;
185
186        drop(edge_versions);
187        drop(id_to_edge_type);
188
189        // Now use the normal incremental path to build statistics
190        self.compute_statistics();
191    }
192
193    /// Estimates cardinality for a label scan.
194    #[must_use]
195    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
196        self.statistics.read().estimate_label_cardinality(label)
197    }
198
199    /// Estimates average degree for an edge type.
200    #[must_use]
201    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
202        self.statistics
203            .read()
204            .estimate_avg_degree(edge_type, outgoing)
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Builds a fresh, empty store for each test case.
    fn make_store() -> LpgStore {
        LpgStore::new().unwrap()
    }

    #[test]
    fn compute_statistics_empty_store() {
        let store = make_store();
        store.compute_statistics();

        let snapshot = store.statistics();
        assert_eq!((snapshot.total_nodes, snapshot.total_edges), (0, 0));
    }

    #[test]
    fn compute_statistics_with_nodes_and_edges() {
        let store = make_store();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Person"]);
        store.create_edge(src, dst, "KNOWS");

        store.compute_statistics();
        let snapshot = store.statistics();
        assert_eq!((snapshot.total_nodes, snapshot.total_edges), (2, 1));
    }

    #[test]
    fn ensure_statistics_fresh_uses_incremental_path_when_not_stale() {
        let store = make_store();
        store.create_node(&["X"]);

        // No mutation flag set, should use incremental path
        store.ensure_statistics_fresh();
        assert_eq!(store.statistics().total_nodes, 1);
    }

    #[test]
    fn ensure_statistics_fresh_does_full_recompute_when_stale() {
        let store = make_store();
        store.create_node(&["Y"]);

        // Force the stale flag so the full-recompute branch runs.
        store.needs_stats_recompute.store(true, Ordering::Relaxed);
        store.ensure_statistics_fresh();

        assert_eq!(store.statistics().total_nodes, 1);
        // Flag should now be cleared
        let still_stale = store.needs_stats_recompute.load(Ordering::Relaxed);
        assert!(!still_stale);
    }

    #[test]
    fn estimate_label_cardinality_returns_nonzero_for_known_label() {
        let store = make_store();
        store.create_node(&["Doc"]);
        store.compute_statistics();

        let card = store.estimate_label_cardinality("Doc");
        assert!(card > 0.0, "cardinality should be positive, got {card}");
    }

    #[test]
    fn estimate_label_cardinality_returns_default_for_unknown_label() {
        let store = make_store();
        store.compute_statistics();

        // Default estimate should be small but non-negative
        assert!(store.estimate_label_cardinality("NeverSeen") >= 0.0);
    }

    #[test]
    fn estimate_avg_degree_for_known_edge_type() {
        let store = make_store();
        let src = store.create_node(&[]);
        let dst = store.create_node(&[]);
        store.create_edge(src, dst, "FOLLOWS");

        store.compute_statistics();
        assert!(store.estimate_avg_degree("FOLLOWS", true) >= 0.0);
    }

    #[test]
    fn compute_statistics_zero_nodes_gives_zero_degree() {
        // Exercises the avg_degree branch when total_nodes == 0 by computing
        // statistics over an empty graph.
        let store = make_store();
        store.compute_statistics();

        let snapshot = store.statistics();
        // No labels or edge types should be present
        assert_eq!((snapshot.total_nodes, snapshot.total_edges), (0, 0));
    }
}