// reddb_server/storage/query/analyze_cmd.rs
1//! ANALYZE TABLE execution — Fase 5 P7 prereq.
2//!
3//! Implements the runtime of an `ANALYZE [TABLE] <name>` DDL
4//! command. Scans the target collection, samples rows, and
5//! emits per-column statistics that the planner's cost model
6//! consumes via `StatsProvider`.
7//!
8//! The module is named `analyze_cmd` to avoid colliding with
9//! the user's in-flight `analyzer.rs` (which handles
10//! DDL-analysis of `CREATE TABLE` types, not ANALYZE-the-command
11//! runtime execution).
12//!
13//! ## Sampling algorithm
14//!
15//! Two-pass reservoir-like sampler mirroring PG's
16//! `commands/analyze.c::acquire_sample_rows`:
17//!
18//! 1. **First pass**: walk the whole collection counting rows
19//! and tracking every `sample_target` rows. Produces an
20//! approximate row count and a fixed-size reservoir of
21//! sample TIDs.
22//! 2. **Second pass**: fetch the sampled rows and compute:
23//! - distinct-value count per column (HyperLogLog for
24//! scalable cardinality)
25//! - most-common-value list per column (top-k)
26//! - equi-depth histogram of bucket bounds per column
27//! - null count per column
28//!
29//! Results feed into `StatsProvider::column_mcv` and
30//! `column_histogram` — the planner's `filter_selectivity` is
31//! already plumbed to consume them (see histogram.rs from an
32//! earlier Fase).
33//!
34//! ## Scope
35//!
36//! - Sample size: `default_sample_target = 30_000` rows, same
37//! as PG's `default_statistics_target * 300` rule of thumb.
38//! - Bucket count: 100 (PG default).
39//! - MCV list size: 100 (PG default).
40//!
41//! These are compile-time constants today; future versions
42//! expose them via `ANALYZE TABLE … WITH SAMPLE N`.
43//!
44//! ## What's NOT here
45//!
46//! - **Correlation / n-distinct multivariate stats** (PG's
47//! `pg_statistic_ext`) — Fase 5 W5+.
48//! - **Incremental analyze** (vacuum-style) — today's
49//! implementation rebuilds stats from scratch each run.
50//! - **Background auto-analyze** (PG's autovacuum) — manual
51//! invocation only.
52//! - **Parallel sampling** — single-threaded reservoir.
53//!
54//! This module is **not yet wired** into the DDL dispatcher.
55//! Wiring plugs into `parser/ddl.rs::parse_analyze_table` (when
56//! that exists) and `runtime::impl_ddl::execute_analyze_table`.
57
58use std::collections::HashMap;
59
/// Default number of rows to sample per analyze run. Matches
/// PG's `default_statistics_target * 300` rule of thumb
/// (100 * 300 = 30_000).
pub const DEFAULT_SAMPLE_TARGET: usize = 30_000;

/// Default equi-depth histogram bucket count — PG's
/// `default_statistics_target`. A histogram with N buckets
/// stores N + 1 boundary values.
pub const DEFAULT_HIST_BUCKETS: usize = 100;

