Skip to main content

spg_engine/
maintenance.rs

1//! Table-maintenance executors: `ANALYZE` (re-stat) and
2//! `COMPACT COLD SEGMENTS` (cold-segment merge). Split out of
3//! `lib.rs` (cut 19); verbatim move, the only edit beyond
4//! visibility is reuniting the `exec_compact_cold_segments` doc,
5//! whose first half had drifted above `set_session_param`.
6
7use alloc::string::{String, ToString};
8use alloc::vec::Vec;
9
10use spg_storage::{ColumnSchema, CompactReport, DataType, IndexKind, Row, StorageError, Value};
11
12use crate::{
13    COMPACTION_TARGET_DEFAULT_BYTES, Engine, EngineError, QueryResult, canonical_value_repr,
14    is_internal_table_name, sort_values_for_histogram, statistics,
15};
16
17impl Engine {
18    /// v6.2.0 — `ANALYZE [<table>]` runtime. Bare `ANALYZE` walks
19    /// every user table; `ANALYZE <name>` re-stats one. For each
20    /// target table, single-pass scan + per-column histogram +
21    /// `null_frac` + `n_distinct`. Replaces the table's prior
22    /// stats; resets the modified-row counter.
23    ///
24    /// v6.2.0 doesn't sample — it scans the full table. v6.2.x
25    /// can add reservoir sampling at the > 100 K-row mark; not a
26    /// scope blocker for the current commit since rows ≤ 100 K
27    /// analyse in milliseconds.
28    pub(crate) fn exec_analyze(
29        &mut self,
30        target: Option<&str>,
31    ) -> Result<QueryResult, EngineError> {
32        let names: Vec<String> = if let Some(name) = target {
33            // Verify the table exists; surface a clear error if not.
34            if self.catalog.get(name).is_none() {
35                return Err(EngineError::Storage(StorageError::TableNotFound {
36                    name: name.to_string(),
37                }));
38            }
39            alloc::vec![name.to_string()]
40        } else {
41            self.catalog
42                .table_names()
43                .into_iter()
44                .filter(|n| !is_internal_table_name(n))
45                .collect()
46        };
47        let mut analysed = 0usize;
48        for table_name in &names {
49            self.analyze_one_table(table_name)?;
50            analysed += 1;
51        }
52        // v6.3.1 — plan cache invalidation. Bump stats version so
53        // future lookups see the new generation, and selectively
54        // evict every plan whose `source_tables` overlap with the
55        // ANALYZE target set. Bare ANALYZE (all tables) clears the
56        // whole cache.
57        if analysed > 0 {
58            self.statistics.bump_version();
59            if target.is_some() {
60                for t in &names {
61                    self.plan_cache.evict_referencing(t);
62                }
63            } else {
64                self.plan_cache.clear();
65            }
66        }
67        Ok(QueryResult::CommandOk {
68            affected: analysed,
69            modified_catalog: true,
70        })
71    }
72
73    /// Walk a single table's rows once and (re-)populate per-column
74    /// stats. Drops the existing stats for `table` first so columns
75    /// that have been DROP-ed between ANALYZEs don't leave stale
76    /// rows.
77    fn analyze_one_table(&mut self, table_name: &str) -> Result<(), EngineError> {
78        let table = self.catalog.get(table_name).ok_or_else(|| {
79            EngineError::Storage(StorageError::TableNotFound {
80                name: table_name.to_string(),
81            })
82        })?;
83        let schema = table.schema().clone();
84        let row_count = table.rows().len();
85        // For each column, collect (sorted) non-NULL textual values
86        // + count NULLs; then ask `statistics::build_histogram` to
87        // produce the 101 bounds and `estimate_n_distinct` the
88        // distinct count.
89        self.statistics.clear_table(table_name);
90        for (col_pos, col_schema) in schema.columns.iter().enumerate() {
91            // v6.2.0 skip: vector columns have their own stats
92            // shape (HNSW graph topology). v6.2 deliberation #1.
93            if matches!(col_schema.ty, DataType::Vector { .. }) {
94                continue;
95            }
96            let mut non_null_values: Vec<Value> = Vec::with_capacity(row_count);
97            let mut nulls: u64 = 0;
98            for row in table.rows() {
99                match row.values.get(col_pos) {
100                    Some(Value::Null) | None => nulls += 1,
101                    Some(v) => non_null_values.push(v.clone()),
102                }
103            }
104            // Sort by type-aware ordering (Int as int, Text as
105            // lex, etc.) so histogram bounds reflect the column's
106            // natural order — not lexicographic on the string
107            // representation, which would put "9" after "49".
108            non_null_values.sort_by(|a, b| sort_values_for_histogram(a, b));
109            let non_null: Vec<String> = non_null_values.iter().map(canonical_value_repr).collect();
110            let null_frac = if row_count == 0 {
111                0.0
112            } else {
113                #[allow(clippy::cast_precision_loss)]
114                let f = nulls as f32 / row_count as f32;
115                f
116            };
117            let n_distinct = statistics::estimate_n_distinct(&non_null);
118            let histogram_bounds = statistics::build_histogram(&non_null);
119            self.statistics.set(
120                table_name.to_string(),
121                col_schema.name.clone(),
122                statistics::ColumnStats {
123                    null_frac,
124                    n_distinct,
125                    histogram_bounds,
126                },
127            );
128        }
129        self.statistics.reset_modified(table_name);
130        // v6.7.0 — refresh the per-table cold_rows cache. Walk the
131        // BTree indices and count Cold locators (MAX across
132        // indices); store the result on the table. Surfaced via
133        // `spg_statistic.cold_row_count` (new column) and
134        // `spg_stat_segment.table_name` (new column).
135        let cold_count = {
136            let table = self
137                .active_catalog()
138                .get(table_name)
139                .expect("table still present");
140            table.count_cold_locators()
141        };
142        let table_mut = self
143            .active_catalog_mut()
144            .get_mut(table_name)
145            .expect("table still present");
146        table_mut.set_cold_row_count(cold_count);
147        Ok(())
148    }
149
150    /// v6.7.3 — `COMPACT COLD SEGMENTS` runtime path. Drives the
151    /// engine-layer compaction shim with the default
152    /// 4 MiB segment-size threshold. spg-server intercepts the
153    /// SQL before it reaches the engine on a server build —
154    /// it reads `SPG_COMPACTION_TARGET_SEGMENT_BYTES`, calls
155    /// `Engine::compact_cold_segments_with_target` directly with
156    /// the env value, and persists every merged segment to
157    /// `<db>.spg/segments/`. This arm only fires for engine-only
158    /// callers (spg-embedded, lib tests); in that mode merged
159    /// segments live in memory and are dropped at process exit.
160    pub(crate) fn exec_compact_cold_segments(&mut self) -> Result<QueryResult, EngineError> {
161        let target = COMPACTION_TARGET_DEFAULT_BYTES;
162        let reports = self.compact_cold_segments_with_target(target)?;
163        let columns = alloc::vec![
164            ColumnSchema::new("table_name", DataType::Text, false),
165            ColumnSchema::new("index_name", DataType::Text, false),
166            ColumnSchema::new("sources_merged", DataType::BigInt, false),
167            ColumnSchema::new("merged_segment_id", DataType::BigInt, false),
168            ColumnSchema::new("merged_rows", DataType::BigInt, false),
169            ColumnSchema::new("deleted_rows_pruned", DataType::BigInt, false),
170            ColumnSchema::new("bytes_reclaimed_estimate", DataType::BigInt, false),
171        ];
172        let rows: Vec<Row> = reports
173            .into_iter()
174            .map(|(tname, iname, report)| {
175                Row::new(alloc::vec![
176                    Value::Text(tname),
177                    Value::Text(iname),
178                    Value::BigInt(i64::try_from(report.sources.len()).unwrap_or(i64::MAX)),
179                    Value::BigInt(i64::from(report.merged_segment_id.unwrap_or(0))),
180                    Value::BigInt(i64::try_from(report.merged_rows).unwrap_or(i64::MAX)),
181                    Value::BigInt(i64::try_from(report.deleted_rows_pruned).unwrap_or(i64::MAX),),
182                    Value::BigInt(
183                        i64::try_from(report.bytes_reclaimed_estimate).unwrap_or(i64::MAX),
184                    ),
185                ])
186            })
187            .collect();
188        Ok(QueryResult::Rows { columns, rows })
189    }
190
191    /// v6.7.3 — public shim around `Catalog::compact_cold_segments`
192    /// driving every BTree index on every user table. Returns one
193    /// `(table, index, report)` triple for each merge that
194    /// actually happened (no-op (table, index) pairs are filtered
195    /// out so callers can size persist-side work to the live
196    /// merges). Caller is responsible for persisting each
197    /// `report.merged_segment_bytes` and updating the on-disk
198    /// segment registry; engine layer is no_std and never
199    /// touches disk.
200    ///
201    /// Marks every touched table's cached `cold_row_count` stale
202    /// — compaction GC'd some shadowed rows, so the count must be
203    /// re-derived on the next ANALYZE.
204    pub fn compact_cold_segments_with_target(
205        &mut self,
206        target_segment_bytes: u64,
207    ) -> Result<Vec<(String, String, CompactReport)>, EngineError> {
208        let table_names = self.active_catalog().table_names();
209        let mut reports: Vec<(String, String, CompactReport)> = Vec::new();
210        for tname in table_names {
211            if is_internal_table_name(&tname) {
212                continue;
213            }
214            let idx_names: Vec<String> = {
215                let Some(t) = self.active_catalog().get(&tname) else {
216                    continue;
217                };
218                t.indices()
219                    .iter()
220                    .filter(|i| matches!(i.kind, IndexKind::BTree(_)))
221                    .map(|i| i.name.clone())
222                    .collect()
223            };
224            for iname in idx_names {
225                let report = self
226                    .active_catalog_mut()
227                    .compact_cold_segments(&tname, &iname, target_segment_bytes)
228                    .map_err(EngineError::Storage)?;
229                if report.merged_segment_id.is_some() {
230                    if let Some(t) = self.active_catalog_mut().get_mut(&tname) {
231                        t.mark_cold_row_count_stale();
232                    }
233                    reports.push((tname.clone(), iname, report));
234                }
235            }
236        }
237        Ok(reports)
238    }
239}