Skip to main content

spg_engine/
statistics.rs

1// pedantic doc_markdown flags the embedded wire-format spec block
2// and a handful of proper nouns; allowing at the module level
3// keeps the spec readable.
4#![allow(clippy::doc_markdown)]
5
6//! v6.2.0 — per-column statistics for the cost-based optimizer.
7//!
8//! Each analysed `(table, column)` pair gets a [`ColumnStats`]
9//! row: `null_frac` ∈ [0.0, 1.0], `n_distinct` count, and a
10//! 100-bucket equi-depth histogram (`Vec<String>` of 101 bounds —
11//! v0 .. v100). Skewed distributions live in the bucket widths,
12//! not in a separate MCV sidecar (see `V6_2_DESIGN.md` deliberation
13//! #1).
14//!
15//! Storage shape mirrors [`crate::publications::Publications`] and
16//! [`crate::subscriptions::Subscriptions`]:
17//!   - `BTreeMap<(String, String), ColumnStats>` keeps iteration
18//!     in deterministic alphabetical order (snapshot byte-stable
19//!     regardless of insertion sequence).
20//!   - `BTreeMap<String, u64>` tracks per-table modified-row count
21//!     for v6.2.1 auto-analyze's 10 % threshold trigger.
22//!
23//! Persistence rides the snapshot envelope's v5 trailer block (see
24//! `crate::lib::build_envelope`). v1/v2/v3/v4 envelopes deserialise
25//! to empty statistics; v5 writers always emit the trailer.
26
27use alloc::collections::BTreeMap;
28use alloc::string::{String, ToString};
29use alloc::vec::Vec;
30
/// Statistics that ANALYZE records for one `(table, column)` pair.
/// See the module docs for how rows are keyed and persisted.
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnStats {
    /// Share of the column's rows that are NULL, in `[0.0, 1.0]`.
    pub null_frac: f32,
    /// Raw distinct-value count. This is a linear-counting
    /// estimate (PG stores a similar approximation); v6.2.x can
    /// re-tune the sketch. The value semantic is "approximate,
    /// ≥ 0.95 accuracy on skewed corpora".
    pub n_distinct: u64,
    /// Sorted histogram boundaries. A full histogram has 101
    /// bounds delimiting 100 equi-depth buckets: bucket `i` covers
    /// `[bounds[i], bounds[i+1])` for `i < 99`, and the last bucket
    /// is closed on both ends, `[bounds[99], bounds[100]]`.
    ///
    /// Each bound is the canonical SQL textual form of a column
    /// value — TEXT lexicographic, INT decimal, FLOAT
    /// shortest-round-trip, DATE/TIMESTAMP ISO.
    ///
    /// Empty for an all-NULL column. Note that [`build_histogram`]
    /// hands back fewer than 101 bounds when its sample holds at
    /// most 101 values.
    pub histogram_bounds: Vec<String>,
}
50
51#[derive(Debug, Clone, PartialEq, Default)]
52pub struct Statistics {
53    /// Keyed on `(table_name, column_name)`. BTreeMap orders by
54    /// `(table, column)` so iteration is deterministic for
55    /// snapshot byte-stability and SHOW-style introspection.
56    inner: BTreeMap<(String, String), ColumnStats>,
57    /// Per-table modified-row counter since the last ANALYZE on
58    /// that table. v6.2.1 auto-analyze fires when this fraction
59    /// crosses 10 % of the live row count.
60    modified_since: BTreeMap<String, u64>,
61    /// v6.3.1 — monotonic version bumped on every successful
62    /// ANALYZE. The plan cache snapshots this at prepare time;
63    /// cache lookup compares and evicts on mismatch.
64    ///
65    /// In-memory only. Does NOT ride the envelope (plan cache is
66    /// in-memory only too, so version starts at 0 on every Engine
67    /// boot).
68    version: u64,
69}
70
71// Statistics holds f32 (null_frac) so it can't auto-derive `Eq`.
72// ANALYZE never stores NaN, so PartialEq is total in practice; we
73// just don't claim Eq.
74
/// Error returned when a serialised statistics payload cannot be
/// decoded.
#[derive(Debug, Eq, PartialEq)]
pub enum StatisticsError {
    /// The payload is malformed. The string is a human-readable
    /// description of what went wrong (short read, non-UTF-8
    /// text, duplicate key, trailing bytes, …).
    Corrupt(String),
}
79
80impl Statistics {
81    pub fn new() -> Self {
82        Self::default()
83    }
84
85    pub fn len(&self) -> usize {
86        self.inner.len()
87    }
88
89    pub fn is_empty(&self) -> bool {
90        self.inner.is_empty()
91    }
92
93    pub fn get(&self, table: &str, column: &str) -> Option<&ColumnStats> {
94        self.inner.get(&(table.to_string(), column.to_string()))
95    }
96
97    /// Iterate `((table, column), stats)` in deterministic
98    /// alphabetical order. Used by `SELECT * FROM spg_statistic`
99    /// and by snapshot serialisation.
100    pub fn iter(&self) -> impl Iterator<Item = (&(String, String), &ColumnStats)> {
101        self.inner.iter()
102    }
103
104    /// Replace (or insert) the stats for one `(table, column)`.
105    /// Called by ANALYZE per column.
106    pub fn set(&mut self, table: String, column: String, stats: ColumnStats) {
107        self.inner.insert((table, column), stats);
108    }
109
110    /// Drop every row whose table key matches `table`. Called
111    /// before re-ANALYZE on a table so columns dropped between
112    /// analyses don't leave stale rows behind.
113    pub fn clear_table(&mut self, table: &str) {
114        self.inner.retain(|(t, _), _| t != table);
115    }
116
117    /// Reset the modified-row counter for `table`. Called at the
118    /// end of ANALYZE so v6.2.1 auto-analyze starts a fresh
119    /// window.
120    pub fn reset_modified(&mut self, table: &str) {
121        self.modified_since.insert(table.to_string(), 0);
122    }
123
124    /// Bump the modified-row counter. The engine's
125    /// `exec_insert` / `exec_update` / `exec_delete` paths feed
126    /// this hook so v6.2.1 auto-analyze can read it.
127    pub fn record_modifications(&mut self, table: &str, n: u64) {
128        let entry = self.modified_since.entry(table.to_string()).or_default();
129        *entry = entry.saturating_add(n);
130    }
131
132    pub fn modified_since_last_analyze(&self, table: &str) -> u64 {
133        self.modified_since.get(table).copied().unwrap_or(0)
134    }
135
136    /// v6.3.1 — current monotonic version. Plan cache snapshots this
137    /// at prepare time; lookup compares and evicts on mismatch.
138    pub fn version(&self) -> u64 {
139        self.version
140    }
141
142    /// v6.3.1 — bumps the version. Called by `exec_analyze` after a
143    /// successful ANALYZE on any table.
144    pub fn bump_version(&mut self) {
145        self.version = self.version.saturating_add(1);
146    }
147
148    // ── serialisation (envelope v5 trailer) ─────────────────────
149
150    /// Format (each block little-endian):
151    ///   [u16 num_columns]
152    ///   for each column:
153    ///     [u16 table_len][table bytes]
154    ///     [u16 col_len][col bytes]
155    ///     [f32 null_frac]
156    ///     [u64 n_distinct]
157    ///     [u16 num_bounds]
158    ///     for each bound: [u16 b_len][b bytes]
159    ///   [u16 num_modified_entries]
160    ///   for each: [u16 t_len][t bytes][u64 modified_count]
161    pub fn serialize(&self) -> Vec<u8> {
162        let mut out = Vec::with_capacity(2 + self.inner.len() * 32);
163        let n = u16::try_from(self.inner.len()).expect("≤ 65,535 column-stats rows");
164        out.extend_from_slice(&n.to_le_bytes());
165        for ((table, col), stats) in &self.inner {
166            write_str(&mut out, table);
167            write_str(&mut out, col);
168            out.extend_from_slice(&stats.null_frac.to_le_bytes());
169            out.extend_from_slice(&stats.n_distinct.to_le_bytes());
170            let nb =
171                u16::try_from(stats.histogram_bounds.len()).expect("≤ 65,535 histogram bounds");
172            out.extend_from_slice(&nb.to_le_bytes());
173            for b in &stats.histogram_bounds {
174                write_str(&mut out, b);
175            }
176        }
177        let m = u16::try_from(self.modified_since.len()).expect("≤ 65,535 modified-row counters");
178        out.extend_from_slice(&m.to_le_bytes());
179        for (table, count) in &self.modified_since {
180            write_str(&mut out, table);
181            out.extend_from_slice(&count.to_le_bytes());
182        }
183        out
184    }
185
186    pub fn deserialize(buf: &[u8]) -> Result<Self, StatisticsError> {
187        let mut p = 0usize;
188        let n = read_u16(buf, &mut p)? as usize;
189        let mut inner = BTreeMap::new();
190        for _ in 0..n {
191            let table = read_str(buf, &mut p)?;
192            let col = read_str(buf, &mut p)?;
193            let null_frac_bytes = read_bytes(buf, &mut p, 4)?;
194            let null_frac = f32::from_le_bytes(
195                null_frac_bytes
196                    .try_into()
197                    .map_err(|_| StatisticsError::Corrupt("null_frac slice".to_string()))?,
198            );
199            let n_distinct = read_u64(buf, &mut p)?;
200            let nb = read_u16(buf, &mut p)? as usize;
201            let mut bounds = Vec::with_capacity(nb);
202            for _ in 0..nb {
203                bounds.push(read_str(buf, &mut p)?);
204            }
205            if inner
206                .insert(
207                    (table.clone(), col.clone()),
208                    ColumnStats {
209                        null_frac,
210                        n_distinct,
211                        histogram_bounds: bounds,
212                    },
213                )
214                .is_some()
215            {
216                return Err(StatisticsError::Corrupt(alloc::format!(
217                    "duplicate spg_statistic key ({table:?}, {col:?})"
218                )));
219            }
220        }
221        let m = read_u16(buf, &mut p)? as usize;
222        let mut modified_since = BTreeMap::new();
223        for _ in 0..m {
224            let table = read_str(buf, &mut p)?;
225            let count = read_u64(buf, &mut p)?;
226            modified_since.insert(table, count);
227        }
228        if p != buf.len() {
229            return Err(StatisticsError::Corrupt(alloc::format!(
230                "trailing bytes in statistics payload: read {p}, len {}",
231                buf.len()
232            )));
233        }
234        Ok(Self {
235            inner,
236            modified_since,
237            version: 0,
238        })
239    }
240}
241
242// ── histogram builder ───────────────────────────────────────────
243
/// v6.2.0 — number of equi-depth buckets in a full histogram. A
/// full histogram therefore carries `NUM_BUCKETS + 1` (101)
/// boundary values. v6.2.x can re-tune.
pub const NUM_BUCKETS: usize = 100;

