Skip to main content

spg_engine/
lib.rs

1//! SPG execution engine — v0.3 wires the SQL front-end to the in-memory
2//! storage layer. Implements `CREATE TABLE`, single-row `INSERT VALUES`, and
3//! `SELECT * FROM <table>` (no WHERE yet — that lands in v0.4 alongside
4//! expression evaluation against rows).
5#![no_std]
6
7extern crate alloc;
8
9pub mod aggregate;
10mod bytebudget;
11mod cancel;
12mod clock;
13mod constraints;
14mod conversions;
15pub mod copy;
16mod ddl;
17pub mod describe;
18mod dml;
19mod envelope;
20pub mod eval;
21mod execute;
22mod explain;
23mod expr_analysis;
24pub mod fts;
25mod index_access;
26mod join;
27pub mod json;
28mod maintenance;
29pub mod memoize;
30mod numeric;
31mod orderby;
32pub mod plan_cache;
33mod plpgsql;
34pub mod publications;
35pub mod query_stats;
36mod readonly;
37pub mod reorder;
38mod select;
39pub mod selectivity;
40mod sequence;
41mod session;
42mod show;
43mod spg_admin;
44pub mod statistics;
45mod subquery;
46pub mod subscriptions;
47mod substitute;
48mod system_catalog;
49mod table_access;
50mod transaction;
51pub mod triggers;
52pub mod users;
53mod window;
54
55pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
56pub use cancel::{CancelToken, MonotonicNowFn};
57
58use bytebudget::*;
59pub(crate) use clock::{rewrite_clock_calls, value_to_literal};
60use constraints::*;
61use conversions::*;
62pub use conversions::{
63    format_bigint_2d_text_pub, format_hstore_text, format_int_2d_text_pub, format_range_text,
64    format_text_2d_text_pub,
65};
66pub(crate) use ddl::{
67    canonicalize_set_value, enforce_enum_label, eval_runtime_default_free,
68    resolve_column_default_free,
69};
70pub(crate) use envelope::{EnvelopeParse, build_envelope, split_envelope};
71use expr_analysis::*;
72use index_access::*;
73pub(crate) use orderby::{
74    apply_offset_and_limit, apply_offset_and_limit_tagged, build_order_keys, canonical_value_repr,
75    expand_group_by_all, order_by_value_cmp, partial_sort_tagged, render_histogram_bounds,
76    resolve_order_by_position, sort_by_keys, sort_values_for_histogram, value_cmp, value_to_f64,
77};
78pub(crate) use select::{build_projection, infer_column_types, value_to_order_key};
79pub(crate) use show::render_create_table;
80pub(crate) use subquery::{
81    build_in_list_set, collect_scalar_subqueries, expr_has_subquery, expr_tree_has_subquery,
82};
83pub use substitute::substitute_placeholders;
84use substitute::*;
85use system_catalog::*;
86use window::*;
87
88use alloc::collections::BTreeMap;
89use alloc::string::String;
90use alloc::vec::Vec;
91use core::fmt;
92
93// v7.16.0 — re-export the parsed-statement AST so downstream
94// crates (spg-embedded → spg-sqlx) don't need a direct dep on
95// spg-sql for the prepare/bind handle.
96pub use spg_sql::ast::Statement as ParsedStatement;
97use spg_sql::parser::ParseError;
98use spg_storage::{Catalog, ColumnSchema, Row, StorageError};
99
100use crate::eval::EvalError;
101
102/// Result of executing one statement.
103#[derive(Debug, Clone, PartialEq)]
104#[non_exhaustive]
105pub enum QueryResult {
106    /// DDL or DML succeeded.
107    ///
108    /// `affected` is the row count for `INSERT` and 0 elsewhere.
109    /// `modified_catalog` tells the server whether this statement
110    /// caused the *committed* catalog to change — it's the signal to
111    /// snapshot/audit. False for `BEGIN`/`ROLLBACK`, false for writeful
112    /// statements executed inside a transaction (those only touch the
113    /// shadow), and true for `COMMIT` and for writes outside a TX.
114    CommandOk {
115        affected: usize,
116        modified_catalog: bool,
117    },
118    /// `SELECT` returned a (possibly empty) row set.
119    Rows {
120        columns: Vec<ColumnSchema>,
121        rows: Vec<Row>,
122    },
123}
124
125/// All errors the engine can return.
126///
127/// Marked `#[non_exhaustive]` from v7.5.0 onward: external `match`
128/// must include a `_` arm so new variants in subsequent v7.x releases
129/// are not breaking changes.
130#[derive(Debug, Clone, PartialEq)]
131#[non_exhaustive]
132pub enum EngineError {
133    Parse(ParseError),
134    Storage(StorageError),
135    Eval(EvalError),
136    /// Front-end accepted a construct that the v0.x executor doesn't support.
137    Unsupported(String),
138    /// `BEGIN` while another transaction is already open.
139    TransactionAlreadyOpen,
140    /// `COMMIT` / `ROLLBACK` with no active transaction.
141    NoActiveTransaction,
142    /// v4.0 sentinel: `execute_readonly` got a statement that
143    /// mutates engine state (INSERT / CREATE / BEGIN / COMMIT / …).
144    /// The caller should retake the write lock and dispatch through
145    /// `execute(&mut self)` instead.
146    WriteRequired,
147    /// v4.2: a SELECT would have returned more rows than the
148    /// configured `max_query_rows` cap. Carries the cap.
149    RowLimitExceeded(usize),
150    /// v7.30.3 (mailrs round-26): a SELECT's join/filter
151    /// materialisation would have held more (approximate) heap
152    /// bytes than the configured `max_query_bytes` cap. The row
153    /// cap above counts rows; this counts bytes, because one row
154    /// can be a multi-MB mail body — 1000 fat rows pressure the
155    /// host long before any row ceiling trips. Carries the cap.
156    QueryBytesExceeded(usize),
157    /// v4.5: cooperative cancellation — the host (server's
158    /// per-query watchdog) set the cancel flag while a long-running
159    /// SELECT / UPDATE / DELETE was scanning rows. The partial work
160    /// is discarded; the caller should surface this as a timeout
161    /// to the client.
162    Cancelled,
163}
164
165impl fmt::Display for EngineError {
166    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167        match self {
168            Self::Parse(e) => write!(f, "parse: {e}"),
169            Self::Storage(e) => write!(f, "storage: {e}"),
170            Self::Eval(e) => write!(f, "eval: {e}"),
171            Self::Unsupported(s) => write!(f, "unsupported: {s}"),
172            Self::TransactionAlreadyOpen => f.write_str("a transaction is already open"),
173            Self::NoActiveTransaction => f.write_str("no active transaction"),
174            Self::WriteRequired => {
175                f.write_str("statement requires a write lock (use execute, not execute_readonly)")
176            }
177            Self::RowLimitExceeded(n) => {
178                write!(f, "query exceeded max_query_rows={n}")
179            }
180            Self::QueryBytesExceeded(n) => {
181                write!(
182                    f,
183                    "query materialisation exceeded max_query_bytes={n} (set SPG_MAX_QUERY_BYTES to raise, 0 to disable)"
184                )
185            }
186            Self::Cancelled => f.write_str("query cancelled (timeout or client request)"),
187        }
188    }
189}
190
191impl From<ParseError> for EngineError {
192    fn from(e: ParseError) -> Self {
193        Self::Parse(e)
194    }
195}
196impl From<StorageError> for EngineError {
197    fn from(e: StorageError) -> Self {
198        Self::Storage(e)
199    }
200}
201impl From<EvalError> for EngineError {
202    fn from(e: EvalError) -> Self {
203        Self::Eval(e)
204    }
205}
206
207/// The execution engine. Holds the catalog and (later) other server-scope
208/// state. `Engine::new()` is intentionally cheap so callers can construct one
209/// per database, per test.
210/// Function pointer that returns "now" as microseconds since Unix
211/// epoch. The engine is `no_std`, so it can't reach for `std::time`
212/// itself — callers (`spg-server`, the sqllogictest runner) inject a
213/// concrete implementation. `None` means `NOW()` / `CURRENT_*` raise
214/// `Unsupported`.
215pub type ClockFn = fn() -> i64;
216
217/// Function pointer that produces 16 cryptographically random bytes.
218/// Like `ClockFn`, the engine is `no_std` and can't reach for /dev/urandom
219/// itself — host (`spg-server`) injects an OS-backed source. `None`
220/// means SQL-driven `CREATE USER` falls back to a deterministic salt
221/// derived from the username (acceptable in tests; the server always
222/// installs a real RNG so production paths never see this).
223pub type SaltFn = fn() -> [u8; 16];
224
225/// v4.5 cooperative cancellation token. A long-running SELECT /
226/// UPDATE / DELETE checks `is_cancelled` at row-loop checkpoints
227/// and bails with `EngineError::Cancelled`. The host
228/// (`spg-server`) creates an `AtomicBool` per query, spawns a
229/// watchdog thread that sets it after `SPG_QUERY_TIMEOUT_MS`,
230/// and passes it via `execute_with_cancel` / `execute_readonly_with_cancel`.
231///
232/// `CancelToken::none()` is a no-op — used by the legacy `execute`
233/// and `execute_readonly` entry points so existing callers don't
234/// change.
235/// v4.41.1 opaque transaction handle. Returned by `Engine::alloc_tx_id`,
236/// threaded through `Engine::execute_in` so dispatch can identify which
237/// in-flight TX a statement belongs to. `IMPLICIT_TX` is the reserved
238/// slot every legacy caller — engine self-tests, spg-cli, spg-embedded,
239/// startup replay — implicitly uses through the unchanged
240/// `Engine::execute(sql)` API. v4.41.1 keeps at most one active slot at
241/// runtime (dispatch holds `engine.write()` across the wrap, same as
242/// v4.34); the map shape is here to let v4.42 turn on N in-flight
243/// implicit TXs without reshuffling the engine internals.
244#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
245pub struct TxId(pub u64);
246
247/// Reserved slot used by `Engine::execute(sql)` — the legacy single-
248/// global-shadow path. New `alloc_tx_id` handles start at 1.
249pub const IMPLICIT_TX: TxId = TxId(0);
250
251/// v6.7.3 — default segment-size threshold used by `COMPACT COLD
252/// SEGMENTS` when no explicit target is supplied. Segments whose
253/// `OwnedSegment::bytes().len()` is **strictly** less than this
254/// value are eligible to merge. spg-server reads
255/// `SPG_COMPACTION_TARGET_SEGMENT_BYTES` to override.
256pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
257
258/// Per-slot transaction state. Held inside `tx_catalogs[tx_id]` for the
259/// lifetime of a BEGIN..COMMIT (or BEGIN..ROLLBACK) window. Drops when
260/// the TX commits (its `catalog` is moved over `Engine.catalog`) or
261/// rolls back (slot removed, catalog discarded).
262#[derive(Debug, Default, Clone)]
263struct TxState {
264    /// The TX's shadow copy of the catalog. Started as a clone of
265    /// `Engine.catalog` at BEGIN time; writes flow into it; COMMIT
266    /// installs it over `Engine.catalog`. `Catalog::clone()` is O(1)
267    /// since v4.40 (`PersistentVec` rows + `PersistentBTreeMap` indices).
268    catalog: Catalog,
269    /// Per-TX savepoint stack. Each entry pairs the savepoint name with
270    /// a clone of `catalog` at the moment `SAVEPOINT <name>` fired.
271    /// `ROLLBACK TO <name>` restores from the entry and pops everything
272    /// after it; `RELEASE <name>` discards the entry and everything
273    /// after; COMMIT/ROLLBACK clears the whole stack.
274    savepoints: Vec<(String, Catalog)>,
275}
276
277/// v7.11.0 — frozen read-only view of the engine's committed state.
278/// Constructed via [`Engine::clone_snapshot`]. Holds clones of the
279/// catalog, statistics, clock function, and row-cap config — the
280/// four fields the `execute_readonly` path actually reads. Cheap to
281/// `Clone` (each clone shares the underlying `PersistentVec` row
282/// storage; only the trie root pointers copy). Send + Sync so a
283/// snapshot can be moved across `tokio::task::spawn_blocking`
284/// boundaries without coordination.
285///
286/// The contract: a snapshot reflects the engine's state at the
287/// moment `clone_snapshot()` returned. Subsequent writes to the
288/// engine are NOT visible. Callers who need fresher data take a
289/// new snapshot.
290#[derive(Debug, Clone)]
291pub struct CatalogSnapshot {
292    catalog: Catalog,
293    statistics: statistics::Statistics,
294    clock: Option<ClockFn>,
295    max_query_rows: Option<usize>,
296}
297
298#[derive(Debug, Default)]
299pub struct Engine {
300    /// Committed catalog — what survives `Engine::snapshot()` and what
301    /// outside-TX `SELECT`s read.
302    catalog: Catalog,
303    /// Active TX slots, keyed by `TxId`. Empty when no TX is in flight.
304    /// v4.41.1 runtime invariant: at most one entry (single-writer
305    /// model unchanged). v4.42 will let dispatch hold multiple entries
306    /// concurrently for group commit + engine MVCC.
307    tx_catalogs: BTreeMap<TxId, TxState>,
308    /// Which slot the next exec_* call should mutate. Set by
309    /// `execute_in(sql, tx_id)` at the entry point; legacy `execute(sql)`
310    /// sets it to `IMPLICIT_TX`. None when no TX is in flight (read /
311    /// write goes straight against `catalog`).
312    current_tx: Option<TxId>,
313    /// Monotonic counter for `alloc_tx_id`. Starts at 1 — slot 0 is
314    /// reserved for `IMPLICIT_TX`.
315    next_tx_id: u64,
316    /// v7.22 (round-13 T3) — session string-literal dialect. `false`
317    /// (default) = PG semantics (backslash literal, `''` escape);
318    /// `true` = MySQL semantics (`\'` etc.). Flipped by the
319    /// deterministic session signals each dump emits: `SET sql_mode`
320    /// (only MySQL clients/dumps send it) turns it on,
321    /// `SET standard_conforming_strings = on` (every pg_dump
322    /// preamble) turns it off. The plan cache is cleared on every
323    /// flip — the same SQL text lexes differently per dialect.
324    backslash_escapes: bool,
325    /// Optional wall clock used to satisfy `NOW()` / `CURRENT_TIMESTAMP`
326    /// / `CURRENT_DATE`. Set by the host environment.
327    clock: Option<ClockFn>,
328    /// v4.1 cryptographic RNG for per-user password salt. Set by the
329    /// host. `None` means SQL-driven `CREATE USER` uses a
330    /// deterministic fallback — see `SaltFn`.
331    salt_fn: Option<SaltFn>,
332    /// v4.2 per-query row cap. `None` = unlimited. When set, a
333    /// SELECT that materialises more than `n` rows returns
334    /// `EngineError::RowLimitExceeded`. Enforced before the result
335    /// is shaped into wire frames so a runaway scan can't blow the
336    /// server's heap.
337    max_query_rows: Option<usize>,
338    /// v7.30.3 (mailrs round-26) per-query byte cap on join/filter
339    /// materialisation. `None` = unlimited. Approximate net
340    /// accounting (Value heap payloads + per-cell enum overhead)
341    /// charged at every point the join pipeline clones rows;
342    /// crossing the cap raises `EngineError::QueryBytesExceeded`
343    /// instead of pressuring the host into reclaim livelock. The
344    /// host wires this to `SPG_MAX_QUERY_BYTES` (embed defaults it
345    /// ON; the server keeps its allocator-precise budget as the
346    /// outer layer).
347    pub(crate) max_query_bytes: Option<usize>,
348    /// v4.1 RBAC user table. Empty means "no RBAC configured yet" —
349    /// the server decides what that means at the auth boundary
350    /// (open mode vs legacy single-password mode). User CRUD goes
351    /// through `create_user`/`drop_user`/`verify_user`; persistence
352    /// rides the snapshot envelope alongside the catalog.
353    pub(crate) users: UserStore,
354    /// v6.1.2 logical-replication publication catalog. Empty until
355    /// `CREATE PUBLICATION` runs. Persistence rides the v3 envelope
356    /// trailer (see `build_envelope`).
357    publications: publications::Publications,
358    /// v6.1.4 logical-replication subscription catalog. Empty until
359    /// `CREATE SUBSCRIPTION` runs. Persistence rides the v4 envelope
360    /// trailer.
361    subscriptions: subscriptions::Subscriptions,
362    /// v6.2.0 — per-column statistics for the cost-based optimizer.
363    /// Populated by `ANALYZE`; queried via `spg_statistic` virtual
364    /// table. Persistence rides the v5 envelope trailer.
365    statistics: statistics::Statistics,
366    /// v6.3.0 — engine-level plan cache. Caches the post-`prepare()`
367    /// `Statement` keyed on SQL text. In-memory only — does NOT ride
368    /// the snapshot envelope (rebuilt on demand after restart).
369    plan_cache: plan_cache::PlanCache,
370    /// v6.5.1 — per-distinct-SQL execution stats. In-memory only,
371    /// surfaced via `spg_stat_query` virtual table. Updated by the
372    /// `execute_*` paths after a successful execute.
373    query_stats: query_stats::QueryStats,
374    /// v6.5.2 — connection-state provider callback. spg-server
375    /// registers a function at startup that snapshots its
376    /// per-pgwire-connection registry into `ActivityRow`s; engine
377    /// reads through it on every `SELECT * FROM spg_stat_activity`.
378    /// `None` ⇒ no-data (returns empty rows; matches the no_std
379    /// embedded callers that don't run pgwire).
380    activity_provider: Option<ActivityProvider>,
381    /// v6.5.3 — audit-chain provider + verifier. Same pattern as
382    /// activity_provider: spg-server registers both at startup;
383    /// engine reads through on `SELECT * FROM spg_audit_chain` and
384    /// `SELECT * FROM spg_audit_verify`. `None` ⇒ no-data.
385    audit_chain_provider: Option<AuditChainProvider>,
386    audit_verifier: Option<AuditVerifier>,
387    /// v6.5.6 — slow-query log threshold in microseconds. When set,
388    /// every successful execute whose elapsed exceeds the threshold
389    /// gets fed to the registered slow-query log callback (so
390    /// spg-server can emit a structured log line). Default `None`
391    /// = no slow-query logging.
392    slow_query_threshold_us: Option<u64>,
393    slow_query_logger: Option<SlowQueryLogger>,
394    /// v7.12.1 — session parameters set via `SET <name> = <value>`.
395    /// Only `default_text_search_config` is consumed by the engine
396    /// today (the FTS function dispatcher reads it when
397    /// `to_tsvector(text)` is called without an explicit config).
398    /// All other names are accepted + recorded so PG-dump output
399    /// loads, but have no behavioural effect.
400    pub(crate) session_params: BTreeMap<String, String>,
401    /// v7.12.7 — depth counter for trigger-emitted embedded SQL.
402    /// Each time the engine executes a `DeferredEmbeddedStmt` it
403    /// increments this; the recursive `execute_stmt_with_cancel`
404    /// inside that path checks against [`MAX_TRIGGER_RECURSION`]
405    /// to bound runaway cascades (trigger A's UPDATE on table B
406    /// fires trigger B which UPDATEs table A which fires trigger
407    /// A again…). Reset to 0 once the original DML returns.
408    trigger_recursion_depth: u32,
409    /// v7.14.0 — when `SET FOREIGN_KEY_CHECKS=0` is in effect
410    /// (mysqldump preamble), the FK existence + arity check at
411    /// CREATE TABLE time is deferred. FKs referencing a
412    /// not-yet-existing parent land in `pending_foreign_keys`
413    /// keyed by child table; `SET FOREIGN_KEY_CHECKS=1` drains
414    /// the queue and resolves each FK against the now-complete
415    /// catalog. Empty by default; the queue is drained on every
416    /// `RESET ALL` too.
417    foreign_key_checks: bool,
418    /// v7.16.2 — true on the temp Engine an outer
419    /// `exec_select_with_meta_views` builds, telling that
420    /// temp engine "stop short-circuiting into the meta-view
421    /// path — your catalog already has the materialised
422    /// tables; just run the regular SELECT." Without this we'd
423    /// infinite-loop since the meta-view name (e.g.
424    /// `__spg_info_columns`) still triggers
425    /// `select_references_meta_view`.
426    meta_views_materialised: bool,
427    pending_foreign_keys: Vec<(alloc::string::String, spg_sql::ast::ForeignKeyConstraint)>,
428}
429
430/// v7.12.7 — hard cap on nested trigger-emitted embedded SQL
431/// fires. 16 deep is well past anything a normal trigger graph
432/// uses while still preventing infinite-loop wedging.
433const MAX_TRIGGER_RECURSION: u32 = 16;
434
435/// v6.5.6 — callback signature for slow-query log emission. Called
436/// with `(sql, elapsed_us)` once per successful execute that crosses
437/// the threshold.
438pub type SlowQueryLogger = fn(&str, u64);
439
440/// v6.5.2 — one row of `spg_stat_activity`. Engine-public so
441/// spg-server can construct rows without re-exporting internal
442/// dispatch types.
443#[derive(Debug, Clone)]
444pub struct ActivityRow {
445    pub pid: u32,
446    pub user: String,
447    pub started_at_us: i64,
448    pub current_sql: String,
449    pub wait_event: String,
450    pub elapsed_us: i64,
451    pub in_transaction: bool,
452    /// v7.17 Phase 2.4 — startup-param `application_name` (or the
453    /// last value the client sent via `SET application_name = '...'`).
454    /// Empty when the client never declared one.
455    pub application_name: String,
456}
457
458/// v6.5.2 — provider callback type. Fresh snapshot returned each
459/// call; engine doesn't cache the slice.
460pub type ActivityProvider = fn() -> Vec<ActivityRow>;
461
462/// v6.5.3 — one row of `spg_audit_chain`. Engine-public so
463/// spg-server can construct rows directly from `AuditEntry`.
464#[derive(Debug, Clone)]
465pub struct AuditRow {
466    pub seq: i64,
467    pub ts_ms: i64,
468    pub prev_hash_hex: String,
469    pub entry_hash_hex: String,
470    pub sql: String,
471}
472
473/// v6.5.3 — chain-table provider + verifier. spg-server registers
474/// fn pointers that snapshot / verify the audit log. `verify`
475/// returns `(verified_count, broken_at_seq)` — `broken_at_seq` is
476/// `-1` on a clean chain.
477pub type AuditChainProvider = fn() -> Vec<AuditRow>;
478pub type AuditVerifier = fn() -> (i64, i64);
479
480impl Engine {
481    pub fn new() -> Self {
482        Self {
483            catalog: Catalog::new(),
484            tx_catalogs: BTreeMap::new(),
485            current_tx: None,
486            backslash_escapes: false,
487            next_tx_id: 1,
488            clock: None,
489            salt_fn: None,
490            max_query_rows: None,
491            max_query_bytes: None,
492            users: UserStore::new(),
493            publications: publications::Publications::new(),
494            subscriptions: subscriptions::Subscriptions::new(),
495            statistics: statistics::Statistics::new(),
496            plan_cache: plan_cache::PlanCache::new(),
497            query_stats: query_stats::QueryStats::new(),
498            activity_provider: None,
499            audit_chain_provider: None,
500            audit_verifier: None,
501            slow_query_threshold_us: None,
502            slow_query_logger: None,
503            session_params: BTreeMap::new(),
504            trigger_recursion_depth: 0,
505            foreign_key_checks: true,
506            meta_views_materialised: false,
507            pending_foreign_keys: Vec::new(),
508        }
509    }
510
511    /// v7.11.0 — clone the engine's committed catalog + read-time
512    /// state into a frozen `CatalogSnapshot`. Cheap (`Catalog` is
513    /// backed by `PersistentVec`; cloning is O(log n) per table).
514    /// Subsequent writes to this engine are invisible to the
515    /// snapshot; the snapshot is self-contained and can be moved
516    /// to another thread for concurrent `execute_readonly_on_snapshot`
517    /// calls. The basis for [`AsyncReadHandle`] in spg-embedded-tokio
518    /// and any other read-fanout pattern.
519    #[must_use]
520    pub fn clone_snapshot(&self) -> CatalogSnapshot {
521        CatalogSnapshot {
522            catalog: self.active_catalog().clone(),
523            statistics: self.statistics.clone(),
524            clock: self.clock,
525            max_query_rows: self.max_query_rows,
526        }
527    }
528
529    /// Construct an engine restored from a previously-snapshotted catalog
530    /// (see `snapshot()`).
531    pub fn restore(catalog: Catalog) -> Self {
532        Self {
533            catalog,
534            tx_catalogs: BTreeMap::new(),
535            current_tx: None,
536            backslash_escapes: false,
537            next_tx_id: 1,
538            clock: None,
539            salt_fn: None,
540            max_query_rows: None,
541            max_query_bytes: None,
542            users: UserStore::new(),
543            publications: publications::Publications::new(),
544            subscriptions: subscriptions::Subscriptions::new(),
545            statistics: statistics::Statistics::new(),
546            plan_cache: plan_cache::PlanCache::new(),
547            query_stats: query_stats::QueryStats::new(),
548            activity_provider: None,
549            audit_chain_provider: None,
550            audit_verifier: None,
551            slow_query_threshold_us: None,
552            slow_query_logger: None,
553            session_params: BTreeMap::new(),
554            trigger_recursion_depth: 0,
555            foreign_key_checks: true,
556            meta_views_materialised: false,
557            pending_foreign_keys: Vec::new(),
558        }
559    }
560
561    /// Restore an engine + user table from a v4.1 envelope produced
562    /// by `snapshot_with_users()`. Falls back to plain catalog-only
563    /// restore if the envelope magic isn't present (so v3.x snapshot
564    /// files still load). v6.1.2 adds the optional publications
565    /// trailer (envelope v3); a v1/v2 envelope deserialises to an
566    /// empty publication table.
567    pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
568        match split_envelope(buf) {
569            EnvelopeParse::Pair {
570                catalog: catalog_bytes,
571                users: user_bytes,
572                publications: pub_bytes,
573                subscriptions: sub_bytes,
574                statistics: stats_bytes,
575            } => {
576                let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
577                let users = users::deserialize_users(user_bytes)
578                    .map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
579                let publications = match pub_bytes {
580                    Some(b) => publications::Publications::deserialize(b).map_err(|e| {
581                        EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
582                    })?,
583                    None => publications::Publications::new(),
584                };
585                let subscriptions = match sub_bytes {
586                    Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
587                        EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
588                    })?,
589                    None => subscriptions::Subscriptions::new(),
590                };
591                let statistics = match stats_bytes {
592                    Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
593                        EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
594                    })?,
595                    None => statistics::Statistics::new(),
596                };
597                Ok(Self {
598                    catalog,
599                    tx_catalogs: BTreeMap::new(),
600                    current_tx: None,
601                    backslash_escapes: false,
602                    next_tx_id: 1,
603                    clock: None,
604                    salt_fn: None,
605                    max_query_rows: None,
606                    max_query_bytes: None,
607                    users,
608                    publications,
609                    subscriptions,
610                    statistics,
611                    plan_cache: plan_cache::PlanCache::new(),
612                    query_stats: query_stats::QueryStats::new(),
613                    activity_provider: None,
614                    audit_chain_provider: None,
615                    audit_verifier: None,
616                    slow_query_threshold_us: None,
617                    slow_query_logger: None,
618                    session_params: BTreeMap::new(),
619                    trigger_recursion_depth: 0,
620                    foreign_key_checks: true,
621                    meta_views_materialised: false,
622                    pending_foreign_keys: Vec::new(),
623                })
624            }
625            EnvelopeParse::CrcMismatch { expected, computed } => {
626                Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
627                    "snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
628                ))))
629            }
630            EnvelopeParse::Bare => {
631                let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
632                Ok(Self::restore(catalog))
633            }
634        }
635    }
636
637    pub const fn users(&self) -> &UserStore {
638        &self.users
639    }
640
641    /// Builder: attach a wall clock so `NOW()` / `CURRENT_TIMESTAMP` /
642    /// `CURRENT_DATE` evaluate to a real value instead of erroring out.
643    #[must_use]
644    pub const fn with_clock(mut self, clock: ClockFn) -> Self {
645        self.clock = Some(clock);
646        self
647    }
648
649    /// Builder: attach an OS-backed RNG for per-user password salts.
650    /// The host (`spg-server`) typically wires this to `/dev/urandom`.
651    #[must_use]
652    pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
653        self.salt_fn = Some(f);
654        self
655    }
656
657    /// Builder: cap the number of rows a single SELECT may return.
658    /// Exceeding the cap raises `EngineError::RowLimitExceeded` —
659    /// the bound is checked inside the executor so a runaway
660    /// catalog scan can't allocate millions of rows before the
661    /// server gets a chance to reject the result.
662    #[must_use]
663    pub const fn with_max_query_rows(mut self, n: usize) -> Self {
664        self.max_query_rows = Some(n);
665        self
666    }
667
668    /// Builder: cap the approximate heap bytes a single SELECT's
669    /// join/filter materialisation may hold. Exceeding the cap
670    /// raises `EngineError::QueryBytesExceeded`. Rows are the wrong
671    /// unit when one row carries a multi-MB body (mailrs round-26:
672    /// 1000-row batches of full mail text walked a 15 GiB host into
673    /// reclaim livelock without ever tripping a row ceiling).
674    #[must_use]
675    pub const fn with_max_query_bytes(mut self, n: usize) -> Self {
676        self.max_query_bytes = Some(n);
677        self
678    }
679
680    /// The *committed* catalog. Note: during a transaction this returns the
681    /// pre-TX state — `SELECT` inside a TX goes through `execute()` and reads
682    /// the shadow. Tests that inspect outside-TX state should use this.
683    pub const fn catalog(&self) -> &Catalog {
684        &self.catalog
685    }
686
687    /// Serialize the *committed* catalog to bytes. v0.6 was full-snapshot; v0.9
688    /// adds the rule that an open TX's shadow is never snapshotted — only the
689    /// post-COMMIT state is persisted. v4.1 wraps the catalog in an envelope
690    /// when there are users to persist; an empty user table snapshots as the
691    /// bare catalog format (backwards-compat with v3.x readers). v6.1.2
692    /// adds publications to the envelope condition: either non-empty
693    /// users OR non-empty publications now triggers the envelope path.
694    pub fn snapshot(&self) -> Vec<u8> {
695        if self.users.is_empty()
696            && self.publications.is_empty()
697            && self.subscriptions.is_empty()
698            && self.statistics.is_empty()
699        {
700            self.catalog.serialize()
701        } else {
702            build_envelope(
703                &self.catalog.serialize(),
704                &users::serialize_users(&self.users),
705                &self.publications.serialize(),
706                &self.subscriptions.serialize(),
707                &self.statistics.serialize(),
708            )
709        }
710    }
711
712    /// True when at least one TX slot is in flight. v4.41.1 runtime
713    /// invariant: at most one slot active at a time (dispatch holds
714    /// `engine.write()` across the entire wrap). v4.42 will let this
715    /// return true with multiple slots concurrently.
716    pub fn in_transaction(&self) -> bool {
717        !self.tx_catalogs.is_empty()
718    }
719
720    /// v4.41.1 allocate a fresh TX handle. Used by spg-server dispatch
721    /// to scope each implicit-wrap BEGIN..stmt..COMMIT to its own slot
722    /// in `tx_catalogs`. v4.42 — the commit-barrier leader allocates
723    /// one of these per task in its group, runs `BEGIN`+sql+`COMMIT`
724    /// sequentially under a single `engine.write()` so each task's
725    /// mutations accumulate into shared state, then either keeps the
726    /// accumulated state (fsync OK) or restores the pre-image via
727    /// `replace_catalog` (fsync err).
728    pub fn alloc_tx_id(&mut self) -> TxId {
729        let id = TxId(self.next_tx_id);
730        self.next_tx_id = self.next_tx_id.saturating_add(1);
731        id
732    }
733
734    /// v4.42 — atomically replace the live catalog. Used by the
735    /// commit-barrier leader to roll back a group whose batched
736    /// fsync failed: the leader snapshots `engine.catalog().clone()`
737    /// (O(1) Arc bump after the v4.39/v4.40 persistent migration)
738    /// at group start, sequentially applies each task's BEGIN+sql+
739    /// COMMIT under the same write lock to accumulate mutations
740    /// into shared state, batches the WAL bytes, fsyncs once, and
741    /// on failure calls this with the pre-image to undo every
742    /// task in the group at once.
743    ///
744    /// **Does NOT touch `tx_catalogs` / `current_tx`.** Any
745    /// explicit-TX slot from a concurrent client (created via the
746    /// legacy `IMPLICIT_TX`-less dispatch path or via the future
747    /// MVCC-readers v5+ work) has its own snapshot baked into the
748    /// slot — restoring `self.catalog` to the pre-image leaves
749    /// those slots untouched, exactly as they were when the leader
750    /// took the lock. The leader's own implicit-TX slots are all
751    /// already discarded (`exec_commit` removed them as each
752    /// task's COMMIT ran) by the time this is reached.
753    pub fn replace_catalog(&mut self, catalog: Catalog) {
754        self.catalog = catalog;
755    }
756
757    /// v6.7.0 — public shim around `Catalog::freeze_oldest_to_cold`
758    /// so tests + the spg-server freezer can drive a freeze without
759    /// reaching into the private `active_catalog_mut`. v6.7.4
760    /// parallel freezer will build on this surface.
761    ///
762    /// Marks the table's cached `cold_row_count` stale because the
763    /// freeze added cold locators that ANALYZE hasn't yet refreshed.
764    pub fn freeze_oldest_to_cold(
765        &mut self,
766        table_name: &str,
767        index_name: &str,
768        max_rows: usize,
769    ) -> Result<spg_storage::FreezeReport, EngineError> {
770        let report = self
771            .active_catalog_mut()
772            .freeze_oldest_to_cold(table_name, index_name, max_rows)
773            .map_err(EngineError::Storage)?;
774        if let Some(t) = self.active_catalog_mut().get_mut(table_name) {
775            t.mark_cold_row_count_stale();
776        }
777        Ok(report)
778    }
779
780    /// v6.7.5 — public shim used by the spg-server follower's
781    /// segment-forwarding receiver. Registers a cold-tier segment
782    /// at a specific id (the master's id, as transmitted on the
783    /// wire) so the follower's BTree-Cold locators stay byte-
784    /// identical with the master's. Wraps
785    /// `Catalog::load_segment_bytes_at` under the standard
786    /// clone-mutate-replace pattern.
787    ///
788    /// Returns `Ok(())` on success **and** on the "slot already
789    /// occupied" case — a follower mid-reconnect may receive a
790    /// segment chunk for a segment_id it already has on disk
791    /// (forwarded last session); the caller should treat that
792    /// path as a no-op rather than a fatal error.
793    pub fn receive_cold_segment(
794        &mut self,
795        segment_id: u32,
796        bytes: Vec<u8>,
797    ) -> Result<(), EngineError> {
798        let mut new_cat = self.catalog.clone();
799        match new_cat.load_segment_bytes_at(segment_id, bytes) {
800            Ok(()) => {
801                self.replace_catalog(new_cat);
802                Ok(())
803            }
804            Err(StorageError::Corrupt(msg)) if msg.contains("already occupied") => Ok(()),
805            Err(e) => Err(EngineError::Storage(e)),
806        }
807    }
808
809    pub(crate) fn active_catalog(&self) -> &Catalog {
810        match self.current_tx {
811            Some(t) => self
812                .tx_catalogs
813                .get(&t)
814                .map_or(&self.catalog, |s| &s.catalog),
815            None => &self.catalog,
816        }
817    }
818
819    fn active_catalog_mut(&mut self) -> &mut Catalog {
820        let tx = self.current_tx;
821        match tx {
822            Some(t) => match self.tx_catalogs.get_mut(&t) {
823                Some(s) => &mut s.catalog,
824                None => &mut self.catalog,
825            },
826            None => &mut self.catalog,
827        }
828    }
829
830    /// Read-only execute path. Succeeds for `SELECT` / `SHOW TABLES`
831    /// / `SHOW COLUMNS`; returns `EngineError::WriteRequired` for
832    /// every other statement, so the caller can fall through to the
833    /// `&mut self` `execute` path under a write lock. Engine state is
834    /// not mutated even on the success path (`rewrite_clock_calls`
835    /// and `resolve_order_by_position` both mutate the locally-owned
836    /// AST, not `self`).
837    ///
838    /// v4.2: cap result-set size. Applied after the executor
839    /// materialises rows but before they leave the engine — wrapping
840    /// every Rows-returning exec_* function would scatter the check.
841    ///
842    /// v7.31 (memory campaign, bucket A) — the same choke point now
843    /// also enforces the BYTE budget on the final result set, so
844    /// single-table and aggregate paths (which don't route through
845    /// the join materialiser's incremental accounting) still cannot
846    /// hand the host an unbounded result. Intermediate single-table
847    /// clones are the 7.31.x follow-up (design doc, bucket A).
848    fn enforce_row_limit(
849        &self,
850        result: Result<QueryResult, EngineError>,
851    ) -> Result<QueryResult, EngineError> {
852        if let Ok(QueryResult::Rows { rows, .. }) = &result {
853            if let Some(cap) = self.max_query_rows
854                && rows.len() > cap
855            {
856                return Err(EngineError::RowLimitExceeded(cap));
857            }
858            if let Some(byte_cap) = self.max_query_bytes
859                && approx_rows_bytes(rows) > byte_cap
860            {
861                return Err(EngineError::QueryBytesExceeded(byte_cap));
862            }
863        }
864        result
865    }
866}
867
868/// v7.31 (memory campaign — ceiling-first / never-die, design v1) —
869/// per-table slice of the engine's resident-memory accounting.
870/// `hot_encoded_bytes` is the storage layer's maintained meter (what
871/// the rows encode to); `approx_resident_bytes` is what they COST in
872/// RAM (per-cell enum slots + heap payloads via `approx_row_bytes`)
873/// — the gap between the two is the representation multiplier the
874/// round-26 report measured at ~11× end-to-end.
875#[derive(Debug, Clone)]
876pub struct TableMemoryStats {
877    pub name: String,
878    pub hot_rows: u64,
879    /// Cached cold-row count (refreshed by ANALYZE — see
880    /// `Table::cold_row_count`'s staleness contract).
881    pub cold_rows: u64,
882    pub hot_encoded_bytes: u64,
883    pub approx_resident_bytes: u64,
884    pub index_count: u64,
885    /// BTree indices are walked entry-by-entry (operator surface,
886    /// not a hot path); NSW graphs and BRIN are parametric
887    /// ESTIMATES until spg-storage carries its own byte meters
888    /// (7.31.x follow-up in the design doc).
889    pub approx_index_bytes: u64,
890}
891
892/// v7.31 — whole-engine memory snapshot: the polling form of the
893/// round-26 ask-4 watermark signal. Hosts compare
894/// `total_approx_resident_bytes` (+ their own WAL/file accounting)
895/// against their deployment ceiling and shed/shrink before the
896/// kernel does it for them.
897#[derive(Debug, Clone)]
898pub struct MemoryStats {
899    pub tables: Vec<TableMemoryStats>,
900    pub total_hot_encoded_bytes: u64,
901    pub total_approx_resident_bytes: u64,
902    pub total_approx_index_bytes: u64,
903    /// The active per-query materialisation budget (bucket A), so a
904    /// monitoring host sees ceiling and usage through one call.
905    pub max_query_bytes: Option<usize>,
906}
907
908/// v6.2.0 — true for engine-managed catalog tables that the bare
909/// `ANALYZE` (no target) should skip. v6.2.0 has no internal
910/// tables yet (publications / subscriptions / users / statistics
911/// all live as engine fields, not catalog tables), so this is a
912/// reserved future-proofing hook — every existing user table is
913/// analysed.
914const fn is_internal_table_name(_name: &str) -> bool {
915    false
916}
917
918#[cfg(test)]
919mod tests;