spg_engine/lib.rs
1//! SPG execution engine — v0.3 wires the SQL front-end to the in-memory
2//! storage layer. Implements `CREATE TABLE`, single-row `INSERT VALUES`, and
3//! `SELECT * FROM <table>` (no WHERE yet — that lands in v0.4 alongside
4//! expression evaluation against rows).
5#![no_std]
6
7extern crate alloc;
8
9pub mod aggregate;
10mod bytebudget;
11mod cancel;
12mod clock;
13mod constraints;
14mod conversions;
15pub mod copy;
16mod ddl;
17pub mod describe;
18mod dml;
19mod envelope;
20pub mod eval;
21mod execute;
22mod explain;
23mod expr_analysis;
24pub mod fts;
25mod index_access;
26mod join;
27pub mod json;
28mod maintenance;
29pub mod memoize;
30mod numeric;
31mod orderby;
32pub mod plan_cache;
33mod plpgsql;
34pub mod publications;
35pub mod query_stats;
36mod readonly;
37pub mod reorder;
38mod select;
39pub mod selectivity;
40mod sequence;
41mod session;
42mod show;
43mod spg_admin;
44pub mod statistics;
45mod subquery;
46pub mod subscriptions;
47mod substitute;
48mod system_catalog;
49mod table_access;
50mod transaction;
51pub mod triggers;
52pub mod users;
53mod window;
54
55pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
56pub use cancel::{CancelToken, MonotonicNowFn};
57
58use bytebudget::*;
59pub(crate) use clock::{rewrite_clock_calls, value_to_literal};
60use constraints::*;
61use conversions::*;
62pub use conversions::{
63 format_bigint_2d_text_pub, format_hstore_text, format_int_2d_text_pub, format_range_text,
64 format_text_2d_text_pub,
65};
66pub(crate) use ddl::{
67 canonicalize_set_value, enforce_enum_label, eval_runtime_default_free,
68 resolve_column_default_free,
69};
70pub(crate) use envelope::{EnvelopeParse, build_envelope, split_envelope};
71use expr_analysis::*;
72use index_access::*;
73pub(crate) use orderby::{
74 apply_offset_and_limit, apply_offset_and_limit_tagged, build_order_keys, canonical_value_repr,
75 expand_group_by_all, order_by_value_cmp, partial_sort_tagged, render_histogram_bounds,
76 resolve_order_by_position, sort_by_keys, sort_values_for_histogram, value_cmp, value_to_f64,
77};
78pub(crate) use select::{build_projection, infer_column_types, value_to_order_key};
79pub(crate) use show::render_create_table;
80pub(crate) use subquery::{
81 build_in_list_set, collect_scalar_subqueries, expr_has_subquery, expr_tree_has_subquery,
82};
83pub use substitute::substitute_placeholders;
84use substitute::*;
85use system_catalog::*;
86use window::*;
87
88use alloc::collections::BTreeMap;
89use alloc::string::String;
90use alloc::vec::Vec;
91use core::fmt;
92
93// v7.16.0 — re-export the parsed-statement AST so downstream
94// crates (spg-embedded → spg-sqlx) don't need a direct dep on
95// spg-sql for the prepare/bind handle.
96pub use spg_sql::ast::Statement as ParsedStatement;
97use spg_sql::parser::ParseError;
98use spg_storage::{Catalog, ColumnSchema, Row, StorageError};
99
100use crate::eval::EvalError;
101
102/// Result of executing one statement.
103#[derive(Debug, Clone, PartialEq)]
104#[non_exhaustive]
105pub enum QueryResult {
106 /// DDL or DML succeeded.
107 ///
108 /// `affected` is the row count for `INSERT` and 0 elsewhere.
109 /// `modified_catalog` tells the server whether this statement
110 /// caused the *committed* catalog to change — it's the signal to
111 /// snapshot/audit. False for `BEGIN`/`ROLLBACK`, false for writeful
112 /// statements executed inside a transaction (those only touch the
113 /// shadow), and true for `COMMIT` and for writes outside a TX.
114 CommandOk {
115 affected: usize,
116 modified_catalog: bool,
117 },
118 /// `SELECT` returned a (possibly empty) row set.
119 Rows {
120 columns: Vec<ColumnSchema>,
121 rows: Vec<Row>,
122 },
123}
124
125/// All errors the engine can return.
126///
127/// Marked `#[non_exhaustive]` from v7.5.0 onward: external `match`
128/// must include a `_` arm so new variants in subsequent v7.x releases
129/// are not breaking changes.
130#[derive(Debug, Clone, PartialEq)]
131#[non_exhaustive]
132pub enum EngineError {
133 Parse(ParseError),
134 Storage(StorageError),
135 Eval(EvalError),
136 /// Front-end accepted a construct that the v0.x executor doesn't support.
137 Unsupported(String),
138 /// `BEGIN` while another transaction is already open.
139 TransactionAlreadyOpen,
140 /// `COMMIT` / `ROLLBACK` with no active transaction.
141 NoActiveTransaction,
142 /// v4.0 sentinel: `execute_readonly` got a statement that
143 /// mutates engine state (INSERT / CREATE / BEGIN / COMMIT / …).
144 /// The caller should retake the write lock and dispatch through
145 /// `execute(&mut self)` instead.
146 WriteRequired,
147 /// v4.2: a SELECT would have returned more rows than the
148 /// configured `max_query_rows` cap. Carries the cap.
149 RowLimitExceeded(usize),
150 /// v7.30.3 (mailrs round-26): a SELECT's join/filter
151 /// materialisation would have held more (approximate) heap
152 /// bytes than the configured `max_query_bytes` cap. The row
153 /// cap above counts rows; this counts bytes, because one row
154 /// can be a multi-MB mail body — 1000 fat rows pressure the
155 /// host long before any row ceiling trips. Carries the cap.
156 QueryBytesExceeded(usize),
157 /// v4.5: cooperative cancellation — the host (server's
158 /// per-query watchdog) set the cancel flag while a long-running
159 /// SELECT / UPDATE / DELETE was scanning rows. The partial work
160 /// is discarded; the caller should surface this as a timeout
161 /// to the client.
162 Cancelled,
163}
164
165impl fmt::Display for EngineError {
166 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167 match self {
168 Self::Parse(e) => write!(f, "parse: {e}"),
169 Self::Storage(e) => write!(f, "storage: {e}"),
170 Self::Eval(e) => write!(f, "eval: {e}"),
171 Self::Unsupported(s) => write!(f, "unsupported: {s}"),
172 Self::TransactionAlreadyOpen => f.write_str("a transaction is already open"),
173 Self::NoActiveTransaction => f.write_str("no active transaction"),
174 Self::WriteRequired => {
175 f.write_str("statement requires a write lock (use execute, not execute_readonly)")
176 }
177 Self::RowLimitExceeded(n) => {
178 write!(f, "query exceeded max_query_rows={n}")
179 }
180 Self::QueryBytesExceeded(n) => {
181 write!(
182 f,
183 "query materialisation exceeded max_query_bytes={n} (set SPG_MAX_QUERY_BYTES to raise, 0 to disable)"
184 )
185 }
186 Self::Cancelled => f.write_str("query cancelled (timeout or client request)"),
187 }
188 }
189}
190
191impl From<ParseError> for EngineError {
192 fn from(e: ParseError) -> Self {
193 Self::Parse(e)
194 }
195}
196impl From<StorageError> for EngineError {
197 fn from(e: StorageError) -> Self {
198 Self::Storage(e)
199 }
200}
201impl From<EvalError> for EngineError {
202 fn from(e: EvalError) -> Self {
203 Self::Eval(e)
204 }
205}
206
// NOTE(review): the paragraph below describes `Engine`, not `ClockFn`. It is
// kept as a plain comment so rustdoc no longer attaches it to the alias;
// its natural home is the `Engine` struct declaration.
//
// The execution engine. Holds the catalog and (later) other server-scope
// state. `Engine::new()` is intentionally cheap so callers can construct one
// per database, per test.