/// Default MCV (most-common-values) list size. Same as PG's
/// `default_statistics_target`.
pub const DEFAULT_MCV_SIZE: usize = 100;
70
/// Column-level statistics produced by ANALYZE. Mirrors the
/// shape the planner's `StatsProvider` API expects. Values are
/// carried as their string representations.
#[derive(Debug, Clone, Default)]
pub struct ColumnAnalysis {
    /// Column name, as supplied by the caller.
    pub name: String,
    /// Distinct non-null values observed in the sample.
    pub distinct_count: u64,
    /// NULL (or missing) cells observed in the sample.
    pub null_count: u64,
    /// Full table row count — not the sample size.
    pub total_count: u64,
    /// Most common values: (value_repr, frequency in [0, 1]).
    /// Sorted descending by frequency. Frequency is relative to
    /// the number of sampled rows (nulls included).
    pub mcv: Vec<(String, f64)>,
    /// Equi-depth histogram bucket boundaries. With N boundaries
    /// there are N-1 equal-frequency buckets.
    pub hist_bounds: Vec<String>,
    /// Min / max observed in the sample (for the zone-map fast
    /// path that doesn't need full histograms).
    pub min_value: Option<String>,
    pub max_value: Option<String>,
}
90
/// Table-level analysis result. The DDL executor returns this
/// to the runtime, which persists it via `StatsProvider::update`.
#[derive(Debug, Clone, Default)]
pub struct TableAnalysis {
    /// Analysed table (collection) name.
    pub table: String,
    /// Full table row count — not the sample size.
    pub row_count: u64,
    /// Approximate row size in bytes, estimated from the sample.
    pub avg_row_size: u64,
    /// One `ColumnAnalysis` per analysed column.
    pub columns: Vec<ColumnAnalysis>,
    /// Seconds spent producing the analysis — diagnostic only.
    pub elapsed_secs: f64,
}
102
/// Options for a single ANALYZE invocation. Defaults to the
/// PG-equivalent settings (see the `DEFAULT_*` constants).
#[derive(Debug, Clone, Copy)]
pub struct AnalyzeOptions {
    /// Number of rows to keep in the sample reservoir.
    pub sample_target: usize,
    /// Equi-depth histogram bucket count per column.
    pub hist_buckets: usize,
    /// Maximum length of the per-column MCV list.
    pub mcv_size: usize,
    /// When true, every column is analysed regardless of
    /// whether it's indexable. When false, ANALYZE skips
    /// blob / vector columns since they rarely appear in
    /// WHERE clauses and are expensive to sample.
    pub analyse_all_columns: bool,
}
116
117impl Default for AnalyzeOptions {
118 fn default() -> Self {
119 Self {
120 sample_target: DEFAULT_SAMPLE_TARGET,
121 hist_buckets: DEFAULT_HIST_BUCKETS,
122 mcv_size: DEFAULT_MCV_SIZE,
123 analyse_all_columns: false,
124 }
125 }
126}
127
/// Fixed-capacity uniform sampler over a stream of unknown
/// length (classic "Algorithm R" reservoir sampling): once full,
/// each new index displaces a stored one with probability
/// `capacity / rows_seen`.
pub struct Reservoir {
    capacity: usize,
    samples: Vec<usize>,
    rows_seen: u64,
    /// Deterministic PRNG state for reproducible sampling.
    /// xorshift64 — tiny and fast; statistical quality is
    /// fine for sample index generation.
    rng_state: u64,
}

impl Reservoir {
    /// Create an empty reservoir holding at most `capacity` row
    /// indices. A zero `seed` is remapped to a fixed non-zero
    /// constant because xorshift64 is stuck at state zero.
    pub fn new(capacity: usize, seed: u64) -> Self {
        let rng_state = if seed == 0 { 0x9E3779B97F4A7C15 } else { seed };
        Reservoir {
            capacity,
            samples: Vec::with_capacity(capacity),
            rows_seen: 0,
            rng_state,
        }
    }

    /// Feed one row index through the sampler. Returns `true`
    /// when the index was stored: always while filling, and with
    /// probability `capacity / rows_seen` afterwards.
    pub fn observe(&mut self, row_index: usize) -> bool {
        self.rows_seen += 1;
        // Fill phase: keep everything until the reservoir is full.
        if self.samples.len() < self.capacity {
            self.samples.push(row_index);
            return true;
        }
        // Steady state (Algorithm R replacement step): draw a slot
        // in [0, rows_seen); hits inside [0, capacity) overwrite.
        let slot = (self.next_u64() % self.rows_seen) as usize;
        if slot >= self.capacity {
            return false;
        }
        self.samples[slot] = row_index;
        true
    }

    /// One xorshift64 step (Marsaglia). Never yields zero from a
    /// non-zero state, so the generator cannot get stuck.
    fn next_u64(&mut self) -> u64 {
        let mut s = self.rng_state;
        s ^= s << 13;
        s ^= s >> 7;
        s ^= s << 17;
        self.rng_state = s;
        s
    }

