//! `grafeo_core/graph/lpg/store/statistics.rs` — statistics maintenance for [`LpgStore`].
use std::sync::Arc;
use std::sync::atomic::Ordering;

use super::LpgStore;
use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};

6impl LpgStore {
7    // === Statistics ===
8
9    /// Returns the current statistics (cheap `Arc` clone, no deep copy).
10    #[must_use]
11    pub fn statistics(&self) -> Arc<Statistics> {
12        Arc::clone(&self.statistics.read())
13    }
14
15    /// Recomputes statistics if they are stale (i.e., after mutations).
16    ///
17    /// Call this before reading statistics for query optimization.
18    /// Avoids redundant recomputation if no mutations occurred.
19    #[doc(hidden)]
20    pub fn ensure_statistics_fresh(&self) {
21        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
22            self.recompute_statistics_full();
23        } else {
24            self.compute_statistics();
25        }
26    }
27
28    /// Recomputes statistics from incremental counters.
29    ///
30    /// Reads live node/edge counts from atomic counters and per-label counts
31    /// from the label index. This is O(|labels| + |edge_types|) instead of
32    /// O(n + m) for a full scan.
33    pub(crate) fn compute_statistics(&self) {
34        let mut stats = Statistics::new();
35
36        // Read total counts from atomic counters
37        stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
38        stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;
39
40        // Compute per-label statistics from label_index (each is O(1) via .len())
41        let id_to_label = self.id_to_label.read();
42        let label_index = self.label_index.read();
43
44        for (label_id, label_name) in id_to_label.iter().enumerate() {
45            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
46
47            if node_count > 0 {
48                let avg_out_degree = if stats.total_nodes > 0 {
49                    stats.total_edges as f64 / stats.total_nodes as f64
50                } else {
51                    0.0
52                };
53
54                let label_stats =
55                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
56
57                stats.update_label(label_name.as_ref(), label_stats);
58            }
59        }
60
61        // Compute per-edge-type statistics from incremental counts
62        let id_to_edge_type = self.id_to_edge_type.read();
63        let edge_type_counts = self.edge_type_live_counts.read();
64
65        for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
66            let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;
67
68            if count > 0 {
69                let avg_degree = if stats.total_nodes > 0 {
70                    count as f64 / stats.total_nodes as f64
71                } else {
72                    0.0
73                };
74
75                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
76                stats.update_edge_type(type_name.as_ref(), edge_stats);
77            }
78        }
79
80        *self.statistics.write() = Arc::new(stats);
81    }
82
83    /// Full recomputation from storage: used after rollback when counters
84    /// may be out of sync. Also resyncs the atomic counters.
85    #[cfg(not(feature = "tiered-storage"))]
86    fn recompute_statistics_full(&self) {
87        let epoch = self.current_epoch();
88
89        // Full-scan node count
90        let total_nodes = self
91            .nodes
92            .read()
93            .values()
94            .filter_map(|chain| chain.visible_at(epoch))
95            .filter(|r| !r.is_deleted())
96            .count();
97
98        // Full-scan edge count and per-type counts
99        let edges = self.edges.read();
100        let mut total_edges: i64 = 0;
101        let id_to_edge_type = self.id_to_edge_type.read();
102        let mut type_counts = vec![0i64; id_to_edge_type.len()];
103
104        for chain in edges.values() {
105            if let Some(record) = chain.visible_at(epoch)
106                && !record.is_deleted()
107            {
108                total_edges += 1;
109                if (record.type_id as usize) < type_counts.len() {
110                    type_counts[record.type_id as usize] += 1;
111                }
112            }
113        }
114
115        // Resync the atomic counters
116        self.live_node_count
117            .store(total_nodes as i64, Ordering::Relaxed);
118        self.live_edge_count.store(total_edges, Ordering::Relaxed);
119        *self.edge_type_live_counts.write() = type_counts;
120
121        drop(edges);
122        drop(id_to_edge_type);
123
124        // Now use the normal incremental path to build statistics
125        self.compute_statistics();
126    }
127
128    /// Full recomputation from storage: used after rollback when counters
129    /// may be out of sync. Also resyncs the atomic counters.
130    /// (Tiered storage version)
131    #[cfg(feature = "tiered-storage")]
132    fn recompute_statistics_full(&self) {
133        let epoch = self.current_epoch();
134
135        // Full-scan node count
136        let versions = self.node_versions.read();
137        let total_nodes = versions
138            .iter()
139            .filter(|(_, index)| {
140                index.visible_at(epoch).map_or(false, |vref| {
141                    self.read_node_record(&vref)
142                        .map_or(false, |r| !r.is_deleted())
143                })
144            })
145            .count();
146        drop(versions);
147
148        // Full-scan edge count and per-type counts
149        let edge_versions = self.edge_versions.read();
150        let id_to_edge_type = self.id_to_edge_type.read();
151        let mut total_edges: i64 = 0;
152        let mut type_counts = vec![0i64; id_to_edge_type.len()];
153
154        for index in edge_versions.values() {
155            if let Some(vref) = index.visible_at(epoch)
156                && let Some(record) = self.read_edge_record(&vref)
157                && !record.is_deleted()
158            {
159                total_edges += 1;
160                if (record.type_id as usize) < type_counts.len() {
161                    type_counts[record.type_id as usize] += 1;
162                }
163            }
164        }
165
166        // Resync the atomic counters
167        self.live_node_count
168            .store(total_nodes as i64, Ordering::Relaxed);
169        self.live_edge_count.store(total_edges, Ordering::Relaxed);
170        *self.edge_type_live_counts.write() = type_counts;
171
172        drop(edge_versions);
173        drop(id_to_edge_type);
174
175        // Now use the normal incremental path to build statistics
176        self.compute_statistics();
177    }
178
179    /// Estimates cardinality for a label scan.
180    #[must_use]
181    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
182        self.statistics.read().estimate_label_cardinality(label)
183    }
184
185    /// Estimates average degree for an edge type.
186    #[must_use]
187    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
188        self.statistics
189            .read()
190            .estimate_avg_degree(edge_type, outgoing)
191    }
192}