// reddb_server/storage/query/analyze_cmd.rs
//! ANALYZE TABLE execution — Phase 5 P7 prereq.
2//!
3//! Implements the runtime of an `ANALYZE [TABLE] <name>` DDL
4//! command. Scans the target collection, samples rows, and
5//! emits per-column statistics that the planner's cost model
6//! consumes via `StatsProvider`.
7//!
8//! The module is named `analyze_cmd` to avoid colliding with
9//! the user's in-flight `analyzer.rs` (which handles
10//! DDL-analysis of `CREATE TABLE` types, not ANALYZE-the-command
11//! runtime execution).
12//!
13//! ## Sampling algorithm
14//!
15//! Two-pass reservoir-like sampler mirroring PG's
16//! `commands/analyze.c::acquire_sample_rows`:
17//!
18//! 1. **First pass**: walk the whole collection counting rows
19//!    and tracking every `sample_target` rows. Produces an
20//!    approximate row count and a fixed-size reservoir of
21//!    sample TIDs.
22//! 2. **Second pass**: fetch the sampled rows and compute:
23//!    - distinct-value count per column (HyperLogLog for
24//!      scalable cardinality)
25//!    - most-common-value list per column (top-k)
26//!    - equi-depth histogram of bucket bounds per column
27//!    - null count per column
28//!
29//! Results feed into `StatsProvider::column_mcv` and
30//! `column_histogram` — the planner's `filter_selectivity` is
31//! already plumbed to consume them (see histogram.rs from an
//!   earlier Phase).
33//!
34//! ## Scope
35//!
36//! - Sample size: `default_sample_target = 30_000` rows, same
37//!   as PG's `default_statistics_target * 300` rule of thumb.
38//! - Bucket count: 100 (PG default).
39//! - MCV list size: 100 (PG default).
40//!
41//! These are compile-time constants today; future versions
42//! expose them via `ANALYZE TABLE … WITH SAMPLE N`.
43//!
44//! ## What's NOT here
45//!
46//! - **Correlation / n-distinct multivariate stats** (PG's
//!   `pg_statistic_ext`) — Phase 5 W5+.
48//! - **Incremental analyze** (vacuum-style) — today's
49//!   implementation rebuilds stats from scratch each run.
50//! - **Background auto-analyze** (PG's autovacuum) — manual
51//!   invocation only.
52//! - **Parallel sampling** — single-threaded reservoir.
53//!
54//! This module is **not yet wired** into the DDL dispatcher.
55//! Wiring plugs into `parser/ddl.rs::parse_analyze_table` (when
56//! that exists) and `runtime::impl_ddl::execute_analyze_table`.
57
58use std::collections::HashMap;
59
/// Default number of rows to sample per analyze run. Matches
/// PG's `default_statistics_target * 300` rule of thumb.
pub const DEFAULT_SAMPLE_TARGET: usize = 30_000;

/// Default equi-depth histogram bucket count, mirroring PG's
/// `default_statistics_target` value.
pub const DEFAULT_HIST_BUCKETS: usize = 100;

