// grafeo_core/graph/lpg/store/statistics.rs
1use super::LpgStore;
2use crate::statistics::{EdgeTypeStatistics, LabelStatistics, Statistics};
3use std::sync::Arc;
4use std::sync::atomic::Ordering;
5
6impl LpgStore {
7    // === Statistics ===
8
9    /// Returns the current statistics (cheap `Arc` clone, no deep copy).
10    #[must_use]
11    pub fn statistics(&self) -> Arc<Statistics> {
12        Arc::clone(&self.statistics.read())
13    }
14
15    /// Recomputes statistics if they are stale (i.e., after mutations).
16    ///
17    /// Call this before reading statistics for query optimization.
18    /// Avoids redundant recomputation if no mutations occurred.
19    pub fn ensure_statistics_fresh(&self) {
20        if self.needs_stats_recompute.swap(false, Ordering::Relaxed) {
21            self.recompute_statistics_full();
22        } else {
23            self.compute_statistics();
24        }
25    }
26
27    /// Recomputes statistics from incremental counters.
28    ///
29    /// Reads live node/edge counts from atomic counters and per-label counts
30    /// from the label index. This is O(|labels| + |edge_types|) instead of
31    /// O(n + m) for a full scan.
32    pub fn compute_statistics(&self) {
33        let mut stats = Statistics::new();
34
35        // Read total counts from atomic counters
36        stats.total_nodes = self.live_node_count.load(Ordering::Relaxed).max(0) as u64;
37        stats.total_edges = self.live_edge_count.load(Ordering::Relaxed).max(0) as u64;
38
39        // Compute per-label statistics from label_index (each is O(1) via .len())
40        let id_to_label = self.id_to_label.read();
41        let label_index = self.label_index.read();
42
43        for (label_id, label_name) in id_to_label.iter().enumerate() {
44            let node_count = label_index.get(label_id).map_or(0, |set| set.len() as u64);
45
46            if node_count > 0 {
47                let avg_out_degree = if stats.total_nodes > 0 {
48                    stats.total_edges as f64 / stats.total_nodes as f64
49                } else {
50                    0.0
51                };
52
53                let label_stats =
54                    LabelStatistics::new(node_count).with_degrees(avg_out_degree, avg_out_degree);
55
56                stats.update_label(label_name.as_ref(), label_stats);
57            }
58        }
59
60        // Compute per-edge-type statistics from incremental counts
61        let id_to_edge_type = self.id_to_edge_type.read();
62        let edge_type_counts = self.edge_type_live_counts.read();
63
64        for (type_id, type_name) in id_to_edge_type.iter().enumerate() {
65            let count = edge_type_counts.get(type_id).copied().unwrap_or(0).max(0) as u64;
66
67            if count > 0 {
68                let avg_degree = if stats.total_nodes > 0 {
69                    count as f64 / stats.total_nodes as f64
70                } else {
71                    0.0
72                };
73
74                let edge_stats = EdgeTypeStatistics::new(count, avg_degree, avg_degree);
75                stats.update_edge_type(type_name.as_ref(), edge_stats);
76            }
77        }
78
79        *self.statistics.write() = Arc::new(stats);
80    }
81
82    /// Full recomputation from storage — used after rollback when counters
83    /// may be out of sync. Also resyncs the atomic counters.
84    #[cfg(not(feature = "tiered-storage"))]
85    fn recompute_statistics_full(&self) {
86        let epoch = self.current_epoch();
87
88        // Full-scan node count
89        let total_nodes = self
90            .nodes
91            .read()
92            .values()
93            .filter_map(|chain| chain.visible_at(epoch))
94            .filter(|r| !r.is_deleted())
95            .count();
96
97        // Full-scan edge count and per-type counts
98        let edges = self.edges.read();
99        let mut total_edges: i64 = 0;
100        let id_to_edge_type = self.id_to_edge_type.read();
101        let mut type_counts = vec![0i64; id_to_edge_type.len()];
102
103        for chain in edges.values() {
104            if let Some(record) = chain.visible_at(epoch)
105                && !record.is_deleted()
106            {
107                total_edges += 1;
108                if (record.type_id as usize) < type_counts.len() {
109                    type_counts[record.type_id as usize] += 1;
110                }
111            }
112        }
113
114        // Resync the atomic counters
115        self.live_node_count
116            .store(total_nodes as i64, Ordering::Relaxed);
117        self.live_edge_count.store(total_edges, Ordering::Relaxed);
118        *self.edge_type_live_counts.write() = type_counts;
119
120        drop(edges);
121        drop(id_to_edge_type);
122
123        // Now use the normal incremental path to build statistics
124        self.compute_statistics();
125    }
126
127    /// Full recomputation from storage — used after rollback when counters
128    /// may be out of sync. Also resyncs the atomic counters.
129    /// (Tiered storage version)
130    #[cfg(feature = "tiered-storage")]
131    fn recompute_statistics_full(&self) {
132        let epoch = self.current_epoch();
133
134        // Full-scan node count
135        let versions = self.node_versions.read();
136        let total_nodes = versions
137            .iter()
138            .filter(|(_, index)| {
139                index.visible_at(epoch).map_or(false, |vref| {
140                    self.read_node_record(&vref)
141                        .map_or(false, |r| !r.is_deleted())
142                })
143            })
144            .count();
145        drop(versions);
146
147        // Full-scan edge count and per-type counts
148        let edge_versions = self.edge_versions.read();
149        let id_to_edge_type = self.id_to_edge_type.read();
150        let mut total_edges: i64 = 0;
151        let mut type_counts = vec![0i64; id_to_edge_type.len()];
152
153        for index in edge_versions.values() {
154            if let Some(vref) = index.visible_at(epoch)
155                && let Some(record) = self.read_edge_record(&vref)
156                && !record.is_deleted()
157            {
158                total_edges += 1;
159                if (record.type_id as usize) < type_counts.len() {
160                    type_counts[record.type_id as usize] += 1;
161                }
162            }
163        }
164
165        // Resync the atomic counters
166        self.live_node_count
167            .store(total_nodes as i64, Ordering::Relaxed);
168        self.live_edge_count.store(total_edges, Ordering::Relaxed);
169        *self.edge_type_live_counts.write() = type_counts;
170
171        drop(edge_versions);
172        drop(id_to_edge_type);
173
174        // Now use the normal incremental path to build statistics
175        self.compute_statistics();
176    }
177
178    /// Estimates cardinality for a label scan.
179    #[must_use]
180    pub fn estimate_label_cardinality(&self, label: &str) -> f64 {
181        self.statistics.read().estimate_label_cardinality(label)
182    }
183
184    /// Estimates average degree for an edge type.
185    #[must_use]
186    pub fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
187        self.statistics
188            .read()
189            .estimate_avg_degree(edge_type, outgoing)
190    }
191}