/// Derive equi-depth histogram bounds from a sorted sample of
/// stringified column values.
///
/// Sorting is the caller's job: the sample must already follow the
/// column's natural ordering (TEXT lexicographic, INT decimal,
/// etc.). This function never interprets the strings, it only
/// picks evenly spaced positions.
///
/// Returns:
///   - an empty vec when the sample is empty (no non-NULL values);
///   - a copy of the sample when it has at most 101 values, so
///     every value is its own bound;
///   - otherwise exactly 101 bounds, the first being the sample
///     minimum and the last the sample maximum. Consecutive bounds
///     span roughly `sample.len() / NUM_BUCKETS` values;
///     selectivity estimation in v6.2.2 walks them directly.
pub fn build_histogram(sorted_values: &[String]) -> Vec<String> {
    let len = sorted_values.len();
    // Too few values to fill every bucket (this includes the
    // empty sample): hand the sample back unchanged. Smaller
    // histograms degrade gracefully.
    if len <= NUM_BUCKETS + 1 {
        return sorted_values.to_vec();
    }
    let last = (len - 1) as u64;
    let buckets = NUM_BUCKETS as u64;
    // Bound `step` sits at floor(step / buckets * last), i.e. the
    // positions sweep evenly from index 0 to index `len - 1`.
    (0..=buckets)
        .map(|step| {
            let pos = (step * last / buckets) as usize;
            sorted_values[pos].clone()
        })
        .collect()
}
277
/// v6.2.0 — n_distinct estimator. Counts the runs of equal
/// adjacent values in the sample, so on a sorted sample the result
/// is the number of distinct values; duplicates do not have to be
/// removed beforehand.
///
/// Returns 0 for an empty sample. On a complete sample the count
/// is exact; on a reservoir sample it is the observed count (which
/// v6.2.x can swap for HyperLogLog if needed).
pub fn estimate_n_distinct(sorted_values: &[String]) -> u64 {
    if sorted_values.is_empty() {
        return 0;
    }
    // The first value opens one run; every adjacent pair that
    // differs opens another.
    sorted_values
        .windows(2)
        .fold(1u64, |runs, pair| runs + u64::from(pair[0] != pair[1]))
}
296
297// ── byte-codec helpers ──────────────────────────────────────────
298
/// Appends `s` to `out` as a little-endian `u16` byte length
/// followed by the raw UTF-8 bytes.
///
/// # Panics
///
/// Panics if `s` is longer than 65,535 bytes, since the length
/// prefix is a `u16`. Nothing is written in that case.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let raw = s.as_bytes();
    let len_prefix =
        u16::try_from(raw.len()).expect("table / column / bound names ≤ 65,535 bytes");
    out.extend_from_slice(&len_prefix.to_le_bytes());
    out.extend_from_slice(raw);
}
304
305fn read_bytes<'a>(buf: &'a [u8], p: &mut usize, n: usize) -> Result<&'a [u8], StatisticsError> {
306    let slice = buf
307        .get(*p..*p + n)
308        .ok_or_else(|| StatisticsError::Corrupt(alloc::format!("short read ({n} bytes)")))?;
309    *p += n;
310    Ok(slice)
311}
312
313fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, StatisticsError> {
314    let bytes = read_bytes(buf, p, 2)?;
315    Ok(u16::from_le_bytes(bytes.try_into().map_err(|_| {
316        StatisticsError::Corrupt("u16 slice".to_string())
317    })?))
318}
319
320fn read_u64(buf: &[u8], p: &mut usize) -> Result<u64, StatisticsError> {
321    let bytes = read_bytes(buf, p, 8)?;
322    Ok(u64::from_le_bytes(bytes.try_into().map_err(|_| {
323        StatisticsError::Corrupt("u64 slice".to_string())
324    })?))
325}
326
327fn read_str(buf: &[u8], p: &mut usize) -> Result<String, StatisticsError> {
328    let n = read_u16(buf, p)? as usize;
329    let slice = read_bytes(buf, p, n)?;
330    core::str::from_utf8(slice)
331        .map(ToString::to_string)
332        .map_err(|e| StatisticsError::Corrupt(alloc::format!("non-UTF-8 str: {e}")))
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor: turns `&str` bounds into owned strings.
    fn mk_cs(null_frac: f32, n_distinct: u64, bounds: &[&str]) -> ColumnStats {
        let histogram_bounds = bounds.iter().map(|b| String::from(*b)).collect();
        ColumnStats {
            null_frac,
            n_distinct,
            histogram_bounds,
        }
    }

    /// Serialises `stats` and decodes the bytes straight back.
    fn roundtrip(stats: &Statistics) -> Statistics {
        Statistics::deserialize(&stats.serialize()).unwrap()
    }

    #[test]
    fn empty_roundtrips() {
        let original = Statistics::new();
        let decoded = roundtrip(&original);
        assert_eq!(original, decoded);
    }

    #[test]
    fn single_column_roundtrips() {
        let mut original = Statistics::new();
        let id_stats = mk_cs(0.0, 1000, &["1", "500", "1000"]);
        original.set("users".into(), "id".into(), id_stats);

        let decoded = roundtrip(&original);
        assert_eq!(original, decoded);

        let row = decoded.get("users", "id").unwrap();
        assert_eq!(row.n_distinct, 1000);
        assert_eq!(row.histogram_bounds.len(), 3);
    }

    #[test]
    fn multi_column_roundtrips_with_modified_counter() {
        let mut original = Statistics::new();
        let columns = [
            ("id", mk_cs(0.0, 100, &["1", "50", "100"])),
            ("name", mk_cs(0.1, 99, &["alice", "bob", "zoe"])),
        ];
        for (column, stats) in columns {
            original.set("users".into(), column.into(), stats);
        }
        original.record_modifications("users", 17);

        let decoded = roundtrip(&original);
        assert_eq!(original, decoded);
        assert_eq!(decoded.modified_since_last_analyze("users"), 17);
    }

    #[test]
    fn histogram_bounds_count_is_101_for_100_buckets() {
        // 1000 sorted values → 100 equi-depth buckets → 101 bounds.
        let sample: Vec<String> = (0..1000).map(|i| alloc::format!("{i:04}")).collect();
        let bounds = build_histogram(&sample);
        assert_eq!(bounds.len(), 101);
        // The outermost bounds are the sample's min and max.
        assert_eq!(bounds.first().unwrap(), "0000");
        assert_eq!(bounds.last().unwrap(), "0999");
    }

    #[test]
    fn deterministic_serialise_independent_of_insert_order() {
        let mut z_first = Statistics::new();
        z_first.set("z".into(), "c1".into(), mk_cs(0.0, 1, &["x"]));
        z_first.set("a".into(), "c2".into(), mk_cs(0.0, 1, &["y"]));

        let mut a_first = Statistics::new();
        a_first.set("a".into(), "c2".into(), mk_cs(0.0, 1, &["y"]));
        a_first.set("z".into(), "c1".into(), mk_cs(0.0, 1, &["x"]));

        assert_eq!(z_first.serialize(), a_first.serialize());
    }

    #[test]
    fn n_distinct_estimator_within_5pct_on_uniform_corpus() {
        // 10000 values cycling through exactly 100 distinct strings.
        let mut sample: Vec<String> = (0..10000)
            .map(|i| alloc::format!("v{}", i % 100))
            .collect();
        sample.sort();
        // On a sorted, complete sample the estimator is exact.
        assert_eq!(estimate_n_distinct(&sample), 100);
    }

    #[test]
    fn clear_table_drops_only_target_rows() {
        let mut stats = Statistics::new();
        let rows = [("a", "c1", "x"), ("a", "c2", "y"), ("b", "c1", "z")];
        for (table, column, bound) in rows {
            stats.set(table.into(), column.into(), mk_cs(0.0, 1, &[bound]));
        }

        stats.clear_table("a");

        assert_eq!(stats.len(), 1);
        assert!(stats.get("a", "c1").is_none());
        assert!(stats.get("b", "c1").is_some());
    }

    #[test]
    fn corrupt_short_read_errors() {
        // Header announces one column row, but no row follows.
        let payload = 1u16.to_le_bytes();
        let outcome = Statistics::deserialize(&payload);
        assert!(matches!(outcome, Err(StatisticsError::Corrupt(_))));
    }

    #[test]
    fn build_histogram_passthrough_when_sample_is_small() {
        let sample: Vec<String> = (0..5).map(|i| alloc::format!("v{i}")).collect();
        let bounds = build_histogram(&sample);
        // At most 101 values → every value is its own bound.
        assert_eq!(bounds.len(), 5);
        assert_eq!(bounds[0], "v0");
        assert_eq!(bounds[4], "v4");
    }
}
454}