/// Default most-common-values list length. Same as PG's
/// `default_statistics_target`.
pub const DEFAULT_MCV_SIZE: usize = 100;
70
/// Column-level statistics produced by ANALYZE. Mirrors the
/// shape the planner's `StatsProvider` API expects.
#[derive(Debug, Clone, Default)]
pub struct ColumnAnalysis {
    // Column name, copied from the caller-supplied column list.
    pub name: String,
    // Count of distinct non-null values observed in the sample.
    pub distinct_count: u64,
    // Count of NULL cells (including missing cells in short rows)
    // observed in the sample.
    pub null_count: u64,
    // Full table row count as reported by the caller — NOT the
    // sample size.
    pub total_count: u64,
    /// Most common values: (value_repr, frequency in [0, 1]).
    /// Frequencies are relative to the sample size.
    /// Sorted descending by frequency.
    pub mcv: Vec<(String, f64)>,
    /// Equi-depth histogram bucket boundaries. With N boundaries
    /// there are N-1 equal-frequency buckets.
    pub hist_bounds: Vec<String>,
    /// Min / max observed in the sample (for the zone-map fast
    /// path that doesn't need full histograms). `None` when the
    /// sample held no non-null values for this column.
    pub min_value: Option<String>,
    pub max_value: Option<String>,
}
90
/// Table-level analysis result. The DDL executor returns this
/// to the runtime, which persists it via `StatsProvider::update`.
#[derive(Debug, Clone, Default)]
pub struct TableAnalysis {
    // Name of the analysed table/collection.
    pub table: String,
    // Full table row count supplied by the caller.
    pub row_count: u64,
    // Estimated bytes per row, derived from the sampled rows.
    pub avg_row_size: u64,
    // Per-column statistics, in the caller-supplied column order.
    pub columns: Vec<ColumnAnalysis>,
    /// Seconds spent sampling + computing — diagnostic.
    pub elapsed_secs: f64,
}
102
/// Options for a single ANALYZE invocation. Defaults to the
/// PG-equivalent settings (see the `DEFAULT_*` constants).
#[derive(Debug, Clone, Copy)]
pub struct AnalyzeOptions {
    // Number of rows to sample per analyze run.
    pub sample_target: usize,
    // Equi-depth histogram bucket count; the computed histogram
    // stores `hist_buckets + 1` boundary values.
    pub hist_buckets: usize,
    // Maximum length of the most-common-values list.
    pub mcv_size: usize,
    /// When true, every column is analysed regardless of
    /// whether it's indexable. When false, ANALYZE skips
    /// blob / vector columns since they rarely appear in
    /// WHERE clauses and are expensive to sample.
    pub analyse_all_columns: bool,
}
116
impl Default for AnalyzeOptions {
    /// PG-equivalent defaults: 30k-row sample, 100 histogram
    /// buckets, 100 MCV entries, non-indexable columns skipped.
    fn default() -> Self {
        Self {
            sample_target: DEFAULT_SAMPLE_TARGET,
            hist_buckets: DEFAULT_HIST_BUCKETS,
            mcv_size: DEFAULT_MCV_SIZE,
            analyse_all_columns: false,
        }
    }
}
127
/// Reservoir-sampling state: keeps a fixed-size window of row
/// indices so that, over an input of unknown length, every row
/// had an equal chance of landing in the window. Once full, an
/// incoming row replaces an existing entry with probability
/// `capacity / rows_seen` (classic Algorithm R).
pub struct Reservoir {
    capacity: usize,
    samples: Vec<usize>,
    rows_seen: u64,
    /// Deterministic PRNG state for reproducible sampling.
    /// xorshift64 — tiny and fast; statistical quality is
    /// fine for sample index generation.
    rng_state: u64,
}

impl Reservoir {
    /// Create a sampler holding at most `capacity` indices.
    /// `seed` makes runs reproducible; a zero seed is remapped
    /// because xorshift64 can never leave the all-zero state.
    pub fn new(capacity: usize, seed: u64) -> Self {
        let rng_state = match seed {
            0 => 0x9E3779B97F4A7C15,
            s => s,
        };
        Self {
            capacity,
            samples: Vec::with_capacity(capacity),
            rows_seen: 0,
            rng_state,
        }
    }

    /// Observe a row. Returns `true` when the sampler decided
    /// to keep this row index in the reservoir.
    pub fn observe(&mut self, row_index: usize) -> bool {
        self.rows_seen += 1;
        // Fill phase: the first `capacity` rows are always kept.
        if self.samples.len() < self.capacity {
            self.samples.push(row_index);
            return true;
        }
        // Replacement phase: a uniform slot in [0, rows_seen);
        // only slots below `capacity` land inside the reservoir.
        let slot = (self.next_u64() % self.rows_seen) as usize;
        if slot < self.capacity {
            self.samples[slot] = row_index;
            true
        } else {
            false
        }
    }

    /// One xorshift64 step (shifts 13/7/17); the state stays
    /// non-zero by construction (see `new`).
    fn next_u64(&mut self) -> u64 {
        let mut s = self.rng_state;
        s ^= s << 13;
        s ^= s >> 7;
        s ^= s << 17;
        self.rng_state = s;
        s
    }

