1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
//! Table-maintenance executors: `ANALYZE` (re-stat) and
//! `COMPACT COLD SEGMENTS` (cold-segment merge). Split out of
//! `lib.rs` (cut 19); verbatim move, the only edit beyond
//! visibility is reuniting the `exec_compact_cold_segments` doc,
//! whose first half had drifted above `set_session_param`.
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use spg_storage::{ColumnSchema, CompactReport, DataType, IndexKind, Row, StorageError, Value};
use crate::{
COMPACTION_TARGET_DEFAULT_BYTES, Engine, EngineError, QueryResult, canonical_value_repr,
is_internal_table_name, sort_values_for_histogram, statistics,
};
impl Engine {
/// v6.2.0 — runtime for `ANALYZE [<table>]`.
///
/// `target = Some(name)` re-stats exactly that table; `None` (bare
/// `ANALYZE`) re-stats every catalog table for which
/// `is_internal_table_name` is false. Each selected table is handed
/// to `analyze_one_table`, which replaces its prior column stats and
/// resets its modified-row counter.
///
/// v6.2.0 does not sample: every table is scanned in full. Reservoir
/// sampling above the ~100 K-row mark is a possible v6.2.x follow-up.
///
/// On success returns `QueryResult::CommandOk` whose `affected` is the
/// number of tables analysed and whose `modified_catalog` is `true`.
///
/// # Errors
///
/// * `StorageError::TableNotFound` (wrapped in `EngineError::Storage`)
///   when `target` names a table the catalog does not contain.
/// * Any error from `analyze_one_table`; the first failure aborts the
///   walk before the stats version bump / plan-cache invalidation.
pub(crate) fn exec_analyze(
    &mut self,
    target: Option<&str>,
) -> Result<QueryResult, EngineError> {
    let names: Vec<String> = match target {
        Some(name) => {
            // Fail fast with a clear error for an unknown table.
            if self.catalog.get(name).is_none() {
                return Err(EngineError::Storage(StorageError::TableNotFound {
                    name: name.to_string(),
                }));
            }
            alloc::vec![name.to_string()]
        }
        None => self
            .catalog
            .table_names()
            .into_iter()
            .filter(|candidate| !is_internal_table_name(candidate))
            .collect(),
    };

    for table_name in &names {
        self.analyze_one_table(table_name)?;
    }
    // Every iteration above either succeeded or returned early, so
    // the number of analysed tables is exactly the number selected.
    let analysed = names.len();

    // v6.3.1 — plan cache invalidation. Bump the stats version so
    // later lookups see the new generation. A targeted ANALYZE only
    // evicts plans referencing the analysed table(s); a bare ANALYZE
    // drops the whole cache.
    if analysed > 0 {
        self.statistics.bump_version();
        match target {
            Some(_) => {
                for analysed_table in &names {
                    self.plan_cache.evict_referencing(analysed_table);
                }
            }
            None => {
                self.plan_cache.clear();
            }
        }
    }

    Ok(QueryResult::CommandOk {
        affected: analysed,
        modified_catalog: true,
    })
}
/// Rebuild the per-column statistics of one table.
///
/// The table's existing stats are cleared first, so a column that
/// was DROP-ed since the previous ANALYZE leaves nothing stale
/// behind. Then, for every non-vector column, the rows are scanned
/// once to split NULL / missing cells from real values, and the
/// column's `null_frac`, `n_distinct` and `histogram_bounds` are
/// stored. Finally the table's modified-row counter is reset and its
/// cached cold-row count is refreshed.
///
/// # Errors
///
/// `StorageError::TableNotFound` (wrapped in `EngineError::Storage`)
/// when `self.catalog` has no table called `table_name`.
///
/// # Panics
///
/// Panics with "table still present" if the table cannot be found
/// through `active_catalog()` / `active_catalog_mut()` during the
/// cold-row refresh at the end.
fn analyze_one_table(&mut self, table_name: &str) -> Result<(), EngineError> {
    let Some(table) = self.catalog.get(table_name) else {
        return Err(EngineError::Storage(StorageError::TableNotFound {
            name: table_name.to_string(),
        }));
    };
    let schema = table.schema().clone();
    let row_count = table.rows().len();

    // Per column: gather the non-NULL values, count the NULLs, then
    // let `statistics::build_histogram` derive the bounds and
    // `statistics::estimate_n_distinct` the distinct count.
    self.statistics.clear_table(table_name);
    for (col_pos, column) in schema.columns.iter().enumerate() {
        // v6.2.0 skip: vector columns have their own stats shape
        // (HNSW graph topology). v6.2 deliberation #1.
        if matches!(column.ty, DataType::Vector { .. }) {
            continue;
        }

        let mut present: Vec<Value> = Vec::with_capacity(row_count);
        let mut null_count: u64 = 0;
        for row in table.rows() {
            // A cell that is absent from the row is counted the same
            // way as an explicit NULL.
            let cell = row
                .values
                .get(col_pos)
                .filter(|value| !matches!(value, Value::Null));
            if let Some(value) = cell {
                present.push(value.clone());
            } else {
                null_count += 1;
            }
        }

        // Type-aware ordering (Int as int, Text as lex, ...) so the
        // histogram bounds follow the column's natural order rather
        // than the lexicographic order of the textual form, which
        // would put "9" after "49".
        present.sort_by(|lhs, rhs| sort_values_for_histogram(lhs, rhs));
        let rendered: Vec<String> = present.iter().map(canonical_value_repr).collect();

        // An empty table has no NULL fraction to speak of; report 0.
        #[allow(clippy::cast_precision_loss)]
        let null_frac = match row_count {
            0 => 0.0,
            total => null_count as f32 / total as f32,
        };
        let n_distinct = statistics::estimate_n_distinct(&rendered);
        let histogram_bounds = statistics::build_histogram(&rendered);

        self.statistics.set(
            table_name.to_string(),
            column.name.clone(),
            statistics::ColumnStats {
                null_frac,
                n_distinct,
                histogram_bounds,
            },
        );
    }
    self.statistics.reset_modified(table_name);

    // v6.7.0 — refresh the per-table cold_rows cache. Walk the BTree
    // indices and count Cold locators (MAX across indices); store the
    // result on the table. Surfaced via
    // `spg_statistic.cold_row_count` and `spg_stat_segment.table_name`.
    //
    // NOTE(review): the existence check above goes through
    // `self.catalog`, while this refresh goes through
    // `active_catalog()`. If those can ever refer to different
    // catalogs, the `expect`s below can panic — confirm.
    let cold_rows = self
        .active_catalog()
        .get(table_name)
        .expect("table still present")
        .count_cold_locators();
    self.active_catalog_mut()
        .get_mut(table_name)
        .expect("table still present")
        .set_cold_row_count(cold_rows);

    Ok(())
}
/// v6.7.3 — runtime path for `COMPACT COLD SEGMENTS`.
///
/// Runs `compact_cold_segments_with_target` with
/// `COMPACTION_TARGET_DEFAULT_BYTES` (the default segment-size
/// threshold, documented as 4 MiB) and renders one result row per
/// merge that took place.
///
/// On a server build spg-server intercepts the SQL before it reaches
/// the engine: it reads `SPG_COMPACTION_TARGET_SEGMENT_BYTES`, calls
/// `Engine::compact_cold_segments_with_target` itself with that
/// value, and persists every merged segment to
/// `<db>.spg/segments/`. This arm therefore only fires for
/// engine-only callers (spg-embedded, lib tests); in that mode merged
/// segments live in memory and are dropped at process exit.
///
/// Result columns, in order: `table_name`, `index_name` (Text), then
/// `sources_merged`, `merged_segment_id`, `merged_rows`,
/// `deleted_rows_pruned`, `bytes_reclaimed_estimate` (BigInt). Counts
/// that do not fit in an `i64` are clamped to `i64::MAX`; a missing
/// `merged_segment_id` is rendered as `0`.
///
/// # Errors
///
/// Propagates any error from `compact_cold_segments_with_target`.
pub(crate) fn exec_compact_cold_segments(&mut self) -> Result<QueryResult, EngineError> {
    // Narrow a count to `i64`, clamping to `i64::MAX` when it does
    // not fit.
    fn clamp_to_i64<T>(count: T) -> i64
    where
        i64: TryFrom<T>,
    {
        i64::try_from(count).unwrap_or(i64::MAX)
    }

    let reports = self.compact_cold_segments_with_target(COMPACTION_TARGET_DEFAULT_BYTES)?;

    // Every result column is non-nullable.
    let text = |name: &'static str| ColumnSchema::new(name, DataType::Text, false);
    let bigint = |name: &'static str| ColumnSchema::new(name, DataType::BigInt, false);
    let columns = alloc::vec![
        text("table_name"),
        text("index_name"),
        bigint("sources_merged"),
        bigint("merged_segment_id"),
        bigint("merged_rows"),
        bigint("deleted_rows_pruned"),
        bigint("bytes_reclaimed_estimate"),
    ];

    let mut rows: Vec<Row> = Vec::with_capacity(reports.len());
    for (table, index, report) in reports {
        rows.push(Row::new(alloc::vec![
            Value::Text(table),
            Value::Text(index),
            Value::BigInt(clamp_to_i64(report.sources.len())),
            Value::BigInt(i64::from(report.merged_segment_id.unwrap_or(0))),
            Value::BigInt(clamp_to_i64(report.merged_rows)),
            Value::BigInt(clamp_to_i64(report.deleted_rows_pruned)),
            Value::BigInt(clamp_to_i64(report.bytes_reclaimed_estimate)),
        ]));
    }

    Ok(QueryResult::Rows { columns, rows })
}
/// v6.7.3 — public shim around `Catalog::compact_cold_segments`.
///
/// Visits every BTree index of every non-internal table in the
/// active catalog and asks the catalog to compact its cold segments
/// towards `target_segment_bytes`. The result holds one
/// `(table, index, report)` triple per merge that actually happened;
/// pairs whose report carries no `merged_segment_id` are left out so
/// callers can size persist-side work to the live merges.
///
/// The caller is responsible for persisting each
/// `report.merged_segment_bytes` and for updating the on-disk segment
/// registry; the engine layer is no_std and never touches disk.
///
/// Every table that had a merge gets its cached `cold_row_count`
/// marked stale — compaction GC'd some shadowed rows, so the count
/// must be re-derived on the next ANALYZE.
///
/// # Errors
///
/// Returns `EngineError::Storage` as soon as any single compaction
/// fails. The walk stops there and the reports gathered so far are
/// not returned, even though those earlier merges were already
/// applied to the catalog.
pub fn compact_cold_segments_with_target(
    &mut self,
    target_segment_bytes: u64,
) -> Result<Vec<(String, String, CompactReport)>, EngineError> {
    let mut merged: Vec<(String, String, CompactReport)> = Vec::new();
    let table_names = self.active_catalog().table_names();

    for table in table_names {
        if is_internal_table_name(&table) {
            continue;
        }

        // Snapshot the BTree index names up front so the shared
        // borrow of the catalog is released before the mutable
        // compaction calls below.
        let btree_indexes: Vec<String> = match self.active_catalog().get(&table) {
            Some(entry) => entry
                .indices()
                .iter()
                .filter_map(|idx| {
                    matches!(idx.kind, IndexKind::BTree(_)).then(|| idx.name.clone())
                })
                .collect(),
            None => continue,
        };

        for index in btree_indexes {
            let report = self
                .active_catalog_mut()
                .compact_cold_segments(&table, &index, target_segment_bytes)
                .map_err(EngineError::Storage)?;

            // No merged segment means this pair was a no-op.
            if report.merged_segment_id.is_none() {
                continue;
            }
            if let Some(entry) = self.active_catalog_mut().get_mut(&table) {
                entry.mark_cold_row_count_stale();
            }
            merged.push((table.clone(), index, report));
        }
    }

    Ok(merged)
}
}