/// Function pointer that returns "now" as microseconds since the Unix epoch.
///
/// The engine is `no_std`, so it can't reach for `std::time` itself —
/// callers (`spg-server`, the sqllogictest runner) inject a concrete
/// implementation. When the engine holds `None` instead of a `ClockFn`,
/// `NOW()` / `CURRENT_*` raise `Unsupported`.
pub type ClockFn = fn() -> i64;
216
/// Function pointer that produces 16 cryptographically random bytes.
///
/// As with `ClockFn`, the engine is `no_std` and can't read `/dev/urandom`
/// itself, so the host (`spg-server`) injects an OS-backed source. When the
/// engine holds `None` instead, SQL-driven `CREATE USER` falls back to a
/// deterministic salt derived from the username — acceptable in tests; the
/// server always installs a real RNG so production paths never see it.
pub type SaltFn = fn() -> [u8; 16];
224
// NOTE(review): the paragraph below documents `CancelToken` (re-exported from
// the `cancel` module), not `TxId`. It is kept as a plain comment so rustdoc
// no longer attaches it to `TxId`; confirm `cancel` carries this text and
// then drop it from here.
//
// v4.5 cooperative cancellation token. A long-running SELECT /
// UPDATE / DELETE checks `is_cancelled` at row-loop checkpoints
// and bails with `EngineError::Cancelled`. The host
// (`spg-server`) creates an `AtomicBool` per query, spawns a
// watchdog thread that sets it after `SPG_QUERY_TIMEOUT_MS`,
// and passes it via `execute_with_cancel` / `execute_readonly_with_cancel`.
//
// `CancelToken::none()` is a no-op — used by the legacy `execute`
// and `execute_readonly` entry points so existing callers don't
// change.