    /// Consume the sampler, returning the kept row indices in
    /// ascending order — downstream code reads rows in index
    /// order for cache-friendly I/O.
    pub fn into_sorted_indices(mut self) -> Vec<usize> {
        self.samples.sort_unstable();
        self.samples
    }
}
187
188/// Compute per-column statistics from a slice of sampled values.
189/// The input is `Vec<Vec<Option<String>>>` where the outer
190/// index is the row and the inner is the column. `None`
191/// represents a null value.
192///
193/// Used by the DDL executor after it has fetched the sampled
194/// rows from the collection scan.
195pub fn compute_column_stats(
196    column_names: &[String],
197    sampled_rows: &[Vec<Option<String>>],
198    total_count: u64,
199    opts: AnalyzeOptions,
200) -> Vec<ColumnAnalysis> {
201    let mut out = Vec::with_capacity(column_names.len());
202    for (col_idx, name) in column_names.iter().enumerate() {
203        let mut null_count = 0u64;
204        let mut freq: HashMap<String, u64> = HashMap::new();
205        let mut values_in_order: Vec<String> = Vec::new();
206        for row in sampled_rows {
207            match row.get(col_idx) {
208                Some(Some(v)) => {
209                    *freq.entry(v.clone()).or_insert(0) += 1;
210                    values_in_order.push(v.clone());
211                }
212                _ => null_count += 1,
213            }
214        }
215        let distinct_count = freq.len() as u64;
216
217        // Min / max observed.
218        let mut sorted_values = values_in_order.clone();
219        sorted_values.sort();
220        let min_value = sorted_values.first().cloned();
221        let max_value = sorted_values.last().cloned();
222
223        // MCV: top-k by frequency, sorted descending.
224        let sample_len = sampled_rows.len() as f64;
225        let mut mcv_pairs: Vec<(String, u64)> = freq.into_iter().collect();
226        mcv_pairs.sort_by_key(|b| std::cmp::Reverse(b.1));
227        mcv_pairs.truncate(opts.mcv_size);
228        let mcv: Vec<(String, f64)> = mcv_pairs
229            .into_iter()
230            .map(|(k, count)| (k, count as f64 / sample_len))
231            .collect();
232
233        // Equi-depth histogram: divide the sorted value list
234        // into (hist_buckets + 1) boundary points. Each bucket
235        // holds roughly the same number of samples.
236        let hist_bounds = if sorted_values.is_empty() {
237            Vec::new()
238        } else {
239            let boundaries = opts.hist_buckets + 1;
240            let mut bounds = Vec::with_capacity(boundaries);
241            for b in 0..boundaries {
242                let idx = ((b * (sorted_values.len() - 1)) / opts.hist_buckets)
243                    .min(sorted_values.len() - 1);
244                bounds.push(sorted_values[idx].clone());
245            }
246            bounds
247        };
248
249        out.push(ColumnAnalysis {
250            name: name.clone(),
251            distinct_count,
252            null_count,
253            total_count,
254            mcv,
255            hist_bounds,
256            min_value,
257            max_value,
258        });
259    }
260    out
261}
262
263/// Build a TableAnalysis from a set of ColumnAnalysis results
264/// plus the table-level row count. Used by the DDL executor's
265/// final assembly step.
266pub fn build_table_analysis(
267    table: impl Into<String>,
268    row_count: u64,
269    avg_row_size: u64,
270    columns: Vec<ColumnAnalysis>,
271    elapsed_secs: f64,
272) -> TableAnalysis {
273    TableAnalysis {
274        table: table.into(),
275        row_count,
276        avg_row_size,
277        columns,
278        elapsed_secs,
279    }
280}
281
282/// Phase 3.7 cutover entry point. The runtime calls this to
283/// kick off an ANALYZE TABLE pass without going through the
284/// SQL DDL parser — for now the only public surface is via
285/// the API/HTTP layer, which avoids a `QueryExpr` cascade.
286///
287/// Inputs:
288/// - `table_name`: the collection to analyse
289/// - `column_names`: ordered list of columns the caller wants
290///   stats for (typically `SchemaRegistry::columns_of(table)`)
291/// - `sampled_rows`: the row sample the caller has already
292///   fetched. Each inner Vec is one row, indexed parallel to
293///   `column_names`. `None` represents NULL.
294/// - `total_count`: the full row count (not the sample size),
295///   used by selectivity estimates downstream.
296///
297/// Returns the final `TableAnalysis` ready to feed into
298/// `StatsProvider::update`. Phase 5 wires the caller chain
299/// (sample fetch → this call → stats provider update) into
300/// `runtime::impl_ddl::handle_analyze`.
301pub fn run_analyze_from_sample(
302    table_name: impl Into<String>,
303    column_names: &[String],
304    sampled_rows: &[Vec<Option<String>>],
305    total_count: u64,
306) -> TableAnalysis {
307    let opts = AnalyzeOptions::default();
308    let columns = compute_column_stats(column_names, sampled_rows, total_count, opts);
309    let avg_row_size = sampled_rows
310        .first()
311        .map(|row| {
312            row.iter()
313                .map(|v| v.as_ref().map(|s| s.len()).unwrap_or(0))
314                .sum::<usize>() as u64
315        })
316        .unwrap_or(0);
317    build_table_analysis(table_name, total_count, avg_row_size, columns, 0.0)
318}