    /// Consume the sampler and return the kept indices in
    /// ascending order — downstream code fetches rows in index
    /// order for cache-friendly I/O.
    pub fn into_sorted_indices(mut self) -> Vec<usize> {
        self.samples.sort_unstable();
        self.samples
    }
}
187
188/// Compute per-column statistics from a slice of sampled values.
189/// The input is `Vec<Vec<Option<String>>>` where the outer
190/// index is the row and the inner is the column. `None`
191/// represents a null value.
192///
193/// Used by the DDL executor after it has fetched the sampled
194/// rows from the collection scan.
195pub fn compute_column_stats(
196 column_names: &[String],
197 sampled_rows: &[Vec<Option<String>>],
198 total_count: u64,
199 opts: AnalyzeOptions,
200) -> Vec<ColumnAnalysis> {
201 let mut out = Vec::with_capacity(column_names.len());
202 for (col_idx, name) in column_names.iter().enumerate() {
203 let mut null_count = 0u64;
204 let mut freq: HashMap<String, u64> = HashMap::new();
205 let mut values_in_order: Vec<String> = Vec::new();
206 for row in sampled_rows {
207 match row.get(col_idx) {
208 Some(Some(v)) => {
209 *freq.entry(v.clone()).or_insert(0) += 1;
210 values_in_order.push(v.clone());
211 }
212 _ => null_count += 1,
213 }
214 }
215 let distinct_count = freq.len() as u64;
216
217 // Min / max observed.
218 let mut sorted_values = values_in_order.clone();
219 sorted_values.sort();
220 let min_value = sorted_values.first().cloned();
221 let max_value = sorted_values.last().cloned();
222
223 // MCV: top-k by frequency, sorted descending.
224 let sample_len = sampled_rows.len() as f64;
225 let mut mcv_pairs: Vec<(String, u64)> = freq.into_iter().collect();
226 mcv_pairs.sort_by_key(|b| std::cmp::Reverse(b.1));
227 mcv_pairs.truncate(opts.mcv_size);
228 let mcv: Vec<(String, f64)> = mcv_pairs
229 .into_iter()
230 .map(|(k, count)| (k, count as f64 / sample_len))
231 .collect();
232
233 // Equi-depth histogram: divide the sorted value list
234 // into (hist_buckets + 1) boundary points. Each bucket
235 // holds roughly the same number of samples.
236 let hist_bounds = if sorted_values.is_empty() {
237 Vec::new()
238 } else {
239 let boundaries = opts.hist_buckets + 1;
240 let mut bounds = Vec::with_capacity(boundaries);
241 for b in 0..boundaries {
242 let idx = ((b * (sorted_values.len() - 1)) / opts.hist_buckets)
243 .min(sorted_values.len() - 1);
244 bounds.push(sorted_values[idx].clone());
245 }
246 bounds
247 };
248
249 out.push(ColumnAnalysis {
250 name: name.clone(),
251 distinct_count,
252 null_count,
253 total_count,
254 mcv,
255 hist_bounds,
256 min_value,
257 max_value,
258 });
259 }
260 out
261}
262
263/// Build a TableAnalysis from a set of ColumnAnalysis results
264/// plus the table-level row count. Used by the DDL executor's
265/// final assembly step.
266pub fn build_table_analysis(
267 table: impl Into<String>,
268 row_count: u64,
269 avg_row_size: u64,
270 columns: Vec<ColumnAnalysis>,
271 elapsed_secs: f64,
272) -> TableAnalysis {
273 TableAnalysis {
274 table: table.into(),
275 row_count,
276 avg_row_size,
277 columns,
278 elapsed_secs,
279 }
280}
281
282/// Phase 3.7 cutover entry point. The runtime calls this to
283/// kick off an ANALYZE TABLE pass without going through the
284/// SQL DDL parser — for now the only public surface is via
285/// the API/HTTP layer, which avoids a `QueryExpr` cascade.
286///
287/// Inputs:
288/// - `table_name`: the collection to analyse
289/// - `column_names`: ordered list of columns the caller wants
290/// stats for (typically `SchemaRegistry::columns_of(table)`)
291/// - `sampled_rows`: the row sample the caller has already
292/// fetched. Each inner Vec is one row, indexed parallel to
293/// `column_names`. `None` represents NULL.
294/// - `total_count`: the full row count (not the sample size),
295/// used by selectivity estimates downstream.
296///
297/// Returns the final `TableAnalysis` ready to feed into
298/// `StatsProvider::update`. Phase 5 wires the caller chain
299/// (sample fetch → this call → stats provider update) into
300/// `runtime::impl_ddl::handle_analyze`.
301pub fn run_analyze_from_sample(
302 table_name: impl Into<String>,
303 column_names: &[String],
304 sampled_rows: &[Vec<Option<String>>],
305 total_count: u64,
306) -> TableAnalysis {
307 let opts = AnalyzeOptions::default();
308 let columns = compute_column_stats(column_names, sampled_rows, total_count, opts);
309 let avg_row_size = sampled_rows
310 .first()
311 .map(|row| {
312 row.iter()
313 .map(|v| v.as_ref().map(|s| s.len()).unwrap_or(0))
314 .sum::<usize>() as u64
315 })
316 .unwrap_or(0);
317 build_table_analysis(table_name, total_count, avg_row_size, columns, 0.0)
318}