Skip to main content

spg_engine/
spg_admin.rs

//! `spg_*` introspection views and admin/stats API. Lifted out of
//! `lib.rs` (v7.32 engine modularisation). The `exec_spg_*` methods
//! materialise the `spg_statistic` / `spg_stat_*` / `spg_memory_stats` /
//! `spg_*_ddl` / `spg_audit_*` meta-views dispatched from the meta-view
//! SELECT path; the public `memory_stats` / `set_plan_cache_max` /
//! `query_stats` / `tables_needing_analyze` methods form the embedded
//! admin surface.

8use alloc::collections::BTreeMap;
9use alloc::string::String;
10use alloc::vec::Vec;
11
12use spg_storage::{ColumnSchema, DataType, Row, Value};
13
14use crate::{
15    ActivityProvider, AuditChainProvider, AuditVerifier, Engine, MemoryStats, QueryResult,
16    SlowQueryLogger, TableMemoryStats, approx_row_bytes, is_internal_table_name,
17    render_create_table, render_histogram_bounds,
18};
19use crate::{query_stats, statistics};
20
21impl Engine {
22    /// v6.2.0 — materialise `spg_statistic` rows. One row per
23    /// `(table, column)` pair tracked in `Statistics`, with
24    /// `histogram_bounds` rendered as a `[v0, v1, ...]` string —
25    /// the same canonical form vector literals use for round-trip.
26    pub(crate) fn exec_spg_statistic(&self) -> QueryResult {
27        let columns = alloc::vec![
28            ColumnSchema::new("table_name", DataType::Text, false),
29            ColumnSchema::new("column_name", DataType::Text, false),
30            ColumnSchema::new("null_frac", DataType::Float, false),
31            ColumnSchema::new("n_distinct", DataType::BigInt, false),
32            ColumnSchema::new("histogram_bounds", DataType::Text, false),
33            // v6.7.0 — appended column (v6.2.0 stability contract
34            // allows APPEND to spg_statistic, not reorder/rename).
35            // Reports the cached per-table cold-row count; same
36            // value across every column row of the same table.
37            ColumnSchema::new("cold_row_count", DataType::BigInt, false),
38        ];
39        let rows: Vec<Row> = self
40            .statistics
41            .iter()
42            .map(|((t, c), s)| {
43                let cold = self
44                    .catalog
45                    .get(t)
46                    .map_or(0, |table| table.cold_row_count());
47                Row::new(alloc::vec![
48                    Value::Text(t.clone()),
49                    Value::Text(c.clone()),
50                    Value::Float(f64::from(s.null_frac)),
51                    Value::BigInt(i64::try_from(s.n_distinct).unwrap_or(i64::MAX)),
52                    Value::Text(render_histogram_bounds(&s.histogram_bounds)),
53                    Value::BigInt(i64::try_from(cold).unwrap_or(i64::MAX)),
54                ])
55            })
56            .collect();
57        QueryResult::Rows { columns, rows }
58    }
59
60    /// v6.5.0 — materialise `spg_stat_replication` rows. One row
61    /// per subscription with `(name, conn_str, publications,
62    /// last_received_pos, enabled)`. Surface mirrors
63    /// `SHOW SUBSCRIPTIONS` but follows the virtual-table dispatch
64    /// shape so it composes with SELECT clauses (WHERE, projection
65    /// onto specific columns, etc).
66    pub(crate) fn exec_spg_stat_replication(&self) -> QueryResult {
67        let columns = alloc::vec![
68            ColumnSchema::new("name", DataType::Text, false),
69            ColumnSchema::new("conn_str", DataType::Text, false),
70            ColumnSchema::new("publications", DataType::Text, false),
71            ColumnSchema::new("last_received_pos", DataType::BigInt, false),
72            ColumnSchema::new("enabled", DataType::Bool, false),
73        ];
74        let rows: Vec<Row> = self
75            .subscriptions
76            .iter()
77            .map(|(name, sub)| {
78                Row::new(alloc::vec![
79                    Value::Text(name.clone()),
80                    Value::Text(sub.conn_str.clone()),
81                    Value::Text(sub.publications.join(",")),
82                    Value::BigInt(i64::try_from(sub.last_received_pos).unwrap_or(i64::MAX)),
83                    Value::Bool(sub.enabled),
84                ])
85            })
86            .collect();
87        QueryResult::Rows { columns, rows }
88    }
89
90    /// v6.5.0 — materialise `spg_stat_segment` rows. One row per
91    /// cold-tier segment with `(segment_id, num_rows, num_pages,
92    /// total_bytes)`.
93    ///
94    /// v6.7.0 — appended `table_name` column resolves the v6.5.0
95    /// carve-out. Walks every user table's BTree indices to find
96    /// which table's Cold locators point at each segment. Empty
97    /// string for orphan segments (loaded via SPG_PRELOAD_COLD_SEGMENT
98    /// before any index registered a locator). The walk is
99    /// O(tables × indices × keys); cached per call, not across
100    /// calls — re-walked on every `SELECT * FROM spg_stat_segment`.
101    /// v7.31 (memory campaign) — walk the committed catalog and
102    /// build the per-bucket memory snapshot. O(rows + index
103    /// entries): operator/monitoring surface, not a query path.
104    pub fn memory_stats(&self) -> MemoryStats {
105        let mut tables: Vec<TableMemoryStats> = Vec::new();
106        let (mut total_enc, mut total_res, mut total_idx) = (0u64, 0u64, 0u64);
107        for tname in self.catalog.table_names() {
108            if is_internal_table_name(&tname) {
109                continue;
110            }
111            let Some(t) = self.catalog.get(&tname) else {
112                continue;
113            };
114            let resident: u64 = t.rows().iter().map(|r| approx_row_bytes(r) as u64).sum();
115            let mut idx_bytes: u64 = 0;
116            for idx in t.indices() {
117                idx_bytes += match &idx.kind {
118                    spg_storage::IndexKind::BTree(map) => {
119                        let mut b: u64 = 0;
120                        for (_, locs) in map.iter() {
121                            b += (core::mem::size_of::<spg_storage::IndexKey>()
122                                + 24
123                                + locs.len() * core::mem::size_of::<spg_storage::RowLocator>())
124                                as u64;
125                        }
126                        b
127                    }
128                    // Parametric estimate: per node, the dense
129                    // layer-0 neighbour list dominates.
130                    spg_storage::IndexKind::Nsw(g) => {
131                        (g.levels.len() * (g.m_max_0 * 8 + 16)) as u64
132                    }
133                    // BRIN is block-range metadata — flat token.
134                    _ => 1024,
135                };
136            }
137            total_enc += t.hot_bytes();
138            total_res += resident;
139            total_idx += idx_bytes;
140            tables.push(TableMemoryStats {
141                name: tname.clone(),
142                hot_rows: t.rows().len() as u64,
143                cold_rows: t.cold_row_count(),
144                hot_encoded_bytes: t.hot_bytes(),
145                approx_resident_bytes: resident,
146                index_count: t.indices().len() as u64,
147                approx_index_bytes: idx_bytes,
148            });
149        }
150        MemoryStats {
151            tables,
152            total_hot_encoded_bytes: total_enc,
153            total_approx_resident_bytes: total_res,
154            total_approx_index_bytes: total_idx,
155            max_query_bytes: self.max_query_bytes,
156        }
157    }
158
159    /// v7.31 — `SELECT * FROM spg_memory_stats`: one row per user
160    /// table (same numbers as `Engine::memory_stats()`), so the
161    /// server path gets the meter through plain SQL.
162    pub(crate) fn exec_spg_memory_stats(&self) -> QueryResult {
163        let columns = alloc::vec![
164            ColumnSchema::new("table_name", DataType::Text, false),
165            ColumnSchema::new("hot_rows", DataType::BigInt, false),
166            ColumnSchema::new("cold_rows", DataType::BigInt, false),
167            ColumnSchema::new("hot_encoded_bytes", DataType::BigInt, false),
168            ColumnSchema::new("approx_resident_bytes", DataType::BigInt, false),
169            ColumnSchema::new("index_count", DataType::BigInt, false),
170            ColumnSchema::new("approx_index_bytes", DataType::BigInt, false),
171        ];
172        #[allow(clippy::cast_possible_wrap)]
173        let rows: Vec<Row> = self
174            .memory_stats()
175            .tables
176            .into_iter()
177            .map(|t| {
178                Row::new(alloc::vec![
179                    Value::Text(t.name),
180                    Value::BigInt(t.hot_rows as i64),
181                    Value::BigInt(t.cold_rows as i64),
182                    Value::BigInt(t.hot_encoded_bytes as i64),
183                    Value::BigInt(t.approx_resident_bytes as i64),
184                    Value::BigInt(t.index_count as i64),
185                    Value::BigInt(t.approx_index_bytes as i64),
186                ])
187            })
188            .collect();
189        QueryResult::Rows { columns, rows }
190    }
191
192    pub(crate) fn exec_spg_stat_segment(&self) -> QueryResult {
193        let columns = alloc::vec![
194            ColumnSchema::new("segment_id", DataType::BigInt, false),
195            ColumnSchema::new("table_name", DataType::Text, false),
196            ColumnSchema::new("num_rows", DataType::BigInt, false),
197            ColumnSchema::new("num_pages", DataType::BigInt, false),
198            ColumnSchema::new("total_bytes", DataType::BigInt, false),
199        ];
200        // v6.7.0 — build a segment_id → table_name map by walking
201        // every user table's BTree indices once. O(tables × indices
202        // × keys) for the v6.5.0 carve-out resolution; acceptable
203        // because spg_stat_segment is operator-facing (not on a
204        // hot-loop path).
205        let mut segment_owners: alloc::collections::BTreeMap<u32, String> = BTreeMap::new();
206        for tname in self.catalog.table_names() {
207            if is_internal_table_name(&tname) {
208                continue;
209            }
210            let Some(t) = self.catalog.get(&tname) else {
211                continue;
212            };
213            for idx in t.indices() {
214                if let spg_storage::IndexKind::BTree(map) = &idx.kind {
215                    for (_, locs) in map.iter() {
216                        for loc in locs {
217                            if let spg_storage::RowLocator::Cold { segment_id, .. } = loc {
218                                segment_owners
219                                    .entry(*segment_id)
220                                    .or_insert_with(|| tname.clone());
221                            }
222                        }
223                    }
224                }
225            }
226        }
227        let rows: Vec<Row> = self
228            .catalog
229            .cold_segment_ids_global()
230            .iter()
231            .filter_map(|&id| {
232                let seg = self.catalog.cold_segment(id)?;
233                let meta = seg.meta();
234                let owner = segment_owners.get(&id).cloned().unwrap_or_default();
235                Some(Row::new(alloc::vec![
236                    Value::BigInt(i64::from(id)),
237                    Value::Text(owner),
238                    Value::BigInt(i64::try_from(meta.num_rows).unwrap_or(i64::MAX)),
239                    Value::BigInt(i64::from(meta.num_pages)),
240                    Value::BigInt(i64::try_from(meta.total_bytes).unwrap_or(i64::MAX)),
241                ]))
242            })
243            .collect();
244        QueryResult::Rows { columns, rows }
245    }
246
247    /// v6.5.1 — materialise `spg_stat_query` rows. One row per
248    /// distinct SQL text recorded since the engine booted, capped
249    /// at `QUERY_STATS_MAX` (1024). Columns:
250    ///   sql, exec_count, total_us, mean_us, max_us, last_seen_us
251    /// mean_us = total_us / exec_count (saturating).
252    pub(crate) fn exec_spg_stat_query(&self) -> QueryResult {
253        let columns = alloc::vec![
254            ColumnSchema::new("sql", DataType::Text, false),
255            ColumnSchema::new("exec_count", DataType::BigInt, false),
256            ColumnSchema::new("total_us", DataType::BigInt, false),
257            ColumnSchema::new("mean_us", DataType::BigInt, false),
258            ColumnSchema::new("max_us", DataType::BigInt, false),
259            ColumnSchema::new("last_seen_us", DataType::BigInt, false),
260        ];
261        let rows: Vec<Row> = self
262            .query_stats
263            .snapshot()
264            .into_iter()
265            .map(|(sql, s)| {
266                let mean = if s.exec_count == 0 {
267                    0
268                } else {
269                    s.total_us / s.exec_count
270                };
271                Row::new(alloc::vec![
272                    Value::Text(sql),
273                    Value::BigInt(i64::try_from(s.exec_count).unwrap_or(i64::MAX)),
274                    Value::BigInt(i64::try_from(s.total_us).unwrap_or(i64::MAX)),
275                    Value::BigInt(i64::try_from(mean).unwrap_or(i64::MAX)),
276                    Value::BigInt(i64::try_from(s.max_us).unwrap_or(i64::MAX)),
277                    Value::BigInt(i64::try_from(s.last_seen_us).unwrap_or(i64::MAX)),
278                ])
279            })
280            .collect();
281        QueryResult::Rows { columns, rows }
282    }
283
284    /// v6.5.2 — register a connection-state provider. spg-server
285    /// calls this at startup with a function that snapshots its
286    /// per-pgwire-connection registry. Engine reads through the
287    /// callback on `SELECT * FROM spg_stat_activity`.
288    #[must_use]
289    pub const fn with_activity_provider(mut self, f: ActivityProvider) -> Self {
290        self.activity_provider = Some(f);
291        self
292    }
293
294    /// v6.5.3 — register audit chain provider + verifier.
295    #[must_use]
296    pub const fn with_audit_providers(
297        mut self,
298        chain: AuditChainProvider,
299        verify: AuditVerifier,
300    ) -> Self {
301        self.audit_chain_provider = Some(chain);
302        self.audit_verifier = Some(verify);
303        self
304    }
305
306    /// v6.5.6 — register a slow-query log callback. `threshold_us`
307    /// is the floor (in microseconds); only executes above the floor
308    /// fire the callback. spg-server wires this from
309    /// `SPG_SLOW_QUERY_THRESHOLD_MS` (default 100 ms).
310    #[must_use]
311    pub const fn with_slow_query_log(mut self, threshold_us: u64, logger: SlowQueryLogger) -> Self {
312        self.slow_query_threshold_us = Some(threshold_us);
313        self.slow_query_logger = Some(logger);
314        self
315    }
316
317    /// v6.5.6 — operator knob for plan cache cap. spg-server reads
318    /// `SPG_PLAN_CACHE_MAX` env at startup; uses this to override
319    /// the compile-time default of 256.
320    pub fn set_plan_cache_max(&mut self, n: usize) {
321        self.plan_cache.set_max_entries(n);
322    }
323
324    /// v6.5.2 — materialise `spg_stat_activity` rows. Pulls a fresh
325    /// snapshot from the registered `ActivityProvider`. Returns an
326    /// empty result set when no provider is registered (the no_std
327    /// embedded path with no pgwire layer).
328    pub(crate) fn exec_spg_stat_activity(&self) -> QueryResult {
329        let columns = alloc::vec![
330            ColumnSchema::new("pid", DataType::Int, false),
331            ColumnSchema::new("user", DataType::Text, false),
332            ColumnSchema::new("started_at_us", DataType::BigInt, false),
333            ColumnSchema::new("current_sql", DataType::Text, false),
334            ColumnSchema::new("wait_event", DataType::Text, false),
335            ColumnSchema::new("elapsed_us", DataType::BigInt, false),
336            ColumnSchema::new("in_transaction", DataType::Bool, false),
337            ColumnSchema::new("application_name", DataType::Text, false),
338        ];
339        let rows: Vec<Row> = self
340            .activity_provider
341            .map(|f| f())
342            .unwrap_or_default()
343            .into_iter()
344            .map(|r| {
345                Row::new(alloc::vec![
346                    Value::Int(i32::try_from(r.pid).unwrap_or(i32::MAX)),
347                    Value::Text(r.user),
348                    Value::BigInt(r.started_at_us),
349                    Value::Text(r.current_sql),
350                    Value::Text(r.wait_event),
351                    Value::BigInt(r.elapsed_us),
352                    Value::Bool(r.in_transaction),
353                    Value::Text(r.application_name),
354                ])
355            })
356            .collect();
357        QueryResult::Rows { columns, rows }
358    }
359
360    /// v6.5.4 — materialise `spg_table_ddl` rows. One row per user
361    /// table with `(table_name, ddl)`. Reconstructed from catalog
362    /// state on demand.
363    pub(crate) fn exec_spg_table_ddl(&self) -> QueryResult {
364        let columns = alloc::vec![
365            ColumnSchema::new("table_name", DataType::Text, false),
366            ColumnSchema::new("ddl", DataType::Text, false),
367        ];
368        let rows: Vec<Row> = self
369            .catalog
370            .table_names()
371            .into_iter()
372            .filter(|n| !is_internal_table_name(n))
373            .filter_map(|name| {
374                let table = self.catalog.get(&name)?;
375                let ddl = render_create_table(&name, &table.schema().columns);
376                Some(Row::new(alloc::vec![Value::Text(name), Value::Text(ddl),]))
377            })
378            .collect();
379        QueryResult::Rows { columns, rows }
380    }
381
382    /// v6.5.4 — materialise `spg_role_ddl` rows. One row per user
383    /// with `(role_name, ddl)`. Password is redacted (matches the
384    /// `Statement::CreateUser` Display which prints `'<redacted>'`).
385    pub(crate) fn exec_spg_role_ddl(&self) -> QueryResult {
386        let columns = alloc::vec![
387            ColumnSchema::new("role_name", DataType::Text, false),
388            ColumnSchema::new("ddl", DataType::Text, false),
389        ];
390        let rows: Vec<Row> = self
391            .users
392            .iter()
393            .map(|(name, rec)| {
394                let ddl = alloc::format!(
395                    "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}'",
396                    rec.role.as_str(),
397                );
398                Row::new(alloc::vec![
399                    Value::Text(String::from(name)),
400                    Value::Text(ddl)
401                ])
402            })
403            .collect();
404        QueryResult::Rows { columns, rows }
405    }
406
407    /// v6.5.4 — materialise `spg_database_ddl`: single row whose
408    /// `ddl` column concatenates every user table's CREATE +
409    /// every role's CREATE in deterministic catalog order. Suitable
410    /// for piping back through `Engine::execute` to recreate a
411    /// schema-equivalent database.
412    pub(crate) fn exec_spg_database_ddl(&self) -> QueryResult {
413        let columns = alloc::vec![ColumnSchema::new("ddl", DataType::Text, false)];
414        let mut out = String::new();
415        for (name, rec) in self.users.iter() {
416            out.push_str(&alloc::format!(
417                "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}';\n",
418                rec.role.as_str(),
419            ));
420        }
421        for name in self.catalog.table_names() {
422            if is_internal_table_name(&name) {
423                continue;
424            }
425            if let Some(table) = self.catalog.get(&name) {
426                out.push_str(&render_create_table(&name, &table.schema().columns));
427                out.push_str(";\n");
428            }
429        }
430        QueryResult::Rows {
431            columns,
432            rows: alloc::vec![Row::new(alloc::vec![Value::Text(out)])],
433        }
434    }
435
436    /// v6.5.3 — materialise `spg_audit_chain` rows. Pulls a fresh
437    /// snapshot from the registered provider; empty when no
438    /// provider is set.
439    pub(crate) fn exec_spg_audit_chain(&self) -> QueryResult {
440        let columns = alloc::vec![
441            ColumnSchema::new("seq", DataType::BigInt, false),
442            ColumnSchema::new("ts_ms", DataType::BigInt, false),
443            ColumnSchema::new("prev_hash", DataType::Text, false),
444            ColumnSchema::new("entry_hash", DataType::Text, false),
445            ColumnSchema::new("sql", DataType::Text, false),
446        ];
447        let rows: Vec<Row> = self
448            .audit_chain_provider
449            .map(|f| f())
450            .unwrap_or_default()
451            .into_iter()
452            .map(|r| {
453                Row::new(alloc::vec![
454                    Value::BigInt(r.seq),
455                    Value::BigInt(r.ts_ms),
456                    Value::Text(r.prev_hash_hex),
457                    Value::Text(r.entry_hash_hex),
458                    Value::Text(r.sql),
459                ])
460            })
461            .collect();
462        QueryResult::Rows { columns, rows }
463    }
464
465    /// v6.5.3 — materialise `spg_audit_verify` single-row result.
466    /// `(verified_count, broken_at_seq)` — broken_at_seq is `-1`
467    /// on a clean chain. Returns one row with both values 0 when
468    /// no verifier is registered (no-data fallback for embedded
469    /// callers).
470    pub(crate) fn exec_spg_audit_verify(&self) -> QueryResult {
471        let columns = alloc::vec![
472            ColumnSchema::new("verified_count", DataType::BigInt, false),
473            ColumnSchema::new("broken_at_seq", DataType::BigInt, false),
474        ];
475        let (verified, broken) = self.audit_verifier.map(|f| f()).unwrap_or((0, -1));
476        let row = Row::new(alloc::vec![Value::BigInt(verified), Value::BigInt(broken),]);
477        QueryResult::Rows {
478            columns,
479            rows: alloc::vec![row],
480        }
481    }
482
483    /// v6.5.1 — read-only accessor for tests + v6.5.6 ops resets.
484    pub fn query_stats(&self) -> &query_stats::QueryStats {
485        &self.query_stats
486    }
487
488    /// v6.5.1 — mutable accessor (clear, etc).
489    pub fn query_stats_mut(&mut self) -> &mut query_stats::QueryStats {
490        &mut self.query_stats
491    }
492
493    /// v6.2.0 — read access to the per-column statistics table.
494    /// Used by the planner (v6.2.2 selectivity functions read this),
495    /// by `SELECT * FROM spg_statistic`, and by e2e tests.
496    pub const fn statistics(&self) -> &statistics::Statistics {
497        &self.statistics
498    }
499
500    /// v6.2.1 — return tables whose modified-row count crossed the
501    /// auto-analyze threshold since the last ANALYZE on that table.
502    /// The threshold is `0.1 × max(row_count, MIN_ROWS_FOR_AUTO_
503    /// ANALYZE)` — combines PG-style fractional + absolute lower
504    /// bound so a fresh / tiny table doesn't get hammered on every
505    /// INSERT.
506    ///
507    /// Designed to be cheap: walks every user table's
508    /// `Catalog::table_names()` + reads `statistics::modified_
509    /// since_last_analyze()` (BTreeMap lookup). The background
510    /// worker calls this under `engine.read()` then drops the lock
511    /// before re-acquiring `engine.write()` for the actual ANALYZE.
512    pub fn tables_needing_analyze(&self) -> Vec<String> {
513        const MIN_ROWS: u64 = 100;
514        let mut out = Vec::new();
515        for name in self.catalog.table_names() {
516            if is_internal_table_name(&name) {
517                continue;
518            }
519            let Some(table) = self.catalog.get(&name) else {
520                continue;
521            };
522            let row_count = table.rows().len() as u64;
523            let modified = self.statistics.modified_since_last_analyze(&name);
524            // Threshold: ceil(0.1 × max(row_count, MIN_ROWS)),
525            // computed in integer arithmetic so spg-engine stays
526            // no_std without pulling in libm. `(n + 9) / 10` is
527            // `ceil(n / 10)` for non-negative `n`.
528            let base = row_count.max(MIN_ROWS);
529            let threshold = base.saturating_add(9) / 10;
530            if modified >= threshold {
531                out.push(name);
532            }
533        }
534        out
535    }
536}