spg_engine/maintenance.rs
1//! Table-maintenance executors: `ANALYZE` (re-stat) and
2//! `COMPACT COLD SEGMENTS` (cold-segment merge). Split out of
3//! `lib.rs` (cut 19); verbatim move, the only edit beyond
4//! visibility is reuniting the `exec_compact_cold_segments` doc,
5//! whose first half had drifted above `set_session_param`.
6
7use alloc::string::{String, ToString};
8use alloc::vec::Vec;
9
10use spg_storage::{ColumnSchema, CompactReport, DataType, IndexKind, Row, StorageError, Value};
11
12use crate::{
13 COMPACTION_TARGET_DEFAULT_BYTES, Engine, EngineError, QueryResult, canonical_value_repr,
14 is_internal_table_name, sort_values_for_histogram, statistics,
15};
16
17impl Engine {
18 /// v6.2.0 — `ANALYZE [<table>]` runtime. Bare `ANALYZE` walks
19 /// every user table; `ANALYZE <name>` re-stats one. For each
20 /// target table, single-pass scan + per-column histogram +
21 /// `null_frac` + `n_distinct`. Replaces the table's prior
22 /// stats; resets the modified-row counter.
23 ///
24 /// v6.2.0 doesn't sample — it scans the full table. v6.2.x
25 /// can add reservoir sampling at the > 100 K-row mark; not a
26 /// scope blocker for the current commit since rows ≤ 100 K
27 /// analyse in milliseconds.
28 pub(crate) fn exec_analyze(
29 &mut self,
30 target: Option<&str>,
31 ) -> Result<QueryResult, EngineError> {
32 let names: Vec<String> = if let Some(name) = target {
33 // Verify the table exists; surface a clear error if not.
34 if self.catalog.get(name).is_none() {
35 return Err(EngineError::Storage(StorageError::TableNotFound {
36 name: name.to_string(),
37 }));
38 }
39 alloc::vec![name.to_string()]
40 } else {
41 self.catalog
42 .table_names()
43 .into_iter()
44 .filter(|n| !is_internal_table_name(n))
45 .collect()
46 };
47 let mut analysed = 0usize;
48 for table_name in &names {
49 self.analyze_one_table(table_name)?;
50 analysed += 1;
51 }
52 // v6.3.1 — plan cache invalidation. Bump stats version so
53 // future lookups see the new generation, and selectively
54 // evict every plan whose `source_tables` overlap with the
55 // ANALYZE target set. Bare ANALYZE (all tables) clears the
56 // whole cache.
57 if analysed > 0 {
58 self.statistics.bump_version();
59 if target.is_some() {
60 for t in &names {
61 self.plan_cache.evict_referencing(t);
62 }
63 } else {
64 self.plan_cache.clear();
65 }
66 }
67 Ok(QueryResult::CommandOk {
68 affected: analysed,
69 modified_catalog: true,
70 })
71 }
72
73 /// Walk a single table's rows once and (re-)populate per-column
74 /// stats. Drops the existing stats for `table` first so columns
75 /// that have been DROP-ed between ANALYZEs don't leave stale
76 /// rows.
77 fn analyze_one_table(&mut self, table_name: &str) -> Result<(), EngineError> {
78 let table = self.catalog.get(table_name).ok_or_else(|| {
79 EngineError::Storage(StorageError::TableNotFound {
80 name: table_name.to_string(),
81 })
82 })?;
83 let schema = table.schema().clone();
84 let row_count = table.rows().len();
85 // For each column, collect (sorted) non-NULL textual values
86 // + count NULLs; then ask `statistics::build_histogram` to
87 // produce the 101 bounds and `estimate_n_distinct` the
88 // distinct count.
89 self.statistics.clear_table(table_name);
90 for (col_pos, col_schema) in schema.columns.iter().enumerate() {
91 // v6.2.0 skip: vector columns have their own stats
92 // shape (HNSW graph topology). v6.2 deliberation #1.
93 if matches!(col_schema.ty, DataType::Vector { .. }) {
94 continue;
95 }
96 let mut non_null_values: Vec<Value> = Vec::with_capacity(row_count);
97 let mut nulls: u64 = 0;
98 for row in table.rows() {
99 match row.values.get(col_pos) {
100 Some(Value::Null) | None => nulls += 1,
101 Some(v) => non_null_values.push(v.clone()),
102 }
103 }
104 // Sort by type-aware ordering (Int as int, Text as
105 // lex, etc.) so histogram bounds reflect the column's
106 // natural order — not lexicographic on the string
107 // representation, which would put "9" after "49".
108 non_null_values.sort_by(|a, b| sort_values_for_histogram(a, b));
109 let non_null: Vec<String> = non_null_values.iter().map(canonical_value_repr).collect();
110 let null_frac = if row_count == 0 {
111 0.0
112 } else {
113 #[allow(clippy::cast_precision_loss)]
114 let f = nulls as f32 / row_count as f32;
115 f
116 };
117 let n_distinct = statistics::estimate_n_distinct(&non_null);
118 let histogram_bounds = statistics::build_histogram(&non_null);
119 self.statistics.set(
120 table_name.to_string(),
121 col_schema.name.clone(),
122 statistics::ColumnStats {
123 null_frac,
124 n_distinct,
125 histogram_bounds,
126 },
127 );
128 }
129 self.statistics.reset_modified(table_name);
130 // v6.7.0 — refresh the per-table cold_rows cache. Walk the
131 // BTree indices and count Cold locators (MAX across
132 // indices); store the result on the table. Surfaced via
133 // `spg_statistic.cold_row_count` (new column) and
134 // `spg_stat_segment.table_name` (new column).
135 let cold_count = {
136 let table = self
137 .active_catalog()
138 .get(table_name)
139 .expect("table still present");
140 table.count_cold_locators()
141 };
142 let table_mut = self
143 .active_catalog_mut()
144 .get_mut(table_name)
145 .expect("table still present");
146 table_mut.set_cold_row_count(cold_count);
147 Ok(())
148 }
149
150 /// v6.7.3 — `COMPACT COLD SEGMENTS` runtime path. Drives the
151 /// engine-layer compaction shim with the default
152 /// 4 MiB segment-size threshold. spg-server intercepts the
153 /// SQL before it reaches the engine on a server build —
154 /// it reads `SPG_COMPACTION_TARGET_SEGMENT_BYTES`, calls
155 /// `Engine::compact_cold_segments_with_target` directly with
156 /// the env value, and persists every merged segment to
157 /// `<db>.spg/segments/`. This arm only fires for engine-only
158 /// callers (spg-embedded, lib tests); in that mode merged
159 /// segments live in memory and are dropped at process exit.
160 pub(crate) fn exec_compact_cold_segments(&mut self) -> Result<QueryResult, EngineError> {
161 let target = COMPACTION_TARGET_DEFAULT_BYTES;
162 let reports = self.compact_cold_segments_with_target(target)?;
163 let columns = alloc::vec![
164 ColumnSchema::new("table_name", DataType::Text, false),
165 ColumnSchema::new("index_name", DataType::Text, false),
166 ColumnSchema::new("sources_merged", DataType::BigInt, false),
167 ColumnSchema::new("merged_segment_id", DataType::BigInt, false),
168 ColumnSchema::new("merged_rows", DataType::BigInt, false),
169 ColumnSchema::new("deleted_rows_pruned", DataType::BigInt, false),
170 ColumnSchema::new("bytes_reclaimed_estimate", DataType::BigInt, false),
171 ];
172 let rows: Vec<Row> = reports
173 .into_iter()
174 .map(|(tname, iname, report)| {
175 Row::new(alloc::vec![
176 Value::Text(tname),
177 Value::Text(iname),
178 Value::BigInt(i64::try_from(report.sources.len()).unwrap_or(i64::MAX)),
179 Value::BigInt(i64::from(report.merged_segment_id.unwrap_or(0))),
180 Value::BigInt(i64::try_from(report.merged_rows).unwrap_or(i64::MAX)),
181 Value::BigInt(i64::try_from(report.deleted_rows_pruned).unwrap_or(i64::MAX),),
182 Value::BigInt(
183 i64::try_from(report.bytes_reclaimed_estimate).unwrap_or(i64::MAX),
184 ),
185 ])
186 })
187 .collect();
188 Ok(QueryResult::Rows { columns, rows })
189 }
190
191 /// v6.7.3 — public shim around `Catalog::compact_cold_segments`
192 /// driving every BTree index on every user table. Returns one
193 /// `(table, index, report)` triple for each merge that
194 /// actually happened (no-op (table, index) pairs are filtered
195 /// out so callers can size persist-side work to the live
196 /// merges). Caller is responsible for persisting each
197 /// `report.merged_segment_bytes` and updating the on-disk
198 /// segment registry; engine layer is no_std and never
199 /// touches disk.
200 ///
201 /// Marks every touched table's cached `cold_row_count` stale
202 /// — compaction GC'd some shadowed rows, so the count must be
203 /// re-derived on the next ANALYZE.
204 pub fn compact_cold_segments_with_target(
205 &mut self,
206 target_segment_bytes: u64,
207 ) -> Result<Vec<(String, String, CompactReport)>, EngineError> {
208 let table_names = self.active_catalog().table_names();
209 let mut reports: Vec<(String, String, CompactReport)> = Vec::new();
210 for tname in table_names {
211 if is_internal_table_name(&tname) {
212 continue;
213 }
214 let idx_names: Vec<String> = {
215 let Some(t) = self.active_catalog().get(&tname) else {
216 continue;
217 };
218 t.indices()
219 .iter()
220 .filter(|i| matches!(i.kind, IndexKind::BTree(_)))
221 .map(|i| i.name.clone())
222 .collect()
223 };
224 for iname in idx_names {
225 let report = self
226 .active_catalog_mut()
227 .compact_cold_segments(&tname, &iname, target_segment_bytes)
228 .map_err(EngineError::Storage)?;
229 if report.merged_segment_id.is_some() {
230 if let Some(t) = self.active_catalog_mut().get_mut(&tname) {
231 t.mark_cold_row_count_stale();
232 }
233 reports.push((tname.clone(), iname, report));
234 }
235 }
236 }
237 Ok(reports)
238 }
239}