1use alloc::collections::BTreeMap;
9use alloc::string::String;
10use alloc::vec::Vec;
11
12use spg_storage::{ColumnSchema, DataType, Row, Value};
13
14use crate::{
15 ActivityProvider, AuditChainProvider, AuditVerifier, Engine, MemoryStats, QueryResult,
16 SlowQueryLogger, TableMemoryStats, approx_row_bytes, is_internal_table_name,
17 render_create_table, render_histogram_bounds,
18};
19use crate::{query_stats, statistics};
20
21impl Engine {
22 pub(crate) fn exec_spg_statistic(&self) -> QueryResult {
27 let columns = alloc::vec![
28 ColumnSchema::new("table_name", DataType::Text, false),
29 ColumnSchema::new("column_name", DataType::Text, false),
30 ColumnSchema::new("null_frac", DataType::Float, false),
31 ColumnSchema::new("n_distinct", DataType::BigInt, false),
32 ColumnSchema::new("histogram_bounds", DataType::Text, false),
33 ColumnSchema::new("cold_row_count", DataType::BigInt, false),
38 ];
39 let rows: Vec<Row> = self
40 .statistics
41 .iter()
42 .map(|((t, c), s)| {
43 let cold = self
44 .catalog
45 .get(t)
46 .map_or(0, |table| table.cold_row_count());
47 Row::new(alloc::vec![
48 Value::Text(t.clone()),
49 Value::Text(c.clone()),
50 Value::Float(f64::from(s.null_frac)),
51 Value::BigInt(i64::try_from(s.n_distinct).unwrap_or(i64::MAX)),
52 Value::Text(render_histogram_bounds(&s.histogram_bounds)),
53 Value::BigInt(i64::try_from(cold).unwrap_or(i64::MAX)),
54 ])
55 })
56 .collect();
57 QueryResult::Rows { columns, rows }
58 }
59
60 pub(crate) fn exec_spg_stat_replication(&self) -> QueryResult {
67 let columns = alloc::vec![
68 ColumnSchema::new("name", DataType::Text, false),
69 ColumnSchema::new("conn_str", DataType::Text, false),
70 ColumnSchema::new("publications", DataType::Text, false),
71 ColumnSchema::new("last_received_pos", DataType::BigInt, false),
72 ColumnSchema::new("enabled", DataType::Bool, false),
73 ];
74 let rows: Vec<Row> = self
75 .subscriptions
76 .iter()
77 .map(|(name, sub)| {
78 Row::new(alloc::vec![
79 Value::Text(name.clone()),
80 Value::Text(sub.conn_str.clone()),
81 Value::Text(sub.publications.join(",")),
82 Value::BigInt(i64::try_from(sub.last_received_pos).unwrap_or(i64::MAX)),
83 Value::Bool(sub.enabled),
84 ])
85 })
86 .collect();
87 QueryResult::Rows { columns, rows }
88 }
89
90 pub fn memory_stats(&self) -> MemoryStats {
105 let mut tables: Vec<TableMemoryStats> = Vec::new();
106 let (mut total_enc, mut total_res, mut total_idx) = (0u64, 0u64, 0u64);
107 for tname in self.catalog.table_names() {
108 if is_internal_table_name(&tname) {
109 continue;
110 }
111 let Some(t) = self.catalog.get(&tname) else {
112 continue;
113 };
114 let resident: u64 = t.rows().iter().map(|r| approx_row_bytes(r) as u64).sum();
115 let mut idx_bytes: u64 = 0;
121 for idx in t.indices() {
122 idx_bytes += idx.kind.approx_resident_bytes();
123 }
124 total_enc += t.hot_bytes();
125 total_res += resident;
126 total_idx += idx_bytes;
127 tables.push(TableMemoryStats {
128 name: tname.clone(),
129 hot_rows: t.rows().len() as u64,
130 cold_rows: t.cold_row_count(),
131 hot_encoded_bytes: t.hot_bytes(),
132 approx_resident_bytes: resident,
133 index_count: t.indices().len() as u64,
134 approx_index_bytes: idx_bytes,
135 });
136 }
137 MemoryStats {
138 tables,
139 total_hot_encoded_bytes: total_enc,
140 total_approx_resident_bytes: total_res,
141 total_approx_index_bytes: total_idx,
142 max_query_bytes: self.max_query_bytes,
143 wal_bytes: None,
146 }
147 }
148
149 pub(crate) fn exec_spg_memory_stats(&self) -> QueryResult {
153 let columns = alloc::vec![
154 ColumnSchema::new("table_name", DataType::Text, false),
155 ColumnSchema::new("hot_rows", DataType::BigInt, false),
156 ColumnSchema::new("cold_rows", DataType::BigInt, false),
157 ColumnSchema::new("hot_encoded_bytes", DataType::BigInt, false),
158 ColumnSchema::new("approx_resident_bytes", DataType::BigInt, false),
159 ColumnSchema::new("index_count", DataType::BigInt, false),
160 ColumnSchema::new("approx_index_bytes", DataType::BigInt, false),
161 ];
162 #[allow(clippy::cast_possible_wrap)]
163 let rows: Vec<Row> = self
164 .memory_stats()
165 .tables
166 .into_iter()
167 .map(|t| {
168 Row::new(alloc::vec![
169 Value::Text(t.name),
170 Value::BigInt(t.hot_rows as i64),
171 Value::BigInt(t.cold_rows as i64),
172 Value::BigInt(t.hot_encoded_bytes as i64),
173 Value::BigInt(t.approx_resident_bytes as i64),
174 Value::BigInt(t.index_count as i64),
175 Value::BigInt(t.approx_index_bytes as i64),
176 ])
177 })
178 .collect();
179 QueryResult::Rows { columns, rows }
180 }
181
182 pub(crate) fn exec_spg_stat_segment(&self) -> QueryResult {
183 let columns = alloc::vec![
184 ColumnSchema::new("segment_id", DataType::BigInt, false),
185 ColumnSchema::new("table_name", DataType::Text, false),
186 ColumnSchema::new("num_rows", DataType::BigInt, false),
187 ColumnSchema::new("num_pages", DataType::BigInt, false),
188 ColumnSchema::new("total_bytes", DataType::BigInt, false),
189 ];
190 let mut segment_owners: alloc::collections::BTreeMap<u32, String> = BTreeMap::new();
196 for tname in self.catalog.table_names() {
197 if is_internal_table_name(&tname) {
198 continue;
199 }
200 let Some(t) = self.catalog.get(&tname) else {
201 continue;
202 };
203 for idx in t.indices() {
204 if let spg_storage::IndexKind::BTree(map) = &idx.kind {
205 for (_, locs) in map.iter() {
206 for loc in locs {
207 if let spg_storage::RowLocator::Cold { segment_id, .. } = loc {
208 segment_owners
209 .entry(*segment_id)
210 .or_insert_with(|| tname.clone());
211 }
212 }
213 }
214 }
215 }
216 }
217 let rows: Vec<Row> = self
218 .catalog
219 .cold_segment_ids_global()
220 .iter()
221 .filter_map(|&id| {
222 let seg = self.catalog.cold_segment(id)?;
223 let meta = seg.meta();
224 let owner = segment_owners.get(&id).cloned().unwrap_or_default();
225 Some(Row::new(alloc::vec![
226 Value::BigInt(i64::from(id)),
227 Value::Text(owner),
228 Value::BigInt(i64::try_from(meta.num_rows).unwrap_or(i64::MAX)),
229 Value::BigInt(i64::from(meta.num_pages)),
230 Value::BigInt(i64::try_from(meta.total_bytes).unwrap_or(i64::MAX)),
231 ]))
232 })
233 .collect();
234 QueryResult::Rows { columns, rows }
235 }
236
237 pub(crate) fn exec_spg_stat_query(&self) -> QueryResult {
243 let columns = alloc::vec![
244 ColumnSchema::new("sql", DataType::Text, false),
245 ColumnSchema::new("exec_count", DataType::BigInt, false),
246 ColumnSchema::new("total_us", DataType::BigInt, false),
247 ColumnSchema::new("mean_us", DataType::BigInt, false),
248 ColumnSchema::new("max_us", DataType::BigInt, false),
249 ColumnSchema::new("last_seen_us", DataType::BigInt, false),
250 ];
251 let rows: Vec<Row> = self
252 .query_stats
253 .snapshot()
254 .into_iter()
255 .map(|(sql, s)| {
256 let mean = if s.exec_count == 0 {
257 0
258 } else {
259 s.total_us / s.exec_count
260 };
261 Row::new(alloc::vec![
262 Value::Text(sql),
263 Value::BigInt(i64::try_from(s.exec_count).unwrap_or(i64::MAX)),
264 Value::BigInt(i64::try_from(s.total_us).unwrap_or(i64::MAX)),
265 Value::BigInt(i64::try_from(mean).unwrap_or(i64::MAX)),
266 Value::BigInt(i64::try_from(s.max_us).unwrap_or(i64::MAX)),
267 Value::BigInt(i64::try_from(s.last_seen_us).unwrap_or(i64::MAX)),
268 ])
269 })
270 .collect();
271 QueryResult::Rows { columns, rows }
272 }
273
    /// Builder-style setter: installs `f` as the engine's activity provider
    /// and returns the updated engine.
    ///
    /// Any previously installed provider is replaced. The provider is what
    /// `exec_spg_stat_activity` calls to obtain the rows it renders.
    #[must_use]
    pub const fn with_activity_provider(mut self, f: ActivityProvider) -> Self {
        self.activity_provider = Some(f);
        self
    }
283
    /// Builder-style setter: installs both audit callbacks and returns the
    /// updated engine.
    ///
    /// * `chain` becomes the audit chain provider, called by
    ///   `exec_spg_audit_chain` to obtain the entries it renders.
    /// * `verify` becomes the audit verifier, called by
    ///   `exec_spg_audit_verify`.
    ///
    /// Both are always set together; previously installed callbacks are
    /// replaced.
    #[must_use]
    pub const fn with_audit_providers(
        mut self,
        chain: AuditChainProvider,
        verify: AuditVerifier,
    ) -> Self {
        self.audit_chain_provider = Some(chain);
        self.audit_verifier = Some(verify);
        self
    }
295
    /// Builder-style setter: stores a slow-query threshold and logger and
    /// returns the updated engine.
    ///
    /// `threshold_us` is presumably expressed in microseconds (per the `_us`
    /// suffix). How the threshold and `logger` are consulted during query
    /// execution is not visible in this block — confirm against the executor.
    #[must_use]
    pub const fn with_slow_query_log(mut self, threshold_us: u64, logger: SlowQueryLogger) -> Self {
        self.slow_query_threshold_us = Some(threshold_us);
        self.slow_query_logger = Some(logger);
        self
    }
306
    /// Sets the maximum number of entries of the plan cache by forwarding `n`
    /// to `plan_cache.set_max_entries`.
    ///
    /// NOTE(review): whether lowering the limit evicts existing entries
    /// immediately depends on `set_max_entries` — not visible here.
    pub fn set_plan_cache_max(&mut self, n: usize) {
        self.plan_cache.set_max_entries(n);
    }
313
314 pub(crate) fn exec_spg_stat_activity(&self) -> QueryResult {
319 let columns = alloc::vec![
320 ColumnSchema::new("pid", DataType::Int, false),
321 ColumnSchema::new("user", DataType::Text, false),
322 ColumnSchema::new("started_at_us", DataType::BigInt, false),
323 ColumnSchema::new("current_sql", DataType::Text, false),
324 ColumnSchema::new("wait_event", DataType::Text, false),
325 ColumnSchema::new("elapsed_us", DataType::BigInt, false),
326 ColumnSchema::new("in_transaction", DataType::Bool, false),
327 ColumnSchema::new("application_name", DataType::Text, false),
328 ];
329 let rows: Vec<Row> = self
330 .activity_provider
331 .map(|f| f())
332 .unwrap_or_default()
333 .into_iter()
334 .map(|r| {
335 Row::new(alloc::vec![
336 Value::Int(i32::try_from(r.pid).unwrap_or(i32::MAX)),
337 Value::Text(r.user),
338 Value::BigInt(r.started_at_us),
339 Value::Text(r.current_sql),
340 Value::Text(r.wait_event),
341 Value::BigInt(r.elapsed_us),
342 Value::Bool(r.in_transaction),
343 Value::Text(r.application_name),
344 ])
345 })
346 .collect();
347 QueryResult::Rows { columns, rows }
348 }
349
350 pub(crate) fn exec_spg_table_ddl(&self) -> QueryResult {
354 let columns = alloc::vec![
355 ColumnSchema::new("table_name", DataType::Text, false),
356 ColumnSchema::new("ddl", DataType::Text, false),
357 ];
358 let rows: Vec<Row> = self
359 .catalog
360 .table_names()
361 .into_iter()
362 .filter(|n| !is_internal_table_name(n))
363 .filter_map(|name| {
364 let table = self.catalog.get(&name)?;
365 let ddl = render_create_table(&name, &table.schema().columns);
366 Some(Row::new(alloc::vec![Value::Text(name), Value::Text(ddl),]))
367 })
368 .collect();
369 QueryResult::Rows { columns, rows }
370 }
371
372 pub(crate) fn exec_spg_role_ddl(&self) -> QueryResult {
376 let columns = alloc::vec![
377 ColumnSchema::new("role_name", DataType::Text, false),
378 ColumnSchema::new("ddl", DataType::Text, false),
379 ];
380 let rows: Vec<Row> = self
381 .users
382 .iter()
383 .map(|(name, rec)| {
384 let ddl = alloc::format!(
385 "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}'",
386 rec.role.as_str(),
387 );
388 Row::new(alloc::vec![
389 Value::Text(String::from(name)),
390 Value::Text(ddl)
391 ])
392 })
393 .collect();
394 QueryResult::Rows { columns, rows }
395 }
396
397 pub(crate) fn exec_spg_database_ddl(&self) -> QueryResult {
403 let columns = alloc::vec![ColumnSchema::new("ddl", DataType::Text, false)];
404 let mut out = String::new();
405 for (name, rec) in self.users.iter() {
406 out.push_str(&alloc::format!(
407 "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}';\n",
408 rec.role.as_str(),
409 ));
410 }
411 for name in self.catalog.table_names() {
412 if is_internal_table_name(&name) {
413 continue;
414 }
415 if let Some(table) = self.catalog.get(&name) {
416 out.push_str(&render_create_table(&name, &table.schema().columns));
417 out.push_str(";\n");
418 }
419 }
420 QueryResult::Rows {
421 columns,
422 rows: alloc::vec![Row::new(alloc::vec![Value::Text(out)])],
423 }
424 }
425
426 pub(crate) fn exec_spg_audit_chain(&self) -> QueryResult {
430 let columns = alloc::vec![
431 ColumnSchema::new("seq", DataType::BigInt, false),
432 ColumnSchema::new("ts_ms", DataType::BigInt, false),
433 ColumnSchema::new("prev_hash", DataType::Text, false),
434 ColumnSchema::new("entry_hash", DataType::Text, false),
435 ColumnSchema::new("sql", DataType::Text, false),
436 ];
437 let rows: Vec<Row> = self
438 .audit_chain_provider
439 .map(|f| f())
440 .unwrap_or_default()
441 .into_iter()
442 .map(|r| {
443 Row::new(alloc::vec![
444 Value::BigInt(r.seq),
445 Value::BigInt(r.ts_ms),
446 Value::Text(r.prev_hash_hex),
447 Value::Text(r.entry_hash_hex),
448 Value::Text(r.sql),
449 ])
450 })
451 .collect();
452 QueryResult::Rows { columns, rows }
453 }
454
455 pub(crate) fn exec_spg_audit_verify(&self) -> QueryResult {
461 let columns = alloc::vec![
462 ColumnSchema::new("verified_count", DataType::BigInt, false),
463 ColumnSchema::new("broken_at_seq", DataType::BigInt, false),
464 ];
465 let (verified, broken) = self.audit_verifier.map(|f| f()).unwrap_or((0, -1));
466 let row = Row::new(alloc::vec![Value::BigInt(verified), Value::BigInt(broken),]);
467 QueryResult::Rows {
468 columns,
469 rows: alloc::vec![row],
470 }
471 }
472
    /// Returns a shared reference to the engine's per-query statistics.
    pub fn query_stats(&self) -> &query_stats::QueryStats {
        &self.query_stats
    }
477
    /// Returns a mutable reference to the engine's per-query statistics.
    pub fn query_stats_mut(&mut self) -> &mut query_stats::QueryStats {
        &mut self.query_stats
    }
482
    /// Returns a shared reference to the engine's column statistics, the same
    /// data that `exec_spg_statistic` renders.
    pub const fn statistics(&self) -> &statistics::Statistics {
        &self.statistics
    }
489
490 pub fn tables_needing_analyze(&self) -> Vec<String> {
503 const MIN_ROWS: u64 = 100;
504 let mut out = Vec::new();
505 for name in self.catalog.table_names() {
506 if is_internal_table_name(&name) {
507 continue;
508 }
509 let Some(table) = self.catalog.get(&name) else {
510 continue;
511 };
512 let row_count = table.rows().len() as u64;
513 let modified = self.statistics.modified_since_last_analyze(&name);
514 let base = row_count.max(MIN_ROWS);
519 let threshold = base.saturating_add(9) / 10;
520 if modified >= threshold {
521 out.push(name);
522 }
523 }
524 out
525 }
526}