/// v4.41.1 opaque transaction handle.
///
/// Returned by `Engine::alloc_tx_id` and threaded through
/// `Engine::execute_in` so dispatch can identify which in-flight TX a
/// statement belongs to. `IMPLICIT_TX` is the reserved slot every legacy
/// caller — engine self-tests, spg-cli, spg-embedded, startup replay —
/// implicitly uses through the unchanged `Engine::execute(sql)` API.
///
/// v4.41.1 keeps at most one active slot at runtime (dispatch holds
/// `engine.write()` across the wrap, same as v4.34); the map shape exists to
/// let v4.42 turn on N in-flight implicit TXs without reshuffling the engine
/// internals.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxId(pub u64);
246
247/// Reserved slot used by `Engine::execute(sql)` — the legacy single-
248/// global-shadow path. New `alloc_tx_id` handles start at 1.
249pub const IMPLICIT_TX: TxId = TxId(0);
250
/// v6.7.3 — default segment-size threshold (4 MiB) used by
/// `COMPACT COLD SEGMENTS` when no explicit target is supplied.
///
/// Segments whose `OwnedSegment::bytes().len()` is **strictly** less than
/// this value are eligible to merge. spg-server reads
/// `SPG_COMPACTION_TARGET_SEGMENT_BYTES` to override it.
pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
257
258/// Per-slot transaction state. Held inside `tx_catalogs[tx_id]` for the
259/// lifetime of a BEGIN..COMMIT (or BEGIN..ROLLBACK) window. Drops when
260/// the TX commits (its `catalog` is moved over `Engine.catalog`) or
261/// rolls back (slot removed, catalog discarded).
262#[derive(Debug, Default, Clone)]
263struct TxState {
264 /// The TX's shadow copy of the catalog. Started as a clone of
265 /// `Engine.catalog` at BEGIN time; writes flow into it; COMMIT
266 /// installs it over `Engine.catalog`. `Catalog::clone()` is O(1)
267 /// since v4.40 (`PersistentVec` rows + `PersistentBTreeMap` indices).
268 catalog: Catalog,
269 /// Per-TX savepoint stack. Each entry pairs the savepoint name with
270 /// a clone of `catalog` at the moment `SAVEPOINT <name>` fired.
271 /// `ROLLBACK TO <name>` restores from the entry and pops everything
272 /// after it; `RELEASE <name>` discards the entry and everything
273 /// after; COMMIT/ROLLBACK clears the whole stack.
274 savepoints: Vec<(String, Catalog)>,
275}
276
277/// v7.11.0 — frozen read-only view of the engine's committed state.
278/// Constructed via [`Engine::clone_snapshot`]. Holds clones of the
279/// catalog, statistics, clock function, and row-cap config — the
280/// four fields the `execute_readonly` path actually reads. Cheap to
281/// `Clone` (each clone shares the underlying `PersistentVec` row
282/// storage; only the trie root pointers copy). Send + Sync so a
283/// snapshot can be moved across `tokio::task::spawn_blocking`
284/// boundaries without coordination.
285///
286/// The contract: a snapshot reflects the engine's state at the
287/// moment `clone_snapshot()` returned. Subsequent writes to the
288/// engine are NOT visible. Callers who need fresher data take a
289/// new snapshot.
290#[derive(Debug, Clone)]
291pub struct CatalogSnapshot {
292 catalog: Catalog,
293 statistics: statistics::Statistics,
294 clock: Option<ClockFn>,
295 max_query_rows: Option<usize>,
296}
297
298#[derive(Debug, Default)]
299pub struct Engine {
300 /// Committed catalog — what survives `Engine::snapshot()` and what
301 /// outside-TX `SELECT`s read.
302 catalog: Catalog,
303 /// Active TX slots, keyed by `TxId`. Empty when no TX is in flight.
304 /// v4.41.1 runtime invariant: at most one entry (single-writer
305 /// model unchanged). v4.42 will let dispatch hold multiple entries
306 /// concurrently for group commit + engine MVCC.
307 tx_catalogs: BTreeMap<TxId, TxState>,
308 /// Which slot the next exec_* call should mutate. Set by
309 /// `execute_in(sql, tx_id)` at the entry point; legacy `execute(sql)`
310 /// sets it to `IMPLICIT_TX`. None when no TX is in flight (read /
311 /// write goes straight against `catalog`).
312 current_tx: Option<TxId>,
313 /// Monotonic counter for `alloc_tx_id`. Starts at 1 — slot 0 is
314 /// reserved for `IMPLICIT_TX`.
315 next_tx_id: u64,
316 /// v7.22 (round-13 T3) — session string-literal dialect. `false`
317 /// (default) = PG semantics (backslash literal, `''` escape);
318 /// `true` = MySQL semantics (`\'` etc.). Flipped by the
319 /// deterministic session signals each dump emits: `SET sql_mode`
320 /// (only MySQL clients/dumps send it) turns it on,
321 /// `SET standard_conforming_strings = on` (every pg_dump
322 /// preamble) turns it off. The plan cache is cleared on every
323 /// flip — the same SQL text lexes differently per dialect.
324 backslash_escapes: bool,
325 /// Optional wall clock used to satisfy `NOW()` / `CURRENT_TIMESTAMP`
326 /// / `CURRENT_DATE`. Set by the host environment.
327 clock: Option<ClockFn>,
328 /// v4.1 cryptographic RNG for per-user password salt. Set by the
329 /// host. `None` means SQL-driven `CREATE USER` uses a
330 /// deterministic fallback — see `SaltFn`.
331 salt_fn: Option<SaltFn>,
332 /// v4.2 per-query row cap. `None` = unlimited. When set, a
333 /// SELECT that materialises more than `n` rows returns
334 /// `EngineError::RowLimitExceeded`. Enforced before the result
335 /// is shaped into wire frames so a runaway scan can't blow the
336 /// server's heap.
337 max_query_rows: Option<usize>,
338 /// v7.30.3 (mailrs round-26) per-query byte cap on join/filter
339 /// materialisation. `None` = unlimited. Approximate net
340 /// accounting (Value heap payloads + per-cell enum overhead)
341 /// charged at every point the join pipeline clones rows;
342 /// crossing the cap raises `EngineError::QueryBytesExceeded`
343 /// instead of pressuring the host into reclaim livelock. The
344 /// host wires this to `SPG_MAX_QUERY_BYTES` (embed defaults it
345 /// ON; the server keeps its allocator-precise budget as the
346 /// outer layer).
347 pub(crate) max_query_bytes: Option<usize>,
348 /// v4.1 RBAC user table. Empty means "no RBAC configured yet" —
349 /// the server decides what that means at the auth boundary
350 /// (open mode vs legacy single-password mode). User CRUD goes
351 /// through `create_user`/`drop_user`/`verify_user`; persistence
352 /// rides the snapshot envelope alongside the catalog.
353 pub(crate) users: UserStore,
354 /// v6.1.2 logical-replication publication catalog. Empty until
355 /// `CREATE PUBLICATION` runs. Persistence rides the v3 envelope
356 /// trailer (see `build_envelope`).
357 publications: publications::Publications,
358 /// v6.1.4 logical-replication subscription catalog. Empty until
359 /// `CREATE SUBSCRIPTION` runs. Persistence rides the v4 envelope
360 /// trailer.
361 subscriptions: subscriptions::Subscriptions,
362 /// v6.2.0 — per-column statistics for the cost-based optimizer.
363 /// Populated by `ANALYZE`; queried via `spg_statistic` virtual
364 /// table. Persistence rides the v5 envelope trailer.
365 statistics: statistics::Statistics,
366 /// v6.3.0 — engine-level plan cache. Caches the post-`prepare()`
367 /// `Statement` keyed on SQL text. In-memory only — does NOT ride
368 /// the snapshot envelope (rebuilt on demand after restart).
369 plan_cache: plan_cache::PlanCache,
370 /// v6.5.1 — per-distinct-SQL execution stats. In-memory only,
371 /// surfaced via `spg_stat_query` virtual table. Updated by the
372 /// `execute_*` paths after a successful execute.
373 query_stats: query_stats::QueryStats,
374 /// v6.5.2 — connection-state provider callback. spg-server
375 /// registers a function at startup that snapshots its
376 /// per-pgwire-connection registry into `ActivityRow`s; engine
377 /// reads through it on every `SELECT * FROM spg_stat_activity`.
378 /// `None` ⇒ no-data (returns empty rows; matches the no_std
379 /// embedded callers that don't run pgwire).
380 activity_provider: Option<ActivityProvider>,
381 /// v6.5.3 — audit-chain provider + verifier. Same pattern as
382 /// activity_provider: spg-server registers both at startup;
383 /// engine reads through on `SELECT * FROM spg_audit_chain` and
384 /// `SELECT * FROM spg_audit_verify`. `None` ⇒ no-data.
385 audit_chain_provider: Option<AuditChainProvider>,
386 audit_verifier: Option<AuditVerifier>,
387 /// v6.5.6 — slow-query log threshold in microseconds. When set,
388 /// every successful execute whose elapsed exceeds the threshold
389 /// gets fed to the registered slow-query log callback (so
390 /// spg-server can emit a structured log line). Default `None`
391 /// = no slow-query logging.
392 slow_query_threshold_us: Option<u64>,
393 slow_query_logger: Option<SlowQueryLogger>,
394 /// v7.12.1 — session parameters set via `SET <name> = <value>`.
395 /// Only `default_text_search_config` is consumed by the engine
396 /// today (the FTS function dispatcher reads it when
397 /// `to_tsvector(text)` is called without an explicit config).
398 /// All other names are accepted + recorded so PG-dump output
399 /// loads, but have no behavioural effect.
400 pub(crate) session_params: BTreeMap<String, String>,
401 /// v7.12.7 — depth counter for trigger-emitted embedded SQL.
402 /// Each time the engine executes a `DeferredEmbeddedStmt` it
403 /// increments this; the recursive `execute_stmt_with_cancel`
404 /// inside that path checks against [`MAX_TRIGGER_RECURSION`]
405 /// to bound runaway cascades (trigger A's UPDATE on table B
406 /// fires trigger B which UPDATEs table A which fires trigger
407 /// A again…). Reset to 0 once the original DML returns.
408 trigger_recursion_depth: u32,
409 /// v7.14.0 — when `SET FOREIGN_KEY_CHECKS=0` is in effect
410 /// (mysqldump preamble), the FK existence + arity check at
411 /// CREATE TABLE time is deferred. FKs referencing a
412 /// not-yet-existing parent land in `pending_foreign_keys`
413 /// keyed by child table; `SET FOREIGN_KEY_CHECKS=1` drains
414 /// the queue and resolves each FK against the now-complete
415 /// catalog. Empty by default; the queue is drained on every
416 /// `RESET ALL` too.
417 foreign_key_checks: bool,
418 /// v7.16.2 — true on the temp Engine an outer
419 /// `exec_select_with_meta_views` builds, telling that
420 /// temp engine "stop short-circuiting into the meta-view
421 /// path — your catalog already has the materialised
422 /// tables; just run the regular SELECT." Without this we'd
423 /// infinite-loop since the meta-view name (e.g.
424 /// `__spg_info_columns`) still triggers
425 /// `select_references_meta_view`.
426 meta_views_materialised: bool,
427 pending_foreign_keys: Vec<(alloc::string::String, spg_sql::ast::ForeignKeyConstraint)>,
428}
429
/// v7.12.7 — hard cap on how deeply trigger-emitted embedded SQL may nest.
/// A depth of 16 is well beyond what a normal trigger graph needs while
/// still stopping an infinite trigger loop from wedging the engine.
const MAX_TRIGGER_RECURSION: u32 = 16;
434
/// v6.5.6 — signature of the slow-query log callback. It is invoked with
/// `(sql, elapsed_us)` once per successful execute that crosses the
/// configured threshold.
pub type SlowQueryLogger = fn(&str, u64);
439
/// v6.5.2 — one row of `spg_stat_activity`. Public so spg-server can build
/// rows without re-exporting internal dispatch types.
#[derive(Clone, Debug)]
pub struct ActivityRow {
    pub pid: u32,
    pub user: String,
    pub started_at_us: i64,
    pub current_sql: String,
    pub wait_event: String,
    pub elapsed_us: i64,
    pub in_transaction: bool,
    /// v7.17 Phase 2.4 — the startup-param `application_name`, or the last
    /// value the client sent via `SET application_name = '...'`. Empty when
    /// the client never declared one.
    pub application_name: String,
}
457
458/// v6.5.2 — provider callback type. Fresh snapshot returned each
459/// call; engine doesn't cache the slice.
460pub type ActivityProvider = fn() -> Vec<ActivityRow>;
461
/// v6.5.3 — one row of `spg_audit_chain`. Public so spg-server can build
/// rows directly from `AuditEntry`.
#[derive(Clone, Debug)]
pub struct AuditRow {
    pub seq: i64,
    pub ts_ms: i64,
    pub prev_hash_hex: String,
    pub entry_hash_hex: String,
    pub sql: String,
}
472
473/// v6.5.3 — chain-table provider + verifier. spg-server registers
474/// fn pointers that snapshot / verify the audit log. `verify`
475/// returns `(verified_count, broken_at_seq)` — `broken_at_seq` is
476/// `-1` on a clean chain.
477pub type AuditChainProvider = fn() -> Vec<AuditRow>;
478pub type AuditVerifier = fn() -> (i64, i64);
479
480impl Engine {
481 pub fn new() -> Self {
482 Self {
483 catalog: Catalog::new(),
484 tx_catalogs: BTreeMap::new(),
485 current_tx: None,
486 backslash_escapes: false,
487 next_tx_id: 1,
488 clock: None,
489 salt_fn: None,
490 max_query_rows: None,
491 max_query_bytes: None,
492 users: UserStore::new(),
493 publications: publications::Publications::new(),
494 subscriptions: subscriptions::Subscriptions::new(),
495 statistics: statistics::Statistics::new(),
496 plan_cache: plan_cache::PlanCache::new(),
497 query_stats: query_stats::QueryStats::new(),
498 activity_provider: None,
499 audit_chain_provider: None,
500 audit_verifier: None,
501 slow_query_threshold_us: None,
502 slow_query_logger: None,
503 session_params: BTreeMap::new(),
504 trigger_recursion_depth: 0,
505 foreign_key_checks: true,
506 meta_views_materialised: false,
507 pending_foreign_keys: Vec::new(),
508 }
509 }
510
511 /// v7.11.0 — clone the engine's committed catalog + read-time
512 /// state into a frozen `CatalogSnapshot`. Cheap (`Catalog` is
513 /// backed by `PersistentVec`; cloning is O(log n) per table).
514 /// Subsequent writes to this engine are invisible to the
515 /// snapshot; the snapshot is self-contained and can be moved
516 /// to another thread for concurrent `execute_readonly_on_snapshot`
517 /// calls. The basis for [`AsyncReadHandle`] in spg-embedded-tokio
518 /// and any other read-fanout pattern.
519 #[must_use]
520 pub fn clone_snapshot(&self) -> CatalogSnapshot {
521 CatalogSnapshot {
522 catalog: self.active_catalog().clone(),
523 statistics: self.statistics.clone(),
524 clock: self.clock,
525 max_query_rows: self.max_query_rows,
526 }
527 }
528
529 /// Construct an engine restored from a previously-snapshotted catalog
530 /// (see `snapshot()`).
531 pub fn restore(catalog: Catalog) -> Self {
532 Self {
533 catalog,
534 tx_catalogs: BTreeMap::new(),
535 current_tx: None,
536 backslash_escapes: false,
537 next_tx_id: 1,
538 clock: None,
539 salt_fn: None,
540 max_query_rows: None,
541 max_query_bytes: None,
542 users: UserStore::new(),
543 publications: publications::Publications::new(),
544 subscriptions: subscriptions::Subscriptions::new(),
545 statistics: statistics::Statistics::new(),
546 plan_cache: plan_cache::PlanCache::new(),
547 query_stats: query_stats::QueryStats::new(),
548 activity_provider: None,
549 audit_chain_provider: None,
550 audit_verifier: None,
551 slow_query_threshold_us: None,
552 slow_query_logger: None,
553 session_params: BTreeMap::new(),
554 trigger_recursion_depth: 0,
555 foreign_key_checks: true,
556 meta_views_materialised: false,
557 pending_foreign_keys: Vec::new(),
558 }
559 }
560
561 /// Restore an engine + user table from a v4.1 envelope produced
562 /// by `snapshot_with_users()`. Falls back to plain catalog-only
563 /// restore if the envelope magic isn't present (so v3.x snapshot
564 /// files still load). v6.1.2 adds the optional publications
565 /// trailer (envelope v3); a v1/v2 envelope deserialises to an
566 /// empty publication table.
567 pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
568 match split_envelope(buf) {
569 EnvelopeParse::Pair {
570 catalog: catalog_bytes,
571 users: user_bytes,
572 publications: pub_bytes,
573 subscriptions: sub_bytes,
574 statistics: stats_bytes,
575 } => {
576 let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
577 let users = users::deserialize_users(user_bytes)
578 .map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
579 let publications = match pub_bytes {
580 Some(b) => publications::Publications::deserialize(b).map_err(|e| {
581 EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
582 })?,
583 None => publications::Publications::new(),
584 };
585 let subscriptions = match sub_bytes {
586 Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
587 EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
588 })?,
589 None => subscriptions::Subscriptions::new(),
590 };
591 let statistics = match stats_bytes {
592 Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
593 EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
594 })?,
595 None => statistics::Statistics::new(),
596 };
597 Ok(Self {
598 catalog,
599 tx_catalogs: BTreeMap::new(),
600 current_tx: None,
601 backslash_escapes: false,
602 next_tx_id: 1,
603 clock: None,
604 salt_fn: None,
605 max_query_rows: None,
606 max_query_bytes: None,
607 users,
608 publications,
609 subscriptions,
610 statistics,
611 plan_cache: plan_cache::PlanCache::new(),
612 query_stats: query_stats::QueryStats::new(),
613 activity_provider: None,
614 audit_chain_provider: None,
615 audit_verifier: None,
616 slow_query_threshold_us: None,
617 slow_query_logger: None,
618 session_params: BTreeMap::new(),
619 trigger_recursion_depth: 0,
620 foreign_key_checks: true,
621 meta_views_materialised: false,
622 pending_foreign_keys: Vec::new(),
623 })
624 }
625 EnvelopeParse::CrcMismatch { expected, computed } => {
626 Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
627 "snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
628 ))))
629 }
630 EnvelopeParse::Bare => {
631 let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
632 Ok(Self::restore(catalog))
633 }
634 }
635 }
636
637 pub const fn users(&self) -> &UserStore {
638 &self.users
639 }
640
641 /// Builder: attach a wall clock so `NOW()` / `CURRENT_TIMESTAMP` /
642 /// `CURRENT_DATE` evaluate to a real value instead of erroring out.
643 #[must_use]
644 pub const fn with_clock(mut self, clock: ClockFn) -> Self {
645 self.clock = Some(clock);
646 self
647 }
648
649 /// Builder: attach an OS-backed RNG for per-user password salts.
650 /// The host (`spg-server`) typically wires this to `/dev/urandom`.
651 #[must_use]
652 pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
653 self.salt_fn = Some(f);
654 self
655 }
656
657 /// Builder: cap the number of rows a single SELECT may return.
658 /// Exceeding the cap raises `EngineError::RowLimitExceeded` —
659 /// the bound is checked inside the executor so a runaway
660 /// catalog scan can't allocate millions of rows before the
661 /// server gets a chance to reject the result.
662 #[must_use]
663 pub const fn with_max_query_rows(mut self, n: usize) -> Self {
664 self.max_query_rows = Some(n);
665 self
666 }
667
668 /// Builder: cap the approximate heap bytes a single SELECT's
669 /// join/filter materialisation may hold. Exceeding the cap
670 /// raises `EngineError::QueryBytesExceeded`. Rows are the wrong
671 /// unit when one row carries a multi-MB body (mailrs round-26:
672 /// 1000-row batches of full mail text walked a 15 GiB host into
673 /// reclaim livelock without ever tripping a row ceiling).
674 #[must_use]
675 pub const fn with_max_query_bytes(mut self, n: usize) -> Self {
676 self.max_query_bytes = Some(n);
677 self
678 }
679
680 /// The *committed* catalog. Note: during a transaction this returns the
681 /// pre-TX state — `SELECT` inside a TX goes through `execute()` and reads
682 /// the shadow. Tests that inspect outside-TX state should use this.
683 pub const fn catalog(&self) -> &Catalog {
684 &self.catalog
685 }
686
687 /// Serialize the *committed* catalog to bytes. v0.6 was full-snapshot; v0.9
688 /// adds the rule that an open TX's shadow is never snapshotted — only the
689 /// post-COMMIT state is persisted. v4.1 wraps the catalog in an envelope
690 /// when there are users to persist; an empty user table snapshots as the
691 /// bare catalog format (backwards-compat with v3.x readers). v6.1.2
692 /// adds publications to the envelope condition: either non-empty
693 /// users OR non-empty publications now triggers the envelope path.
694 pub fn snapshot(&self) -> Vec<u8> {
695 if self.users.is_empty()
696 && self.publications.is_empty()
697 && self.subscriptions.is_empty()
698 && self.statistics.is_empty()
699 {
700 self.catalog.serialize()
701 } else {
702 build_envelope(
703 &self.catalog.serialize(),
704 &users::serialize_users(&self.users),
705 &self.publications.serialize(),
706 &self.subscriptions.serialize(),
707 &self.statistics.serialize(),
708 )
709 }
710 }
711
712 /// True when at least one TX slot is in flight. v4.41.1 runtime
713 /// invariant: at most one slot active at a time (dispatch holds
714 /// `engine.write()` across the entire wrap). v4.42 will let this
715 /// return true with multiple slots concurrently.
716 pub fn in_transaction(&self) -> bool {
717 !self.tx_catalogs.is_empty()
718 }
719
720 /// v4.41.1 allocate a fresh TX handle. Used by spg-server dispatch
721 /// to scope each implicit-wrap BEGIN..stmt..COMMIT to its own slot
722 /// in `tx_catalogs`. v4.42 — the commit-barrier leader allocates
723 /// one of these per task in its group, runs `BEGIN`+sql+`COMMIT`
724 /// sequentially under a single `engine.write()` so each task's
725 /// mutations accumulate into shared state, then either keeps the
726 /// accumulated state (fsync OK) or restores the pre-image via
727 /// `replace_catalog` (fsync err).
728 pub fn alloc_tx_id(&mut self) -> TxId {
729 let id = TxId(self.next_tx_id);
730 self.next_tx_id = self.next_tx_id.saturating_add(1);
731 id
732 }
733
734 /// v4.42 — atomically replace the live catalog. Used by the
735 /// commit-barrier leader to roll back a group whose batched
736 /// fsync failed: the leader snapshots `engine.catalog().clone()`
737 /// (O(1) Arc bump after the v4.39/v4.40 persistent migration)
738 /// at group start, sequentially applies each task's BEGIN+sql+
739 /// COMMIT under the same write lock to accumulate mutations
740 /// into shared state, batches the WAL bytes, fsyncs once, and
741 /// on failure calls this with the pre-image to undo every
742 /// task in the group at once.
743 ///
744 /// **Does NOT touch `tx_catalogs` / `current_tx`.** Any
745 /// explicit-TX slot from a concurrent client (created via the
746 /// legacy `IMPLICIT_TX`-less dispatch path or via the future
747 /// MVCC-readers v5+ work) has its own snapshot baked into the
748 /// slot — restoring `self.catalog` to the pre-image leaves
749 /// those slots untouched, exactly as they were when the leader
750 /// took the lock. The leader's own implicit-TX slots are all
751 /// already discarded (`exec_commit` removed them as each
752 /// task's COMMIT ran) by the time this is reached.
753 pub fn replace_catalog(&mut self, catalog: Catalog) {
754 self.catalog = catalog;
755 }
756
757 /// v6.7.0 — public shim around `Catalog::freeze_oldest_to_cold`
758 /// so tests + the spg-server freezer can drive a freeze without
759 /// reaching into the private `active_catalog_mut`. v6.7.4
760 /// parallel freezer will build on this surface.
761 ///
762 /// Marks the table's cached `cold_row_count` stale because the
763 /// freeze added cold locators that ANALYZE hasn't yet refreshed.
764 pub fn freeze_oldest_to_cold(
765 &mut self,
766 table_name: &str,
767 index_name: &str,
768 max_rows: usize,
769 ) -> Result<spg_storage::FreezeReport, EngineError> {
770 let report = self
771 .active_catalog_mut()
772 .freeze_oldest_to_cold(table_name, index_name, max_rows)
773 .map_err(EngineError::Storage)?;
774 if let Some(t) = self.active_catalog_mut().get_mut(table_name) {
775 t.mark_cold_row_count_stale();
776 }
777 Ok(report)
778 }
779
780 /// v6.7.5 — public shim used by the spg-server follower's
781 /// segment-forwarding receiver. Registers a cold-tier segment
782 /// at a specific id (the master's id, as transmitted on the
783 /// wire) so the follower's BTree-Cold locators stay byte-
784 /// identical with the master's. Wraps
785 /// `Catalog::load_segment_bytes_at` under the standard
786 /// clone-mutate-replace pattern.
787 ///
788 /// Returns `Ok(())` on success **and** on the "slot already
789 /// occupied" case — a follower mid-reconnect may receive a
790 /// segment chunk for a segment_id it already has on disk
791 /// (forwarded last session); the caller should treat that
792 /// path as a no-op rather than a fatal error.
793 pub fn receive_cold_segment(
794 &mut self,
795 segment_id: u32,
796 bytes: Vec<u8>,
797 ) -> Result<(), EngineError> {
798 let mut new_cat = self.catalog.clone();
799 match new_cat.load_segment_bytes_at(segment_id, bytes) {
800 Ok(()) => {
801 self.replace_catalog(new_cat);
802 Ok(())
803 }
804 Err(StorageError::Corrupt(msg)) if msg.contains("already occupied") => Ok(()),
805 Err(e) => Err(EngineError::Storage(e)),
806 }
807 }
808
809 pub(crate) fn active_catalog(&self) -> &Catalog {
810 match self.current_tx {
811 Some(t) => self
812 .tx_catalogs
813 .get(&t)
814 .map_or(&self.catalog, |s| &s.catalog),
815 None => &self.catalog,
816 }
817 }
818
819 fn active_catalog_mut(&mut self) -> &mut Catalog {
820 let tx = self.current_tx;
821 match tx {
822 Some(t) => match self.tx_catalogs.get_mut(&t) {
823 Some(s) => &mut s.catalog,
824 None => &mut self.catalog,
825 },
826 None => &mut self.catalog,
827 }
828 }
829
830 /// Read-only execute path. Succeeds for `SELECT` / `SHOW TABLES`
831 /// / `SHOW COLUMNS`; returns `EngineError::WriteRequired` for
832 /// every other statement, so the caller can fall through to the
833 /// `&mut self` `execute` path under a write lock. Engine state is
834 /// not mutated even on the success path (`rewrite_clock_calls`
835 /// and `resolve_order_by_position` both mutate the locally-owned
836 /// AST, not `self`).
837 ///
838 /// v4.2: cap result-set size. Applied after the executor
839 /// materialises rows but before they leave the engine — wrapping
840 /// every Rows-returning exec_* function would scatter the check.
841 ///
842 /// v7.31 (memory campaign, bucket A) — the same choke point now
843 /// also enforces the BYTE budget on the final result set, so
844 /// single-table and aggregate paths (which don't route through
845 /// the join materialiser's incremental accounting) still cannot
846 /// hand the host an unbounded result. Intermediate single-table
847 /// clones are the 7.31.x follow-up (design doc, bucket A).
848 fn enforce_row_limit(
849 &self,
850 result: Result<QueryResult, EngineError>,
851 ) -> Result<QueryResult, EngineError> {
852 if let Ok(QueryResult::Rows { rows, .. }) = &result {
853 if let Some(cap) = self.max_query_rows
854 && rows.len() > cap
855 {
856 return Err(EngineError::RowLimitExceeded(cap));
857 }
858 if let Some(byte_cap) = self.max_query_bytes
859 && approx_rows_bytes(rows) > byte_cap
860 {
861 return Err(EngineError::QueryBytesExceeded(byte_cap));
862 }
863 }
864 result
865 }
866}
867
/// v7.31 (memory campaign — ceiling-first / never-die, design v1) —
/// one table's share of the engine's resident-memory accounting.
///
/// Two byte figures are reported side by side. `hot_encoded_bytes`
/// is the storage layer's maintained meter (what the rows encode
/// to); `approx_resident_bytes` is what they COST in RAM (per-cell
/// enum slots + heap payloads via `approx_row_bytes`). The gap
/// between the two is the representation multiplier the round-26
/// report measured at ~11× end-to-end.
#[derive(Debug, Clone)]
pub struct TableMemoryStats {
    /// Name of the table this entry describes.
    pub name: String,
    /// Hot-tier row count — presumably the rows still held in memory,
    /// as opposed to `cold_rows`; TODO confirm against the producer.
    pub hot_rows: u64,
    /// Cached cold-row count (refreshed by ANALYZE — see
    /// `Table::cold_row_count`'s staleness contract).
    pub cold_rows: u64,
    /// Encoded size of the hot rows, per the storage layer's meter.
    pub hot_encoded_bytes: u64,
    /// Approximate RAM cost of the hot rows.
    pub approx_resident_bytes: u64,
    /// Number of indices — presumably those defined on this table;
    /// TODO confirm against the producer.
    pub index_count: u64,
    /// BTree indices are walked entry-by-entry (operator surface,
    /// not a hot path); NSW graphs and BRIN are parametric
    /// ESTIMATES until spg-storage carries its own byte meters
    /// (7.31.x follow-up in the design doc).
    pub approx_index_bytes: u64,
}
891
892/// v7.31 — whole-engine memory snapshot: the polling form of the
893/// round-26 ask-4 watermark signal. Hosts compare
894/// `total_approx_resident_bytes` (+ their own WAL/file accounting)
895/// against their deployment ceiling and shed/shrink before the
896/// kernel does it for them.
897#[derive(Debug, Clone)]
898pub struct MemoryStats {
899 pub tables: Vec<TableMemoryStats>,
900 pub total_hot_encoded_bytes: u64,
901 pub total_approx_resident_bytes: u64,
902 pub total_approx_index_bytes: u64,
903 /// The active per-query materialisation budget (bucket A), so a
904 /// monitoring host sees ceiling and usage through one call.
905 pub max_query_bytes: Option<usize>,
906}
907
/// v6.2.0 — tells the bare `ANALYZE` (no target) whether a catalog
/// table is engine-managed and should therefore be skipped.
///
/// Currently always `false`: v6.2.0 has no internal tables yet
/// (publications / subscriptions / users / statistics all live as
/// engine fields, not catalog tables), so every existing user table
/// is analysed. The function is a reserved future-proofing hook, and
/// the argument is ignored for now.
const fn is_internal_table_name(_name: &str) -> bool {
    false
}
917
// Unit tests are kept out-of-line in the `tests` submodule and are only
// compiled for test builds.
#[cfg(test)]
mod tests;