1use alloc::collections::BTreeMap;
9use alloc::string::String;
10use alloc::vec::Vec;
11
12use spg_storage::{ColumnSchema, DataType, Row, Value};
13
14use crate::{
15 ActivityProvider, AuditChainProvider, AuditVerifier, Engine, MemoryStats, QueryResult,
16 SlowQueryLogger, TableMemoryStats, approx_row_bytes, is_internal_table_name,
17 render_create_table, render_histogram_bounds,
18};
19use crate::{query_stats, statistics};
20
21impl Engine {
22 pub(crate) fn exec_spg_statistic(&self) -> QueryResult {
27 let columns = alloc::vec![
28 ColumnSchema::new("table_name", DataType::Text, false),
29 ColumnSchema::new("column_name", DataType::Text, false),
30 ColumnSchema::new("null_frac", DataType::Float, false),
31 ColumnSchema::new("n_distinct", DataType::BigInt, false),
32 ColumnSchema::new("histogram_bounds", DataType::Text, false),
33 ColumnSchema::new("cold_row_count", DataType::BigInt, false),
38 ];
39 let rows: Vec<Row> = self
40 .statistics
41 .iter()
42 .map(|((t, c), s)| {
43 let cold = self
44 .catalog
45 .get(t)
46 .map_or(0, |table| table.cold_row_count());
47 Row::new(alloc::vec![
48 Value::Text(t.clone()),
49 Value::Text(c.clone()),
50 Value::Float(f64::from(s.null_frac)),
51 Value::BigInt(i64::try_from(s.n_distinct).unwrap_or(i64::MAX)),
52 Value::Text(render_histogram_bounds(&s.histogram_bounds)),
53 Value::BigInt(i64::try_from(cold).unwrap_or(i64::MAX)),
54 ])
55 })
56 .collect();
57 QueryResult::Rows { columns, rows }
58 }
59
60 pub(crate) fn exec_spg_stat_replication(&self) -> QueryResult {
67 let columns = alloc::vec![
68 ColumnSchema::new("name", DataType::Text, false),
69 ColumnSchema::new("conn_str", DataType::Text, false),
70 ColumnSchema::new("publications", DataType::Text, false),
71 ColumnSchema::new("last_received_pos", DataType::BigInt, false),
72 ColumnSchema::new("enabled", DataType::Bool, false),
73 ];
74 let rows: Vec<Row> = self
75 .subscriptions
76 .iter()
77 .map(|(name, sub)| {
78 Row::new(alloc::vec![
79 Value::Text(name.clone()),
80 Value::Text(sub.conn_str.clone()),
81 Value::Text(sub.publications.join(",")),
82 Value::BigInt(i64::try_from(sub.last_received_pos).unwrap_or(i64::MAX)),
83 Value::Bool(sub.enabled),
84 ])
85 })
86 .collect();
87 QueryResult::Rows { columns, rows }
88 }
89
90 pub fn memory_stats(&self) -> MemoryStats {
105 let mut tables: Vec<TableMemoryStats> = Vec::new();
106 let (mut total_enc, mut total_res, mut total_idx) = (0u64, 0u64, 0u64);
107 for tname in self.catalog.table_names() {
108 if is_internal_table_name(&tname) {
109 continue;
110 }
111 let Some(t) = self.catalog.get(&tname) else {
112 continue;
113 };
114 let resident: u64 = t.rows().iter().map(|r| approx_row_bytes(r) as u64).sum();
115 let mut idx_bytes: u64 = 0;
116 for idx in t.indices() {
117 idx_bytes += match &idx.kind {
118 spg_storage::IndexKind::BTree(map) => {
119 let mut b: u64 = 0;
120 for (_, locs) in map.iter() {
121 b += (core::mem::size_of::<spg_storage::IndexKey>()
122 + 24
123 + locs.len() * core::mem::size_of::<spg_storage::RowLocator>())
124 as u64;
125 }
126 b
127 }
128 spg_storage::IndexKind::Nsw(g) => {
131 (g.levels.len() * (g.m_max_0 * 8 + 16)) as u64
132 }
133 _ => 1024,
135 };
136 }
137 total_enc += t.hot_bytes();
138 total_res += resident;
139 total_idx += idx_bytes;
140 tables.push(TableMemoryStats {
141 name: tname.clone(),
142 hot_rows: t.rows().len() as u64,
143 cold_rows: t.cold_row_count(),
144 hot_encoded_bytes: t.hot_bytes(),
145 approx_resident_bytes: resident,
146 index_count: t.indices().len() as u64,
147 approx_index_bytes: idx_bytes,
148 });
149 }
150 MemoryStats {
151 tables,
152 total_hot_encoded_bytes: total_enc,
153 total_approx_resident_bytes: total_res,
154 total_approx_index_bytes: total_idx,
155 max_query_bytes: self.max_query_bytes,
156 }
157 }
158
159 pub(crate) fn exec_spg_memory_stats(&self) -> QueryResult {
163 let columns = alloc::vec![
164 ColumnSchema::new("table_name", DataType::Text, false),
165 ColumnSchema::new("hot_rows", DataType::BigInt, false),
166 ColumnSchema::new("cold_rows", DataType::BigInt, false),
167 ColumnSchema::new("hot_encoded_bytes", DataType::BigInt, false),
168 ColumnSchema::new("approx_resident_bytes", DataType::BigInt, false),
169 ColumnSchema::new("index_count", DataType::BigInt, false),
170 ColumnSchema::new("approx_index_bytes", DataType::BigInt, false),
171 ];
172 #[allow(clippy::cast_possible_wrap)]
173 let rows: Vec<Row> = self
174 .memory_stats()
175 .tables
176 .into_iter()
177 .map(|t| {
178 Row::new(alloc::vec![
179 Value::Text(t.name),
180 Value::BigInt(t.hot_rows as i64),
181 Value::BigInt(t.cold_rows as i64),
182 Value::BigInt(t.hot_encoded_bytes as i64),
183 Value::BigInt(t.approx_resident_bytes as i64),
184 Value::BigInt(t.index_count as i64),
185 Value::BigInt(t.approx_index_bytes as i64),
186 ])
187 })
188 .collect();
189 QueryResult::Rows { columns, rows }
190 }
191
192 pub(crate) fn exec_spg_stat_segment(&self) -> QueryResult {
193 let columns = alloc::vec![
194 ColumnSchema::new("segment_id", DataType::BigInt, false),
195 ColumnSchema::new("table_name", DataType::Text, false),
196 ColumnSchema::new("num_rows", DataType::BigInt, false),
197 ColumnSchema::new("num_pages", DataType::BigInt, false),
198 ColumnSchema::new("total_bytes", DataType::BigInt, false),
199 ];
200 let mut segment_owners: alloc::collections::BTreeMap<u32, String> = BTreeMap::new();
206 for tname in self.catalog.table_names() {
207 if is_internal_table_name(&tname) {
208 continue;
209 }
210 let Some(t) = self.catalog.get(&tname) else {
211 continue;
212 };
213 for idx in t.indices() {
214 if let spg_storage::IndexKind::BTree(map) = &idx.kind {
215 for (_, locs) in map.iter() {
216 for loc in locs {
217 if let spg_storage::RowLocator::Cold { segment_id, .. } = loc {
218 segment_owners
219 .entry(*segment_id)
220 .or_insert_with(|| tname.clone());
221 }
222 }
223 }
224 }
225 }
226 }
227 let rows: Vec<Row> = self
228 .catalog
229 .cold_segment_ids_global()
230 .iter()
231 .filter_map(|&id| {
232 let seg = self.catalog.cold_segment(id)?;
233 let meta = seg.meta();
234 let owner = segment_owners.get(&id).cloned().unwrap_or_default();
235 Some(Row::new(alloc::vec![
236 Value::BigInt(i64::from(id)),
237 Value::Text(owner),
238 Value::BigInt(i64::try_from(meta.num_rows).unwrap_or(i64::MAX)),
239 Value::BigInt(i64::from(meta.num_pages)),
240 Value::BigInt(i64::try_from(meta.total_bytes).unwrap_or(i64::MAX)),
241 ]))
242 })
243 .collect();
244 QueryResult::Rows { columns, rows }
245 }
246
247 pub(crate) fn exec_spg_stat_query(&self) -> QueryResult {
253 let columns = alloc::vec![
254 ColumnSchema::new("sql", DataType::Text, false),
255 ColumnSchema::new("exec_count", DataType::BigInt, false),
256 ColumnSchema::new("total_us", DataType::BigInt, false),
257 ColumnSchema::new("mean_us", DataType::BigInt, false),
258 ColumnSchema::new("max_us", DataType::BigInt, false),
259 ColumnSchema::new("last_seen_us", DataType::BigInt, false),
260 ];
261 let rows: Vec<Row> = self
262 .query_stats
263 .snapshot()
264 .into_iter()
265 .map(|(sql, s)| {
266 let mean = if s.exec_count == 0 {
267 0
268 } else {
269 s.total_us / s.exec_count
270 };
271 Row::new(alloc::vec![
272 Value::Text(sql),
273 Value::BigInt(i64::try_from(s.exec_count).unwrap_or(i64::MAX)),
274 Value::BigInt(i64::try_from(s.total_us).unwrap_or(i64::MAX)),
275 Value::BigInt(i64::try_from(mean).unwrap_or(i64::MAX)),
276 Value::BigInt(i64::try_from(s.max_us).unwrap_or(i64::MAX)),
277 Value::BigInt(i64::try_from(s.last_seen_us).unwrap_or(i64::MAX)),
278 ])
279 })
280 .collect();
281 QueryResult::Rows { columns, rows }
282 }
283
284 #[must_use]
289 pub const fn with_activity_provider(mut self, f: ActivityProvider) -> Self {
290 self.activity_provider = Some(f);
291 self
292 }
293
294 #[must_use]
296 pub const fn with_audit_providers(
297 mut self,
298 chain: AuditChainProvider,
299 verify: AuditVerifier,
300 ) -> Self {
301 self.audit_chain_provider = Some(chain);
302 self.audit_verifier = Some(verify);
303 self
304 }
305
306 #[must_use]
311 pub const fn with_slow_query_log(mut self, threshold_us: u64, logger: SlowQueryLogger) -> Self {
312 self.slow_query_threshold_us = Some(threshold_us);
313 self.slow_query_logger = Some(logger);
314 self
315 }
316
317 pub fn set_plan_cache_max(&mut self, n: usize) {
321 self.plan_cache.set_max_entries(n);
322 }
323
324 pub(crate) fn exec_spg_stat_activity(&self) -> QueryResult {
329 let columns = alloc::vec![
330 ColumnSchema::new("pid", DataType::Int, false),
331 ColumnSchema::new("user", DataType::Text, false),
332 ColumnSchema::new("started_at_us", DataType::BigInt, false),
333 ColumnSchema::new("current_sql", DataType::Text, false),
334 ColumnSchema::new("wait_event", DataType::Text, false),
335 ColumnSchema::new("elapsed_us", DataType::BigInt, false),
336 ColumnSchema::new("in_transaction", DataType::Bool, false),
337 ColumnSchema::new("application_name", DataType::Text, false),
338 ];
339 let rows: Vec<Row> = self
340 .activity_provider
341 .map(|f| f())
342 .unwrap_or_default()
343 .into_iter()
344 .map(|r| {
345 Row::new(alloc::vec![
346 Value::Int(i32::try_from(r.pid).unwrap_or(i32::MAX)),
347 Value::Text(r.user),
348 Value::BigInt(r.started_at_us),
349 Value::Text(r.current_sql),
350 Value::Text(r.wait_event),
351 Value::BigInt(r.elapsed_us),
352 Value::Bool(r.in_transaction),
353 Value::Text(r.application_name),
354 ])
355 })
356 .collect();
357 QueryResult::Rows { columns, rows }
358 }
359
360 pub(crate) fn exec_spg_table_ddl(&self) -> QueryResult {
364 let columns = alloc::vec![
365 ColumnSchema::new("table_name", DataType::Text, false),
366 ColumnSchema::new("ddl", DataType::Text, false),
367 ];
368 let rows: Vec<Row> = self
369 .catalog
370 .table_names()
371 .into_iter()
372 .filter(|n| !is_internal_table_name(n))
373 .filter_map(|name| {
374 let table = self.catalog.get(&name)?;
375 let ddl = render_create_table(&name, &table.schema().columns);
376 Some(Row::new(alloc::vec![Value::Text(name), Value::Text(ddl),]))
377 })
378 .collect();
379 QueryResult::Rows { columns, rows }
380 }
381
382 pub(crate) fn exec_spg_role_ddl(&self) -> QueryResult {
386 let columns = alloc::vec![
387 ColumnSchema::new("role_name", DataType::Text, false),
388 ColumnSchema::new("ddl", DataType::Text, false),
389 ];
390 let rows: Vec<Row> = self
391 .users
392 .iter()
393 .map(|(name, rec)| {
394 let ddl = alloc::format!(
395 "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}'",
396 rec.role.as_str(),
397 );
398 Row::new(alloc::vec![
399 Value::Text(String::from(name)),
400 Value::Text(ddl)
401 ])
402 })
403 .collect();
404 QueryResult::Rows { columns, rows }
405 }
406
407 pub(crate) fn exec_spg_database_ddl(&self) -> QueryResult {
413 let columns = alloc::vec![ColumnSchema::new("ddl", DataType::Text, false)];
414 let mut out = String::new();
415 for (name, rec) in self.users.iter() {
416 out.push_str(&alloc::format!(
417 "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}';\n",
418 rec.role.as_str(),
419 ));
420 }
421 for name in self.catalog.table_names() {
422 if is_internal_table_name(&name) {
423 continue;
424 }
425 if let Some(table) = self.catalog.get(&name) {
426 out.push_str(&render_create_table(&name, &table.schema().columns));
427 out.push_str(";\n");
428 }
429 }
430 QueryResult::Rows {
431 columns,
432 rows: alloc::vec![Row::new(alloc::vec![Value::Text(out)])],
433 }
434 }
435
436 pub(crate) fn exec_spg_audit_chain(&self) -> QueryResult {
440 let columns = alloc::vec![
441 ColumnSchema::new("seq", DataType::BigInt, false),
442 ColumnSchema::new("ts_ms", DataType::BigInt, false),
443 ColumnSchema::new("prev_hash", DataType::Text, false),
444 ColumnSchema::new("entry_hash", DataType::Text, false),
445 ColumnSchema::new("sql", DataType::Text, false),
446 ];
447 let rows: Vec<Row> = self
448 .audit_chain_provider
449 .map(|f| f())
450 .unwrap_or_default()
451 .into_iter()
452 .map(|r| {
453 Row::new(alloc::vec![
454 Value::BigInt(r.seq),
455 Value::BigInt(r.ts_ms),
456 Value::Text(r.prev_hash_hex),
457 Value::Text(r.entry_hash_hex),
458 Value::Text(r.sql),
459 ])
460 })
461 .collect();
462 QueryResult::Rows { columns, rows }
463 }
464
465 pub(crate) fn exec_spg_audit_verify(&self) -> QueryResult {
471 let columns = alloc::vec![
472 ColumnSchema::new("verified_count", DataType::BigInt, false),
473 ColumnSchema::new("broken_at_seq", DataType::BigInt, false),
474 ];
475 let (verified, broken) = self.audit_verifier.map(|f| f()).unwrap_or((0, -1));
476 let row = Row::new(alloc::vec![Value::BigInt(verified), Value::BigInt(broken),]);
477 QueryResult::Rows {
478 columns,
479 rows: alloc::vec![row],
480 }
481 }
482
483 pub fn query_stats(&self) -> &query_stats::QueryStats {
485 &self.query_stats
486 }
487
488 pub fn query_stats_mut(&mut self) -> &mut query_stats::QueryStats {
490 &mut self.query_stats
491 }
492
493 pub const fn statistics(&self) -> &statistics::Statistics {
497 &self.statistics
498 }
499
500 pub fn tables_needing_analyze(&self) -> Vec<String> {
513 const MIN_ROWS: u64 = 100;
514 let mut out = Vec::new();
515 for name in self.catalog.table_names() {
516 if is_internal_table_name(&name) {
517 continue;
518 }
519 let Some(table) = self.catalog.get(&name) else {
520 continue;
521 };
522 let row_count = table.rows().len() as u64;
523 let modified = self.statistics.modified_since_last_analyze(&name);
524 let base = row_count.max(MIN_ROWS);
529 let threshold = base.saturating_add(9) / 10;
530 if modified >= threshold {
531 out.push(name);
532 }
533 }
534 out
535 }
536}