Skip to main content

spg_engine/
spg_admin.rs

//! `spg_*` introspection views and admin/stats API. Lifted out of
//! `lib.rs` (v7.32 engine modularisation). The `exec_spg_*` methods
//! materialise the `spg_statistic` / `spg_stat_*` / `spg_memory_stats` /
//! `spg_*_ddl` / `spg_audit_*` meta-views dispatched from the meta-view
//! SELECT path; the public `memory_stats` / `set_plan_cache_max` /
//! `query_stats` / `tables_needing_analyze` methods form the embedded
//! admin surface.
7
8use alloc::collections::BTreeMap;
9use alloc::string::String;
10use alloc::vec::Vec;
11
12use spg_storage::{ColumnSchema, DataType, Row, Value};
13
14use crate::{
15    ActivityProvider, AuditChainProvider, AuditVerifier, Engine, MemoryStats, QueryResult,
16    SlowQueryLogger, TableMemoryStats, approx_row_bytes, is_internal_table_name,
17    render_create_table, render_histogram_bounds,
18};
19use crate::{query_stats, statistics};
20
21impl Engine {
22    /// v6.2.0 — materialise `spg_statistic` rows. One row per
23    /// `(table, column)` pair tracked in `Statistics`, with
24    /// `histogram_bounds` rendered as a `[v0, v1, ...]` string —
25    /// the same canonical form vector literals use for round-trip.
26    pub(crate) fn exec_spg_statistic(&self) -> QueryResult {
27        let columns = alloc::vec![
28            ColumnSchema::new("table_name", DataType::Text, false),
29            ColumnSchema::new("column_name", DataType::Text, false),
30            ColumnSchema::new("null_frac", DataType::Float, false),
31            ColumnSchema::new("n_distinct", DataType::BigInt, false),
32            ColumnSchema::new("histogram_bounds", DataType::Text, false),
33            // v6.7.0 — appended column (v6.2.0 stability contract
34            // allows APPEND to spg_statistic, not reorder/rename).
35            // Reports the cached per-table cold-row count; same
36            // value across every column row of the same table.
37            ColumnSchema::new("cold_row_count", DataType::BigInt, false),
38        ];
39        let rows: Vec<Row> = self
40            .statistics
41            .iter()
42            .map(|((t, c), s)| {
43                let cold = self
44                    .catalog
45                    .get(t)
46                    .map_or(0, |table| table.cold_row_count());
47                Row::new(alloc::vec![
48                    Value::Text(t.clone()),
49                    Value::Text(c.clone()),
50                    Value::Float(f64::from(s.null_frac)),
51                    Value::BigInt(i64::try_from(s.n_distinct).unwrap_or(i64::MAX)),
52                    Value::Text(render_histogram_bounds(&s.histogram_bounds)),
53                    Value::BigInt(i64::try_from(cold).unwrap_or(i64::MAX)),
54                ])
55            })
56            .collect();
57        QueryResult::Rows { columns, rows }
58    }
59
60    /// v6.5.0 — materialise `spg_stat_replication` rows. One row
61    /// per subscription with `(name, conn_str, publications,
62    /// last_received_pos, enabled)`. Surface mirrors
63    /// `SHOW SUBSCRIPTIONS` but follows the virtual-table dispatch
64    /// shape so it composes with SELECT clauses (WHERE, projection
65    /// onto specific columns, etc).
66    pub(crate) fn exec_spg_stat_replication(&self) -> QueryResult {
67        let columns = alloc::vec![
68            ColumnSchema::new("name", DataType::Text, false),
69            ColumnSchema::new("conn_str", DataType::Text, false),
70            ColumnSchema::new("publications", DataType::Text, false),
71            ColumnSchema::new("last_received_pos", DataType::BigInt, false),
72            ColumnSchema::new("enabled", DataType::Bool, false),
73        ];
74        let rows: Vec<Row> = self
75            .subscriptions
76            .iter()
77            .map(|(name, sub)| {
78                Row::new(alloc::vec![
79                    Value::Text(name.clone()),
80                    Value::Text(sub.conn_str.clone()),
81                    Value::Text(sub.publications.join(",")),
82                    Value::BigInt(i64::try_from(sub.last_received_pos).unwrap_or(i64::MAX)),
83                    Value::Bool(sub.enabled),
84                ])
85            })
86            .collect();
87        QueryResult::Rows { columns, rows }
88    }
89
90    /// v6.5.0 — materialise `spg_stat_segment` rows. One row per
91    /// cold-tier segment with `(segment_id, num_rows, num_pages,
92    /// total_bytes)`.
93    ///
94    /// v6.7.0 — appended `table_name` column resolves the v6.5.0
95    /// carve-out. Walks every user table's BTree indices to find
96    /// which table's Cold locators point at each segment. Empty
97    /// string for orphan segments (loaded via SPG_PRELOAD_COLD_SEGMENT
98    /// before any index registered a locator). The walk is
99    /// O(tables × indices × keys); cached per call, not across
100    /// calls — re-walked on every `SELECT * FROM spg_stat_segment`.
101    /// v7.31 (memory campaign) — walk the committed catalog and
102    /// build the per-bucket memory snapshot. O(rows + index
103    /// entries): operator/monitoring surface, not a query path.
104    pub fn memory_stats(&self) -> MemoryStats {
105        let mut tables: Vec<TableMemoryStats> = Vec::new();
106        let (mut total_enc, mut total_res, mut total_idx) = (0u64, 0u64, 0u64);
107        for tname in self.catalog.table_names() {
108            if is_internal_table_name(&tname) {
109                continue;
110            }
111            let Some(t) = self.catalog.get(&tname) else {
112                continue;
113            };
114            let resident: u64 = t.rows().iter().map(|r| approx_row_bytes(r) as u64).sum();
115            // v7.31 C2 — each index variant accounts for its own
116            // resident bytes by walking its real structure (NSW layer
117            // adjacency, GIN posting lists), replacing the old inline
118            // parametric estimate that mis-sized NSW and flat-tokened
119            // every GIN family index.
120            let mut idx_bytes: u64 = 0;
121            for idx in t.indices() {
122                idx_bytes += idx.kind.approx_resident_bytes();
123            }
124            total_enc += t.hot_bytes();
125            total_res += resident;
126            total_idx += idx_bytes;
127            tables.push(TableMemoryStats {
128                name: tname.clone(),
129                hot_rows: t.rows().len() as u64,
130                cold_rows: t.cold_row_count(),
131                hot_encoded_bytes: t.hot_bytes(),
132                approx_resident_bytes: resident,
133                index_count: t.indices().len() as u64,
134                approx_index_bytes: idx_bytes,
135            });
136        }
137        MemoryStats {
138            tables,
139            total_hot_encoded_bytes: total_enc,
140            total_approx_resident_bytes: total_res,
141            total_approx_index_bytes: total_idx,
142            max_query_bytes: self.max_query_bytes,
143            // Bucket D belongs to the durable host (embed / server),
144            // not the engine — filled in there (C2).
145            wal_bytes: None,
146        }
147    }
148
149    /// v7.31 — `SELECT * FROM spg_memory_stats`: one row per user
150    /// table (same numbers as `Engine::memory_stats()`), so the
151    /// server path gets the meter through plain SQL.
152    pub(crate) fn exec_spg_memory_stats(&self) -> QueryResult {
153        let columns = alloc::vec![
154            ColumnSchema::new("table_name", DataType::Text, false),
155            ColumnSchema::new("hot_rows", DataType::BigInt, false),
156            ColumnSchema::new("cold_rows", DataType::BigInt, false),
157            ColumnSchema::new("hot_encoded_bytes", DataType::BigInt, false),
158            ColumnSchema::new("approx_resident_bytes", DataType::BigInt, false),
159            ColumnSchema::new("index_count", DataType::BigInt, false),
160            ColumnSchema::new("approx_index_bytes", DataType::BigInt, false),
161        ];
162        #[allow(clippy::cast_possible_wrap)]
163        let rows: Vec<Row> = self
164            .memory_stats()
165            .tables
166            .into_iter()
167            .map(|t| {
168                Row::new(alloc::vec![
169                    Value::Text(t.name),
170                    Value::BigInt(t.hot_rows as i64),
171                    Value::BigInt(t.cold_rows as i64),
172                    Value::BigInt(t.hot_encoded_bytes as i64),
173                    Value::BigInt(t.approx_resident_bytes as i64),
174                    Value::BigInt(t.index_count as i64),
175                    Value::BigInt(t.approx_index_bytes as i64),
176                ])
177            })
178            .collect();
179        QueryResult::Rows { columns, rows }
180    }
181
182    pub(crate) fn exec_spg_stat_segment(&self) -> QueryResult {
183        let columns = alloc::vec![
184            ColumnSchema::new("segment_id", DataType::BigInt, false),
185            ColumnSchema::new("table_name", DataType::Text, false),
186            ColumnSchema::new("num_rows", DataType::BigInt, false),
187            ColumnSchema::new("num_pages", DataType::BigInt, false),
188            ColumnSchema::new("total_bytes", DataType::BigInt, false),
189        ];
190        // v6.7.0 — build a segment_id → table_name map by walking
191        // every user table's BTree indices once. O(tables × indices
192        // × keys) for the v6.5.0 carve-out resolution; acceptable
193        // because spg_stat_segment is operator-facing (not on a
194        // hot-loop path).
195        let mut segment_owners: alloc::collections::BTreeMap<u32, String> = BTreeMap::new();
196        for tname in self.catalog.table_names() {
197            if is_internal_table_name(&tname) {
198                continue;
199            }
200            let Some(t) = self.catalog.get(&tname) else {
201                continue;
202            };
203            for idx in t.indices() {
204                if let spg_storage::IndexKind::BTree(map) = &idx.kind {
205                    for (_, locs) in map.iter() {
206                        for loc in locs {
207                            if let spg_storage::RowLocator::Cold { segment_id, .. } = loc {
208                                segment_owners
209                                    .entry(*segment_id)
210                                    .or_insert_with(|| tname.clone());
211                            }
212                        }
213                    }
214                }
215            }
216        }
217        let rows: Vec<Row> = self
218            .catalog
219            .cold_segment_ids_global()
220            .iter()
221            .filter_map(|&id| {
222                let seg = self.catalog.cold_segment(id)?;
223                let meta = seg.meta();
224                let owner = segment_owners.get(&id).cloned().unwrap_or_default();
225                Some(Row::new(alloc::vec![
226                    Value::BigInt(i64::from(id)),
227                    Value::Text(owner),
228                    Value::BigInt(i64::try_from(meta.num_rows).unwrap_or(i64::MAX)),
229                    Value::BigInt(i64::from(meta.num_pages)),
230                    Value::BigInt(i64::try_from(meta.total_bytes).unwrap_or(i64::MAX)),
231                ]))
232            })
233            .collect();
234        QueryResult::Rows { columns, rows }
235    }
236
237    /// v6.5.1 — materialise `spg_stat_query` rows. One row per
238    /// distinct SQL text recorded since the engine booted, capped
239    /// at `QUERY_STATS_MAX` (1024). Columns:
240    ///   sql, exec_count, total_us, mean_us, max_us, last_seen_us
241    /// mean_us = total_us / exec_count (saturating).
242    pub(crate) fn exec_spg_stat_query(&self) -> QueryResult {
243        let columns = alloc::vec![
244            ColumnSchema::new("sql", DataType::Text, false),
245            ColumnSchema::new("exec_count", DataType::BigInt, false),
246            ColumnSchema::new("total_us", DataType::BigInt, false),
247            ColumnSchema::new("mean_us", DataType::BigInt, false),
248            ColumnSchema::new("max_us", DataType::BigInt, false),
249            ColumnSchema::new("last_seen_us", DataType::BigInt, false),
250        ];
251        let rows: Vec<Row> = self
252            .query_stats
253            .snapshot()
254            .into_iter()
255            .map(|(sql, s)| {
256                let mean = if s.exec_count == 0 {
257                    0
258                } else {
259                    s.total_us / s.exec_count
260                };
261                Row::new(alloc::vec![
262                    Value::Text(sql),
263                    Value::BigInt(i64::try_from(s.exec_count).unwrap_or(i64::MAX)),
264                    Value::BigInt(i64::try_from(s.total_us).unwrap_or(i64::MAX)),
265                    Value::BigInt(i64::try_from(mean).unwrap_or(i64::MAX)),
266                    Value::BigInt(i64::try_from(s.max_us).unwrap_or(i64::MAX)),
267                    Value::BigInt(i64::try_from(s.last_seen_us).unwrap_or(i64::MAX)),
268                ])
269            })
270            .collect();
271        QueryResult::Rows { columns, rows }
272    }
273
274    /// v6.5.2 — register a connection-state provider. spg-server
275    /// calls this at startup with a function that snapshots its
276    /// per-pgwire-connection registry. Engine reads through the
277    /// callback on `SELECT * FROM spg_stat_activity`.
278    #[must_use]
279    pub const fn with_activity_provider(mut self, f: ActivityProvider) -> Self {
280        self.activity_provider = Some(f);
281        self
282    }
283
284    /// v6.5.3 — register audit chain provider + verifier.
285    #[must_use]
286    pub const fn with_audit_providers(
287        mut self,
288        chain: AuditChainProvider,
289        verify: AuditVerifier,
290    ) -> Self {
291        self.audit_chain_provider = Some(chain);
292        self.audit_verifier = Some(verify);
293        self
294    }
295
296    /// v6.5.6 — register a slow-query log callback. `threshold_us`
297    /// is the floor (in microseconds); only executes above the floor
298    /// fire the callback. spg-server wires this from
299    /// `SPG_SLOW_QUERY_THRESHOLD_MS` (default 100 ms).
300    #[must_use]
301    pub const fn with_slow_query_log(mut self, threshold_us: u64, logger: SlowQueryLogger) -> Self {
302        self.slow_query_threshold_us = Some(threshold_us);
303        self.slow_query_logger = Some(logger);
304        self
305    }
306
307    /// v6.5.6 — operator knob for plan cache cap. spg-server reads
308    /// `SPG_PLAN_CACHE_MAX` env at startup; uses this to override
309    /// the compile-time default of 256.
310    pub fn set_plan_cache_max(&mut self, n: usize) {
311        self.plan_cache.set_max_entries(n);
312    }
313
314    /// v6.5.2 — materialise `spg_stat_activity` rows. Pulls a fresh
315    /// snapshot from the registered `ActivityProvider`. Returns an
316    /// empty result set when no provider is registered (the no_std
317    /// embedded path with no pgwire layer).
318    pub(crate) fn exec_spg_stat_activity(&self) -> QueryResult {
319        let columns = alloc::vec![
320            ColumnSchema::new("pid", DataType::Int, false),
321            ColumnSchema::new("user", DataType::Text, false),
322            ColumnSchema::new("started_at_us", DataType::BigInt, false),
323            ColumnSchema::new("current_sql", DataType::Text, false),
324            ColumnSchema::new("wait_event", DataType::Text, false),
325            ColumnSchema::new("elapsed_us", DataType::BigInt, false),
326            ColumnSchema::new("in_transaction", DataType::Bool, false),
327            ColumnSchema::new("application_name", DataType::Text, false),
328        ];
329        let rows: Vec<Row> = self
330            .activity_provider
331            .map(|f| f())
332            .unwrap_or_default()
333            .into_iter()
334            .map(|r| {
335                Row::new(alloc::vec![
336                    Value::Int(i32::try_from(r.pid).unwrap_or(i32::MAX)),
337                    Value::Text(r.user),
338                    Value::BigInt(r.started_at_us),
339                    Value::Text(r.current_sql),
340                    Value::Text(r.wait_event),
341                    Value::BigInt(r.elapsed_us),
342                    Value::Bool(r.in_transaction),
343                    Value::Text(r.application_name),
344                ])
345            })
346            .collect();
347        QueryResult::Rows { columns, rows }
348    }
349
350    /// v6.5.4 — materialise `spg_table_ddl` rows. One row per user
351    /// table with `(table_name, ddl)`. Reconstructed from catalog
352    /// state on demand.
353    pub(crate) fn exec_spg_table_ddl(&self) -> QueryResult {
354        let columns = alloc::vec![
355            ColumnSchema::new("table_name", DataType::Text, false),
356            ColumnSchema::new("ddl", DataType::Text, false),
357        ];
358        let rows: Vec<Row> = self
359            .catalog
360            .table_names()
361            .into_iter()
362            .filter(|n| !is_internal_table_name(n))
363            .filter_map(|name| {
364                let table = self.catalog.get(&name)?;
365                let ddl = render_create_table(&name, &table.schema().columns);
366                Some(Row::new(alloc::vec![Value::Text(name), Value::Text(ddl),]))
367            })
368            .collect();
369        QueryResult::Rows { columns, rows }
370    }
371
372    /// v6.5.4 — materialise `spg_role_ddl` rows. One row per user
373    /// with `(role_name, ddl)`. Password is redacted (matches the
374    /// `Statement::CreateUser` Display which prints `'<redacted>'`).
375    pub(crate) fn exec_spg_role_ddl(&self) -> QueryResult {
376        let columns = alloc::vec![
377            ColumnSchema::new("role_name", DataType::Text, false),
378            ColumnSchema::new("ddl", DataType::Text, false),
379        ];
380        let rows: Vec<Row> = self
381            .users
382            .iter()
383            .map(|(name, rec)| {
384                let ddl = alloc::format!(
385                    "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}'",
386                    rec.role.as_str(),
387                );
388                Row::new(alloc::vec![
389                    Value::Text(String::from(name)),
390                    Value::Text(ddl)
391                ])
392            })
393            .collect();
394        QueryResult::Rows { columns, rows }
395    }
396
397    /// v6.5.4 — materialise `spg_database_ddl`: single row whose
398    /// `ddl` column concatenates every user table's CREATE +
399    /// every role's CREATE in deterministic catalog order. Suitable
400    /// for piping back through `Engine::execute` to recreate a
401    /// schema-equivalent database.
402    pub(crate) fn exec_spg_database_ddl(&self) -> QueryResult {
403        let columns = alloc::vec![ColumnSchema::new("ddl", DataType::Text, false)];
404        let mut out = String::new();
405        for (name, rec) in self.users.iter() {
406            out.push_str(&alloc::format!(
407                "CREATE USER {name} WITH PASSWORD '<redacted>' ROLE '{}';\n",
408                rec.role.as_str(),
409            ));
410        }
411        for name in self.catalog.table_names() {
412            if is_internal_table_name(&name) {
413                continue;
414            }
415            if let Some(table) = self.catalog.get(&name) {
416                out.push_str(&render_create_table(&name, &table.schema().columns));
417                out.push_str(";\n");
418            }
419        }
420        QueryResult::Rows {
421            columns,
422            rows: alloc::vec![Row::new(alloc::vec![Value::Text(out)])],
423        }
424    }
425
426    /// v6.5.3 — materialise `spg_audit_chain` rows. Pulls a fresh
427    /// snapshot from the registered provider; empty when no
428    /// provider is set.
429    pub(crate) fn exec_spg_audit_chain(&self) -> QueryResult {
430        let columns = alloc::vec![
431            ColumnSchema::new("seq", DataType::BigInt, false),
432            ColumnSchema::new("ts_ms", DataType::BigInt, false),
433            ColumnSchema::new("prev_hash", DataType::Text, false),
434            ColumnSchema::new("entry_hash", DataType::Text, false),
435            ColumnSchema::new("sql", DataType::Text, false),
436        ];
437        let rows: Vec<Row> = self
438            .audit_chain_provider
439            .map(|f| f())
440            .unwrap_or_default()
441            .into_iter()
442            .map(|r| {
443                Row::new(alloc::vec![
444                    Value::BigInt(r.seq),
445                    Value::BigInt(r.ts_ms),
446                    Value::Text(r.prev_hash_hex),
447                    Value::Text(r.entry_hash_hex),
448                    Value::Text(r.sql),
449                ])
450            })
451            .collect();
452        QueryResult::Rows { columns, rows }
453    }
454
455    /// v6.5.3 — materialise `spg_audit_verify` single-row result.
456    /// `(verified_count, broken_at_seq)` — broken_at_seq is `-1`
457    /// on a clean chain. Returns one row with both values 0 when
458    /// no verifier is registered (no-data fallback for embedded
459    /// callers).
460    pub(crate) fn exec_spg_audit_verify(&self) -> QueryResult {
461        let columns = alloc::vec![
462            ColumnSchema::new("verified_count", DataType::BigInt, false),
463            ColumnSchema::new("broken_at_seq", DataType::BigInt, false),
464        ];
465        let (verified, broken) = self.audit_verifier.map(|f| f()).unwrap_or((0, -1));
466        let row = Row::new(alloc::vec![Value::BigInt(verified), Value::BigInt(broken),]);
467        QueryResult::Rows {
468            columns,
469            rows: alloc::vec![row],
470        }
471    }
472
473    /// v6.5.1 — read-only accessor for tests + v6.5.6 ops resets.
474    pub fn query_stats(&self) -> &query_stats::QueryStats {
475        &self.query_stats
476    }
477
478    /// v6.5.1 — mutable accessor (clear, etc).
479    pub fn query_stats_mut(&mut self) -> &mut query_stats::QueryStats {
480        &mut self.query_stats
481    }
482
483    /// v6.2.0 — read access to the per-column statistics table.
484    /// Used by the planner (v6.2.2 selectivity functions read this),
485    /// by `SELECT * FROM spg_statistic`, and by e2e tests.
486    pub const fn statistics(&self) -> &statistics::Statistics {
487        &self.statistics
488    }
489
490    /// v6.2.1 — return tables whose modified-row count crossed the
491    /// auto-analyze threshold since the last ANALYZE on that table.
492    /// The threshold is `0.1 × max(row_count, MIN_ROWS_FOR_AUTO_
493    /// ANALYZE)` — combines PG-style fractional + absolute lower
494    /// bound so a fresh / tiny table doesn't get hammered on every
495    /// INSERT.
496    ///
497    /// Designed to be cheap: walks every user table's
498    /// `Catalog::table_names()` + reads `statistics::modified_
499    /// since_last_analyze()` (BTreeMap lookup). The background
500    /// worker calls this under `engine.read()` then drops the lock
501    /// before re-acquiring `engine.write()` for the actual ANALYZE.
502    pub fn tables_needing_analyze(&self) -> Vec<String> {
503        const MIN_ROWS: u64 = 100;
504        let mut out = Vec::new();
505        for name in self.catalog.table_names() {
506            if is_internal_table_name(&name) {
507                continue;
508            }
509            let Some(table) = self.catalog.get(&name) else {
510                continue;
511            };
512            let row_count = table.rows().len() as u64;
513            let modified = self.statistics.modified_since_last_analyze(&name);
514            // Threshold: ceil(0.1 × max(row_count, MIN_ROWS)),
515            // computed in integer arithmetic so spg-engine stays
516            // no_std without pulling in libm. `(n + 9) / 10` is
517            // `ceil(n / 10)` for non-negative `n`.
518            let base = row_count.max(MIN_ROWS);
519            let threshold = base.saturating_add(9) / 10;
520            if modified >= threshold {
521                out.push(name);
522            }
523        }
524        out
525    }
526}