Skip to main content

spg_engine/
lib.rs

1//! SPG execution engine — v0.3 wires the SQL front-end to the in-memory
2//! storage layer. Implements `CREATE TABLE`, single-row `INSERT VALUES`, and
3//! `SELECT * FROM <table>` (no WHERE yet — that lands in v0.4 alongside
4//! expression evaluation against rows).
5#![no_std]
6
7extern crate alloc;
8
9pub mod aggregate;
10mod bytebudget;
11mod cancel;
12mod clock;
13mod constraints;
14mod conversions;
15pub mod copy;
16mod ddl;
17pub mod describe;
18mod dml;
19mod envelope;
20pub mod eval;
21mod execute;
22mod explain;
23mod expr_analysis;
24pub mod fts;
25mod index_access;
26mod join;
27pub mod json;
28mod maintenance;
29pub mod memoize;
30mod numeric;
31mod orderby;
32pub mod plan_cache;
33mod plpgsql;
34pub mod publications;
35pub mod query_stats;
36mod readonly;
37pub mod reorder;
38mod select;
39pub mod selectivity;
40mod sequence;
41mod session;
42mod show;
43mod spg_admin;
44pub mod statistics;
45mod subquery;
46pub mod subscriptions;
47mod substitute;
48mod system_catalog;
49mod table_access;
50mod transaction;
51pub mod triggers;
52pub mod users;
53mod window;
54
55pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
56pub use cancel::{CancelToken, MonotonicNowFn};
57
58use bytebudget::*;
59pub(crate) use clock::{rewrite_clock_calls, value_to_literal};
60use constraints::*;
61use conversions::*;
62pub use conversions::{
63    format_bigint_2d_text_pub, format_hstore_text, format_int_2d_text_pub, format_range_text,
64    format_text_2d_text_pub,
65};
66pub(crate) use ddl::{
67    canonicalize_set_value, enforce_enum_label, eval_runtime_default_free,
68    resolve_column_default_free,
69};
70pub(crate) use envelope::{EnvelopeParse, build_envelope, split_envelope};
71use expr_analysis::*;
72use index_access::*;
73pub(crate) use orderby::{
74    apply_offset_and_limit, apply_offset_and_limit_tagged, build_order_keys, canonical_value_repr,
75    expand_group_by_all, order_by_value_cmp, partial_sort_tagged, render_histogram_bounds,
76    resolve_order_by_position, sort_by_keys, sort_values_for_histogram, value_cmp, value_to_f64,
77};
78pub(crate) use select::{build_projection, infer_column_types, value_to_order_key};
79pub(crate) use show::render_create_table;
80pub(crate) use subquery::{
81    build_in_list_set, collect_scalar_subqueries, expr_has_subquery, expr_tree_has_subquery,
82};
83pub use substitute::substitute_placeholders;
84use substitute::*;
85use system_catalog::*;
86use window::*;
87
88use alloc::collections::BTreeMap;
89use alloc::string::String;
90use alloc::vec::Vec;
91use core::fmt;
92
93// v7.16.0 — re-export the parsed-statement AST so downstream
94// crates (spg-embedded → spg-sqlx) don't need a direct dep on
95// spg-sql for the prepare/bind handle.
96pub use spg_sql::ast::Statement as ParsedStatement;
97use spg_sql::parser::ParseError;
98use spg_storage::{Catalog, ColumnSchema, Row, RowChange, StorageError};
99
100use crate::eval::EvalError;
101
102/// Result of executing one statement.
103#[derive(Debug, Clone, PartialEq)]
104#[non_exhaustive]
105pub enum QueryResult {
106    /// DDL or DML succeeded.
107    ///
108    /// `affected` is the row count for `INSERT` and 0 elsewhere.
109    /// `modified_catalog` tells the server whether this statement
110    /// caused the *committed* catalog to change — it's the signal to
111    /// snapshot/audit. False for `BEGIN`/`ROLLBACK`, false for writeful
112    /// statements executed inside a transaction (those only touch the
113    /// shadow), and true for `COMMIT` and for writes outside a TX.
114    CommandOk {
115        affected: usize,
116        modified_catalog: bool,
117    },
118    /// `SELECT` returned a (possibly empty) row set.
119    Rows {
120        columns: Vec<ColumnSchema>,
121        rows: Vec<Row>,
122    },
123}
124
125/// All errors the engine can return.
126///
127/// Marked `#[non_exhaustive]` from v7.5.0 onward: external `match`
128/// must include a `_` arm so new variants in subsequent v7.x releases
129/// are not breaking changes.
130#[derive(Debug, Clone, PartialEq)]
131#[non_exhaustive]
132pub enum EngineError {
133    Parse(ParseError),
134    Storage(StorageError),
135    Eval(EvalError),
136    /// Front-end accepted a construct that the v0.x executor doesn't support.
137    Unsupported(String),
138    /// `BEGIN` while another transaction is already open.
139    TransactionAlreadyOpen,
140    /// `COMMIT` / `ROLLBACK` with no active transaction.
141    NoActiveTransaction,
142    /// v4.0 sentinel: `execute_readonly` got a statement that
143    /// mutates engine state (INSERT / CREATE / BEGIN / COMMIT / …).
144    /// The caller should retake the write lock and dispatch through
145    /// `execute(&mut self)` instead.
146    WriteRequired,
147    /// v4.2: a SELECT would have returned more rows than the
148    /// configured `max_query_rows` cap. Carries the cap.
149    RowLimitExceeded(usize),
150    /// v7.30.3 (mailrs round-26): a SELECT's join/filter
151    /// materialisation would have held more (approximate) heap
152    /// bytes than the configured `max_query_bytes` cap. The row
153    /// cap above counts rows; this counts bytes, because one row
154    /// can be a multi-MB mail body — 1000 fat rows pressure the
155    /// host long before any row ceiling trips. Carries the cap.
156    QueryBytesExceeded(usize),
157    /// v4.5: cooperative cancellation — the host (server's
158    /// per-query watchdog) set the cancel flag while a long-running
159    /// SELECT / UPDATE / DELETE was scanning rows. The partial work
160    /// is discarded; the caller should surface this as a timeout
161    /// to the client.
162    Cancelled,
163}
164
165impl fmt::Display for EngineError {
166    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167        match self {
168            Self::Parse(e) => write!(f, "parse: {e}"),
169            Self::Storage(e) => write!(f, "storage: {e}"),
170            Self::Eval(e) => write!(f, "eval: {e}"),
171            Self::Unsupported(s) => write!(f, "unsupported: {s}"),
172            Self::TransactionAlreadyOpen => f.write_str("a transaction is already open"),
173            Self::NoActiveTransaction => f.write_str("no active transaction"),
174            Self::WriteRequired => {
175                f.write_str("statement requires a write lock (use execute, not execute_readonly)")
176            }
177            Self::RowLimitExceeded(n) => {
178                write!(f, "query exceeded max_query_rows={n}")
179            }
180            Self::QueryBytesExceeded(n) => {
181                write!(
182                    f,
183                    "query materialisation exceeded max_query_bytes={n} (set SPG_MAX_QUERY_BYTES to raise, 0 to disable)"
184                )
185            }
186            Self::Cancelled => f.write_str("query cancelled (timeout or client request)"),
187        }
188    }
189}
190
191impl From<ParseError> for EngineError {
192    fn from(e: ParseError) -> Self {
193        Self::Parse(e)
194    }
195}
196impl From<StorageError> for EngineError {
197    fn from(e: StorageError) -> Self {
198        Self::Storage(e)
199    }
200}
201impl From<EvalError> for EngineError {
202    fn from(e: EvalError) -> Self {
203        Self::Eval(e)
204    }
205}
206
207/// The execution engine. Holds the catalog and (later) other server-scope
208/// state. `Engine::new()` is intentionally cheap so callers can construct one
209/// per database, per test.
210/// Function pointer that returns "now" as microseconds since Unix
211/// epoch. The engine is `no_std`, so it can't reach for `std::time`
212/// itself — callers (`spg-server`, the sqllogictest runner) inject a
213/// concrete implementation. `None` means `NOW()` / `CURRENT_*` raise
214/// `Unsupported`.
215pub type ClockFn = fn() -> i64;
216
217/// Function pointer that produces 16 cryptographically random bytes.
218/// Like `ClockFn`, the engine is `no_std` and can't reach for /dev/urandom
219/// itself — host (`spg-server`) injects an OS-backed source. `None`
220/// means SQL-driven `CREATE USER` falls back to a deterministic salt
221/// derived from the username (acceptable in tests; the server always
222/// installs a real RNG so production paths never see this).
223pub type SaltFn = fn() -> [u8; 16];
224
225/// v4.5 cooperative cancellation token. A long-running SELECT /
226/// UPDATE / DELETE checks `is_cancelled` at row-loop checkpoints
227/// and bails with `EngineError::Cancelled`. The host
228/// (`spg-server`) creates an `AtomicBool` per query, spawns a
229/// watchdog thread that sets it after `SPG_QUERY_TIMEOUT_MS`,
230/// and passes it via `execute_with_cancel` / `execute_readonly_with_cancel`.
231///
232/// `CancelToken::none()` is a no-op — used by the legacy `execute`
233/// and `execute_readonly` entry points so existing callers don't
234/// change.
235/// v4.41.1 opaque transaction handle. Returned by `Engine::alloc_tx_id`,
236/// threaded through `Engine::execute_in` so dispatch can identify which
237/// in-flight TX a statement belongs to. `IMPLICIT_TX` is the reserved
238/// slot every legacy caller — engine self-tests, spg-cli, spg-embedded,
239/// startup replay — implicitly uses through the unchanged
240/// `Engine::execute(sql)` API. v4.41.1 keeps at most one active slot at
241/// runtime (dispatch holds `engine.write()` across the wrap, same as
242/// v4.34); the map shape is here to let v4.42 turn on N in-flight
243/// implicit TXs without reshuffling the engine internals.
244#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
245pub struct TxId(pub u64);
246
247/// Reserved slot used by `Engine::execute(sql)` — the legacy single-
248/// global-shadow path. New `alloc_tx_id` handles start at 1.
249pub const IMPLICIT_TX: TxId = TxId(0);
250
251/// v6.7.3 — default segment-size threshold used by `COMPACT COLD
252/// SEGMENTS` when no explicit target is supplied. Segments whose
253/// `OwnedSegment::bytes().len()` is **strictly** less than this
254/// value are eligible to merge. spg-server reads
255/// `SPG_COMPACTION_TARGET_SEGMENT_BYTES` to override.
256pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
257
258/// Per-slot transaction state. Held inside `tx_catalogs[tx_id]` for the
259/// lifetime of a BEGIN..COMMIT (or BEGIN..ROLLBACK) window. Drops when
260/// the TX commits (its `catalog` is moved over `Engine.catalog`) or
261/// rolls back (slot removed, catalog discarded).
262#[derive(Debug, Default, Clone)]
263struct TxState {
264    /// The TX's shadow copy of the catalog. Started as a clone of
265    /// `Engine.catalog` at BEGIN time; writes flow into it; COMMIT
266    /// installs it over `Engine.catalog`. `Catalog::clone()` is O(1)
267    /// since v4.40 (`PersistentVec` rows + `PersistentBTreeMap` indices).
268    catalog: Catalog,
269    /// Per-TX savepoint stack. Each entry pairs the savepoint name with
270    /// a clone of `catalog` at the moment `SAVEPOINT <name>` fired.
271    /// `ROLLBACK TO <name>` restores from the entry and pops everything
272    /// after it; `RELEASE <name>` discards the entry and everything
273    /// after; COMMIT/ROLLBACK clears the whole stack.
274    savepoints: Vec<(String, Catalog)>,
275}
276
277/// v7.11.0 — frozen read-only view of the engine's committed state.
278/// Constructed via [`Engine::clone_snapshot`]. Holds clones of the
279/// catalog, statistics, clock function, and row-cap config — the
280/// four fields the `execute_readonly` path actually reads. Cheap to
281/// `Clone` (each clone shares the underlying `PersistentVec` row
282/// storage; only the trie root pointers copy). Send + Sync so a
283/// snapshot can be moved across `tokio::task::spawn_blocking`
284/// boundaries without coordination.
285///
286/// The contract: a snapshot reflects the engine's state at the
287/// moment `clone_snapshot()` returned. Subsequent writes to the
288/// engine are NOT visible. Callers who need fresher data take a
289/// new snapshot.
290#[derive(Debug, Clone)]
291pub struct CatalogSnapshot {
292    catalog: Catalog,
293    statistics: statistics::Statistics,
294    clock: Option<ClockFn>,
295    max_query_rows: Option<usize>,
296}
297
298/// CoW-1 (v7.34) — frozen view of the *persisted* committed engine
299/// state. Carries every field the `snapshot()` envelope serializes;
300/// `Clone` is O(1) on the catalog (Arc bump) and cheap typed-clones
301/// on the trailers. Decouples "capture state" from "serialize bytes"
302/// so the background-checkpoint worker can hold the snapshot and
303/// produce bytes off the engine write lock.
304#[derive(Debug, Clone)]
305pub struct EngineSnapshot {
306    catalog: Catalog,
307    users: UserStore,
308    publications: publications::Publications,
309    subscriptions: subscriptions::Subscriptions,
310    statistics: statistics::Statistics,
311}
312
313impl EngineSnapshot {
314    /// Same envelope rules as `Engine::snapshot()`: bare catalog when
315    /// every trailer is empty, full envelope otherwise.
316    pub fn serialize(&self) -> Vec<u8> {
317        if self.users.is_empty()
318            && self.publications.is_empty()
319            && self.subscriptions.is_empty()
320            && self.statistics.is_empty()
321        {
322            self.catalog.serialize()
323        } else {
324            build_envelope(
325                &self.catalog.serialize(),
326                &users::serialize_users(&self.users),
327                &self.publications.serialize(),
328                &self.subscriptions.serialize(),
329                &self.statistics.serialize(),
330            )
331        }
332    }
333}
334
335// The engine carries several independent session/capture flags (dialect,
336// FK-checks, meta-view materialisation, redo capture); they're orthogonal
337// switches, not a state enum begging to be modelled.
338#[allow(clippy::struct_excessive_bools)]
339#[derive(Debug, Default)]
340pub struct Engine {
341    /// Committed catalog — what survives `Engine::snapshot()` and what
342    /// outside-TX `SELECT`s read.
343    catalog: Catalog,
344    /// Active TX slots, keyed by `TxId`. Empty when no TX is in flight.
345    /// v4.41.1 runtime invariant: at most one entry (single-writer
346    /// model unchanged). v4.42 will let dispatch hold multiple entries
347    /// concurrently for group commit + engine MVCC.
348    tx_catalogs: BTreeMap<TxId, TxState>,
349    /// Which slot the next exec_* call should mutate. Set by
350    /// `execute_in(sql, tx_id)` at the entry point; legacy `execute(sql)`
351    /// sets it to `IMPLICIT_TX`. None when no TX is in flight (read /
352    /// write goes straight against `catalog`).
353    current_tx: Option<TxId>,
354    /// Monotonic counter for `alloc_tx_id`. Starts at 1 — slot 0 is
355    /// reserved for `IMPLICIT_TX`.
356    next_tx_id: u64,
357    /// v7.22 (round-13 T3) — session string-literal dialect. `false`
358    /// (default) = PG semantics (backslash literal, `''` escape);
359    /// `true` = MySQL semantics (`\'` etc.). Flipped by the
360    /// deterministic session signals each dump emits: `SET sql_mode`
361    /// (only MySQL clients/dumps send it) turns it on,
362    /// `SET standard_conforming_strings = on` (every pg_dump
363    /// preamble) turns it off. The plan cache is cleared on every
364    /// flip — the same SQL text lexes differently per dialect.
365    backslash_escapes: bool,
366    /// Optional wall clock used to satisfy `NOW()` / `CURRENT_TIMESTAMP`
367    /// / `CURRENT_DATE`. Set by the host environment.
368    clock: Option<ClockFn>,
369    /// v4.1 cryptographic RNG for per-user password salt. Set by the
370    /// host. `None` means SQL-driven `CREATE USER` uses a
371    /// deterministic fallback — see `SaltFn`.
372    salt_fn: Option<SaltFn>,
373    /// v4.2 per-query row cap. `None` = unlimited. When set, a
374    /// SELECT that materialises more than `n` rows returns
375    /// `EngineError::RowLimitExceeded`. Enforced before the result
376    /// is shaped into wire frames so a runaway scan can't blow the
377    /// server's heap.
378    max_query_rows: Option<usize>,
379    /// v7.30.3 (mailrs round-26) per-query byte cap on join/filter
380    /// materialisation. `None` = unlimited. Approximate net
381    /// accounting (Value heap payloads + per-cell enum overhead)
382    /// charged at every point the join pipeline clones rows;
383    /// crossing the cap raises `EngineError::QueryBytesExceeded`
384    /// instead of pressuring the host into reclaim livelock. The
385    /// host wires this to `SPG_MAX_QUERY_BYTES` (embed defaults it
386    /// ON; the server keeps its allocator-precise budget as the
387    /// outer layer).
388    pub(crate) max_query_bytes: Option<usize>,
389    /// v4.1 RBAC user table. Empty means "no RBAC configured yet" —
390    /// the server decides what that means at the auth boundary
391    /// (open mode vs legacy single-password mode). User CRUD goes
392    /// through `create_user`/`drop_user`/`verify_user`; persistence
393    /// rides the snapshot envelope alongside the catalog.
394    pub(crate) users: UserStore,
395    /// v6.1.2 logical-replication publication catalog. Empty until
396    /// `CREATE PUBLICATION` runs. Persistence rides the v3 envelope
397    /// trailer (see `build_envelope`).
398    publications: publications::Publications,
399    /// v6.1.4 logical-replication subscription catalog. Empty until
400    /// `CREATE SUBSCRIPTION` runs. Persistence rides the v4 envelope
401    /// trailer.
402    subscriptions: subscriptions::Subscriptions,
403    /// v6.2.0 — per-column statistics for the cost-based optimizer.
404    /// Populated by `ANALYZE`; queried via `spg_statistic` virtual
405    /// table. Persistence rides the v5 envelope trailer.
406    statistics: statistics::Statistics,
407    /// v6.3.0 — engine-level plan cache. Caches the post-`prepare()`
408    /// `Statement` keyed on SQL text. In-memory only — does NOT ride
409    /// the snapshot envelope (rebuilt on demand after restart).
410    plan_cache: plan_cache::PlanCache,
411    /// v6.5.1 — per-distinct-SQL execution stats. In-memory only,
412    /// surfaced via `spg_stat_query` virtual table. Updated by the
413    /// `execute_*` paths after a successful execute.
414    query_stats: query_stats::QueryStats,
415    /// v6.5.2 — connection-state provider callback. spg-server
416    /// registers a function at startup that snapshots its
417    /// per-pgwire-connection registry into `ActivityRow`s; engine
418    /// reads through it on every `SELECT * FROM spg_stat_activity`.
419    /// `None` ⇒ no-data (returns empty rows; matches the no_std
420    /// embedded callers that don't run pgwire).
421    activity_provider: Option<ActivityProvider>,
422    /// v6.5.3 — audit-chain provider + verifier. Same pattern as
423    /// activity_provider: spg-server registers both at startup;
424    /// engine reads through on `SELECT * FROM spg_audit_chain` and
425    /// `SELECT * FROM spg_audit_verify`. `None` ⇒ no-data.
426    audit_chain_provider: Option<AuditChainProvider>,
427    audit_verifier: Option<AuditVerifier>,
428    /// v6.5.6 — slow-query log threshold in microseconds. When set,
429    /// every successful execute whose elapsed exceeds the threshold
430    /// gets fed to the registered slow-query log callback (so
431    /// spg-server can emit a structured log line). Default `None`
432    /// = no slow-query logging.
433    slow_query_threshold_us: Option<u64>,
434    slow_query_logger: Option<SlowQueryLogger>,
435    /// v7.12.1 — session parameters set via `SET <name> = <value>`.
436    /// Only `default_text_search_config` is consumed by the engine
437    /// today (the FTS function dispatcher reads it when
438    /// `to_tsvector(text)` is called without an explicit config).
439    /// All other names are accepted + recorded so PG-dump output
440    /// loads, but have no behavioural effect.
441    pub(crate) session_params: BTreeMap<String, String>,
442    /// v7.12.7 — depth counter for trigger-emitted embedded SQL.
443    /// Each time the engine executes a `DeferredEmbeddedStmt` it
444    /// increments this; the recursive `execute_stmt_with_cancel`
445    /// inside that path checks against [`MAX_TRIGGER_RECURSION`]
446    /// to bound runaway cascades (trigger A's UPDATE on table B
447    /// fires trigger B which UPDATEs table A which fires trigger
448    /// A again…). Reset to 0 once the original DML returns.
449    trigger_recursion_depth: u32,
450    /// v7.14.0 — when `SET FOREIGN_KEY_CHECKS=0` is in effect
451    /// (mysqldump preamble), the FK existence + arity check at
452    /// CREATE TABLE time is deferred. FKs referencing a
453    /// not-yet-existing parent land in `pending_foreign_keys`
454    /// keyed by child table; `SET FOREIGN_KEY_CHECKS=1` drains
455    /// the queue and resolves each FK against the now-complete
456    /// catalog. Empty by default; the queue is drained on every
457    /// `RESET ALL` too.
458    foreign_key_checks: bool,
459    /// v7.16.2 — true on the temp Engine an outer
460    /// `exec_select_with_meta_views` builds, telling that
461    /// temp engine "stop short-circuiting into the meta-view
462    /// path — your catalog already has the materialised
463    /// tables; just run the regular SELECT." Without this we'd
464    /// infinite-loop since the meta-view name (e.g.
465    /// `__spg_info_columns`) still triggers
466    /// `select_references_meta_view`.
467    meta_views_materialised: bool,
468    pending_foreign_keys: Vec<(alloc::string::String, spg_sql::ast::ForeignKeyConstraint)>,
469    /// v7.34 (crash-recovery P0 #2) — row-level redo capture. When the
470    /// embedding layer turns this on (persistence enabled), each mutating
471    /// `execute` records the physical [`RowChange`]s it applied; the
472    /// engine drains them into `last_redo` on success, and the embedded
473    /// layer reads them via [`Engine::take_redo`] to write the WAL in
474    /// place of the SQL text. Off (default) = zero capture overhead.
475    redo_capture: bool,
476    /// Redo captured by the most recent successful mutating `execute`,
477    /// awaiting drain by the embedding layer. Cleared on each capture.
478    last_redo: Vec<RowChange>,
479}
480
481/// v7.12.7 — hard cap on nested trigger-emitted embedded SQL
482/// fires. 16 deep is well past anything a normal trigger graph
483/// uses while still preventing infinite-loop wedging.
484const MAX_TRIGGER_RECURSION: u32 = 16;
485
486/// v6.5.6 — callback signature for slow-query log emission. Called
487/// with `(sql, elapsed_us)` once per successful execute that crosses
488/// the threshold.
489pub type SlowQueryLogger = fn(&str, u64);
490
491/// v6.5.2 — one row of `spg_stat_activity`. Engine-public so
492/// spg-server can construct rows without re-exporting internal
493/// dispatch types.
494#[derive(Debug, Clone)]
495pub struct ActivityRow {
496    pub pid: u32,
497    pub user: String,
498    pub started_at_us: i64,
499    pub current_sql: String,
500    pub wait_event: String,
501    pub elapsed_us: i64,
502    pub in_transaction: bool,
503    /// v7.17 Phase 2.4 — startup-param `application_name` (or the
504    /// last value the client sent via `SET application_name = '...'`).
505    /// Empty when the client never declared one.
506    pub application_name: String,
507}
508
509/// v6.5.2 — provider callback type. Fresh snapshot returned each
510/// call; engine doesn't cache the slice.
511pub type ActivityProvider = fn() -> Vec<ActivityRow>;
512
513/// v6.5.3 — one row of `spg_audit_chain`. Engine-public so
514/// spg-server can construct rows directly from `AuditEntry`.
515#[derive(Debug, Clone)]
516pub struct AuditRow {
517    pub seq: i64,
518    pub ts_ms: i64,
519    pub prev_hash_hex: String,
520    pub entry_hash_hex: String,
521    pub sql: String,
522}
523
524/// v6.5.3 — chain-table provider + verifier. spg-server registers
525/// fn pointers that snapshot / verify the audit log. `verify`
526/// returns `(verified_count, broken_at_seq)` — `broken_at_seq` is
527/// `-1` on a clean chain.
528pub type AuditChainProvider = fn() -> Vec<AuditRow>;
529pub type AuditVerifier = fn() -> (i64, i64);
530
531impl Engine {
532    pub fn new() -> Self {
533        Self {
534            catalog: Catalog::new(),
535            tx_catalogs: BTreeMap::new(),
536            current_tx: None,
537            backslash_escapes: false,
538            next_tx_id: 1,
539            clock: None,
540            salt_fn: None,
541            max_query_rows: None,
542            max_query_bytes: None,
543            users: UserStore::new(),
544            publications: publications::Publications::new(),
545            subscriptions: subscriptions::Subscriptions::new(),
546            statistics: statistics::Statistics::new(),
547            plan_cache: plan_cache::PlanCache::new(),
548            query_stats: query_stats::QueryStats::new(),
549            activity_provider: None,
550            audit_chain_provider: None,
551            audit_verifier: None,
552            slow_query_threshold_us: None,
553            slow_query_logger: None,
554            session_params: BTreeMap::new(),
555            trigger_recursion_depth: 0,
556            foreign_key_checks: true,
557            meta_views_materialised: false,
558            pending_foreign_keys: Vec::new(),
559            redo_capture: false,
560            last_redo: Vec::new(),
561        }
562    }
563
564    /// v7.11.0 — clone the engine's committed catalog + read-time
565    /// state into a frozen `CatalogSnapshot`. Cheap (`Catalog` is
566    /// backed by `PersistentVec`; cloning is O(log n) per table).
567    /// Subsequent writes to this engine are invisible to the
568    /// snapshot; the snapshot is self-contained and can be moved
569    /// to another thread for concurrent `execute_readonly_on_snapshot`
570    /// calls. The basis for [`AsyncReadHandle`] in spg-embedded-tokio
571    /// and any other read-fanout pattern.
572    #[must_use]
573    pub fn clone_snapshot(&self) -> CatalogSnapshot {
574        CatalogSnapshot {
575            catalog: self.active_catalog().clone(),
576            statistics: self.statistics.clone(),
577            clock: self.clock,
578            max_query_rows: self.max_query_rows,
579        }
580    }
581
582    /// Construct an engine restored from a previously-snapshotted catalog
583    /// (see `snapshot()`).
584    pub fn restore(catalog: Catalog) -> Self {
585        Self {
586            catalog,
587            tx_catalogs: BTreeMap::new(),
588            current_tx: None,
589            backslash_escapes: false,
590            next_tx_id: 1,
591            clock: None,
592            salt_fn: None,
593            max_query_rows: None,
594            max_query_bytes: None,
595            users: UserStore::new(),
596            publications: publications::Publications::new(),
597            subscriptions: subscriptions::Subscriptions::new(),
598            statistics: statistics::Statistics::new(),
599            plan_cache: plan_cache::PlanCache::new(),
600            query_stats: query_stats::QueryStats::new(),
601            activity_provider: None,
602            audit_chain_provider: None,
603            audit_verifier: None,
604            slow_query_threshold_us: None,
605            slow_query_logger: None,
606            session_params: BTreeMap::new(),
607            trigger_recursion_depth: 0,
608            foreign_key_checks: true,
609            meta_views_materialised: false,
610            pending_foreign_keys: Vec::new(),
611            redo_capture: false,
612            last_redo: Vec::new(),
613        }
614    }
615
616    /// Restore an engine + user table from a v4.1 envelope produced
617    /// by `snapshot_with_users()`. Falls back to plain catalog-only
618    /// restore if the envelope magic isn't present (so v3.x snapshot
619    /// files still load). v6.1.2 adds the optional publications
620    /// trailer (envelope v3); a v1/v2 envelope deserialises to an
621    /// empty publication table.
622    pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
623        match split_envelope(buf) {
624            EnvelopeParse::Pair {
625                catalog: catalog_bytes,
626                users: user_bytes,
627                publications: pub_bytes,
628                subscriptions: sub_bytes,
629                statistics: stats_bytes,
630            } => {
631                let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
632                let users = users::deserialize_users(user_bytes)
633                    .map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
634                let publications = match pub_bytes {
635                    Some(b) => publications::Publications::deserialize(b).map_err(|e| {
636                        EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
637                    })?,
638                    None => publications::Publications::new(),
639                };
640                let subscriptions = match sub_bytes {
641                    Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
642                        EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
643                    })?,
644                    None => subscriptions::Subscriptions::new(),
645                };
646                let statistics = match stats_bytes {
647                    Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
648                        EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
649                    })?,
650                    None => statistics::Statistics::new(),
651                };
652                Ok(Self {
653                    catalog,
654                    tx_catalogs: BTreeMap::new(),
655                    current_tx: None,
656                    backslash_escapes: false,
657                    next_tx_id: 1,
658                    clock: None,
659                    salt_fn: None,
660                    max_query_rows: None,
661                    max_query_bytes: None,
662                    users,
663                    publications,
664                    subscriptions,
665                    statistics,
666                    plan_cache: plan_cache::PlanCache::new(),
667                    query_stats: query_stats::QueryStats::new(),
668                    activity_provider: None,
669                    audit_chain_provider: None,
670                    audit_verifier: None,
671                    slow_query_threshold_us: None,
672                    slow_query_logger: None,
673                    session_params: BTreeMap::new(),
674                    trigger_recursion_depth: 0,
675                    foreign_key_checks: true,
676                    meta_views_materialised: false,
677                    pending_foreign_keys: Vec::new(),
678                    redo_capture: false,
679                    last_redo: Vec::new(),
680                })
681            }
682            EnvelopeParse::CrcMismatch { expected, computed } => {
683                Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
684                    "snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
685                ))))
686            }
687            EnvelopeParse::Bare => {
688                let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
689                Ok(Self::restore(catalog))
690            }
691        }
692    }
693
694    pub const fn users(&self) -> &UserStore {
695        &self.users
696    }
697
698    /// Builder: attach a wall clock so `NOW()` / `CURRENT_TIMESTAMP` /
699    /// `CURRENT_DATE` evaluate to a real value instead of erroring out.
700    #[must_use]
701    pub const fn with_clock(mut self, clock: ClockFn) -> Self {
702        self.clock = Some(clock);
703        self
704    }
705
706    /// Builder: attach an OS-backed RNG for per-user password salts.
707    /// The host (`spg-server`) typically wires this to `/dev/urandom`.
708    #[must_use]
709    pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
710        self.salt_fn = Some(f);
711        self
712    }
713
714    /// Builder: cap the number of rows a single SELECT may return.
715    /// Exceeding the cap raises `EngineError::RowLimitExceeded` —
716    /// the bound is checked inside the executor so a runaway
717    /// catalog scan can't allocate millions of rows before the
718    /// server gets a chance to reject the result.
719    #[must_use]
720    pub const fn with_max_query_rows(mut self, n: usize) -> Self {
721        self.max_query_rows = Some(n);
722        self
723    }
724
725    /// Builder: cap the approximate heap bytes a single SELECT's
726    /// join/filter materialisation may hold. Exceeding the cap
727    /// raises `EngineError::QueryBytesExceeded`. Rows are the wrong
728    /// unit when one row carries a multi-MB body (mailrs round-26:
729    /// 1000-row batches of full mail text walked a 15 GiB host into
730    /// reclaim livelock without ever tripping a row ceiling).
731    #[must_use]
732    pub const fn with_max_query_bytes(mut self, n: usize) -> Self {
733        self.max_query_bytes = Some(n);
734        self
735    }
736
737    /// The *committed* catalog. Note: during a transaction this returns the
738    /// pre-TX state — `SELECT` inside a TX goes through `execute()` and reads
739    /// the shadow. Tests that inspect outside-TX state should use this.
740    pub const fn catalog(&self) -> &Catalog {
741        &self.catalog
742    }
743
744    /// Capture a frozen view of the committed engine state. Catalog
745    /// is O(1) Arc bump; trailers are cheap clones. Decouples "capture"
746    /// (needs &Engine) from "serialize" (CPU, no engine access) — the
747    /// seam the background-checkpoint worker rides in CoW-2.
748    pub fn snapshot_data(&self) -> EngineSnapshot {
749        EngineSnapshot {
750            catalog: self.catalog.clone(),
751            users: self.users.clone(),
752            publications: self.publications.clone(),
753            subscriptions: self.subscriptions.clone(),
754            statistics: self.statistics.clone(),
755        }
756    }
757
758    /// Serialize the *committed* catalog to bytes. v0.6 was full-snapshot; v0.9
759    /// adds the rule that an open TX's shadow is never snapshotted — only the
760    /// post-COMMIT state is persisted. v4.1 wraps the catalog in an envelope
761    /// when there are users to persist; an empty user table snapshots as the
762    /// bare catalog format (backwards-compat with v3.x readers). v6.1.2
763    /// adds publications to the envelope condition: either non-empty
764    /// users OR non-empty publications now triggers the envelope path.
765    pub fn snapshot(&self) -> Vec<u8> {
766        self.snapshot_data().serialize()
767    }
768
769    /// True when at least one TX slot is in flight. v4.41.1 runtime
770    /// invariant: at most one slot active at a time (dispatch holds
771    /// `engine.write()` across the entire wrap). v4.42 will let this
772    /// return true with multiple slots concurrently.
773    pub fn in_transaction(&self) -> bool {
774        !self.tx_catalogs.is_empty()
775    }
776
777    /// v4.41.1 allocate a fresh TX handle. Used by spg-server dispatch
778    /// to scope each implicit-wrap BEGIN..stmt..COMMIT to its own slot
779    /// in `tx_catalogs`. v4.42 — the commit-barrier leader allocates
780    /// one of these per task in its group, runs `BEGIN`+sql+`COMMIT`
781    /// sequentially under a single `engine.write()` so each task's
782    /// mutations accumulate into shared state, then either keeps the
783    /// accumulated state (fsync OK) or restores the pre-image via
784    /// `replace_catalog` (fsync err).
785    pub fn alloc_tx_id(&mut self) -> TxId {
786        let id = TxId(self.next_tx_id);
787        self.next_tx_id = self.next_tx_id.saturating_add(1);
788        id
789    }
790
791    /// v4.42 — atomically replace the live catalog. Used by the
792    /// commit-barrier leader to roll back a group whose batched
793    /// fsync failed: the leader snapshots `engine.catalog().clone()`
794    /// (O(1) Arc bump after the v4.39/v4.40 persistent migration)
795    /// at group start, sequentially applies each task's BEGIN+sql+
796    /// COMMIT under the same write lock to accumulate mutations
797    /// into shared state, batches the WAL bytes, fsyncs once, and
798    /// on failure calls this with the pre-image to undo every
799    /// task in the group at once.
800    ///
801    /// **Does NOT touch `tx_catalogs` / `current_tx`.** Any
802    /// explicit-TX slot from a concurrent client (created via the
803    /// legacy `IMPLICIT_TX`-less dispatch path or via the future
804    /// MVCC-readers v5+ work) has its own snapshot baked into the
805    /// slot — restoring `self.catalog` to the pre-image leaves
806    /// those slots untouched, exactly as they were when the leader
807    /// took the lock. The leader's own implicit-TX slots are all
808    /// already discarded (`exec_commit` removed them as each
809    /// task's COMMIT ran) by the time this is reached.
810    pub fn replace_catalog(&mut self, catalog: Catalog) {
811        self.catalog = catalog;
812    }
813
814    /// v6.7.0 — public shim around `Catalog::freeze_oldest_to_cold`
815    /// so tests + the spg-server freezer can drive a freeze without
816    /// reaching into the private `active_catalog_mut`. v6.7.4
817    /// parallel freezer will build on this surface.
818    ///
819    /// Marks the table's cached `cold_row_count` stale because the
820    /// freeze added cold locators that ANALYZE hasn't yet refreshed.
821    pub fn freeze_oldest_to_cold(
822        &mut self,
823        table_name: &str,
824        index_name: &str,
825        max_rows: usize,
826    ) -> Result<spg_storage::FreezeReport, EngineError> {
827        let report = self
828            .active_catalog_mut()
829            .freeze_oldest_to_cold(table_name, index_name, max_rows)
830            .map_err(EngineError::Storage)?;
831        if let Some(t) = self.active_catalog_mut().get_mut(table_name) {
832            t.mark_cold_row_count_stale();
833        }
834        Ok(report)
835    }
836
837    /// v6.7.5 — public shim used by the spg-server follower's
838    /// segment-forwarding receiver. Registers a cold-tier segment
839    /// at a specific id (the master's id, as transmitted on the
840    /// wire) so the follower's BTree-Cold locators stay byte-
841    /// identical with the master's. Wraps
842    /// `Catalog::load_segment_bytes_at` under the standard
843    /// clone-mutate-replace pattern.
844    ///
845    /// Returns `Ok(())` on success **and** on the "slot already
846    /// occupied" case — a follower mid-reconnect may receive a
847    /// segment chunk for a segment_id it already has on disk
848    /// (forwarded last session); the caller should treat that
849    /// path as a no-op rather than a fatal error.
850    pub fn receive_cold_segment(
851        &mut self,
852        segment_id: u32,
853        bytes: Vec<u8>,
854    ) -> Result<(), EngineError> {
855        let mut new_cat = self.catalog.clone();
856        match new_cat.load_segment_bytes_at(segment_id, bytes) {
857            Ok(()) => {
858                self.replace_catalog(new_cat);
859                Ok(())
860            }
861            Err(StorageError::Corrupt(msg)) if msg.contains("already occupied") => Ok(()),
862            Err(e) => Err(EngineError::Storage(e)),
863        }
864    }
865
866    pub(crate) fn active_catalog(&self) -> &Catalog {
867        match self.current_tx {
868            Some(t) => self
869                .tx_catalogs
870                .get(&t)
871                .map_or(&self.catalog, |s| &s.catalog),
872            None => &self.catalog,
873        }
874    }
875
876    fn active_catalog_mut(&mut self) -> &mut Catalog {
877        let tx = self.current_tx;
878        match tx {
879            Some(t) => match self.tx_catalogs.get_mut(&t) {
880                Some(s) => &mut s.catalog,
881                None => &mut self.catalog,
882            },
883            None => &mut self.catalog,
884        }
885    }
886
887    /// v7.34 (crash-recovery P0 #2) — turn row-level redo capture on/off.
888    /// The embedding layer enables it when persistence is on so each
889    /// mutating `execute` records the physical [`RowChange`]s it applied
890    /// (drained via [`Engine::take_redo`]). Off = zero capture overhead.
891    pub fn set_redo_capture(&mut self, on: bool) {
892        self.redo_capture = on;
893    }
894
895    /// v7.34 — take the redo captured by the most recent successful
896    /// mutating `execute` (empty when capture is off, the statement was a
897    /// read, or it changed nothing). The embedding layer writes these to
898    /// the WAL in place of the SQL text.
899    pub fn take_redo(&mut self) -> Vec<RowChange> {
900        core::mem::take(&mut self.last_redo)
901    }
902
903    /// v7.34 (crash-recovery P0 #2) — replay a row-level redo log onto the
904    /// committed catalog (the row-level WAL recovery primitive: apply the
905    /// captured physical changes from a checkpoint baseline, in place of
906    /// re-executing the SQL). Trusts the log — no uniqueness/FK/parse.
907    pub fn apply_redo(&mut self, changes: &[RowChange]) -> Result<(), EngineError> {
908        self.catalog
909            .apply_redo(changes)
910            .map_err(EngineError::Storage)
911    }
912
913    /// Read-only execute path. Succeeds for `SELECT` / `SHOW TABLES`
914    /// / `SHOW COLUMNS`; returns `EngineError::WriteRequired` for
915    /// every other statement, so the caller can fall through to the
916    /// `&mut self` `execute` path under a write lock. Engine state is
917    /// not mutated even on the success path (`rewrite_clock_calls`
918    /// and `resolve_order_by_position` both mutate the locally-owned
919    /// AST, not `self`).
920    ///
921    /// v4.2: cap result-set size. Applied after the executor
922    /// materialises rows but before they leave the engine — wrapping
923    /// every Rows-returning exec_* function would scatter the check.
924    ///
925    /// v7.31 (memory campaign, bucket A) — the same choke point now
926    /// also enforces the BYTE budget on the final result set, so
927    /// single-table and aggregate paths (which don't route through
928    /// the join materialiser's incremental accounting) still cannot
929    /// hand the host an unbounded result. Intermediate single-table
930    /// clones are the 7.31.x follow-up (design doc, bucket A).
931    fn enforce_row_limit(
932        &self,
933        result: Result<QueryResult, EngineError>,
934    ) -> Result<QueryResult, EngineError> {
935        if let Ok(QueryResult::Rows { rows, .. }) = &result {
936            if let Some(cap) = self.max_query_rows
937                && rows.len() > cap
938            {
939                return Err(EngineError::RowLimitExceeded(cap));
940            }
941            if let Some(byte_cap) = self.max_query_bytes
942                && approx_rows_bytes(rows) > byte_cap
943            {
944                return Err(EngineError::QueryBytesExceeded(byte_cap));
945            }
946        }
947        result
948    }
949}
950
951/// v7.31 (memory campaign — ceiling-first / never-die, design v1) —
952/// per-table slice of the engine's resident-memory accounting.
953/// `hot_encoded_bytes` is the storage layer's maintained meter (what
954/// the rows encode to); `approx_resident_bytes` is what they COST in
955/// RAM (per-cell enum slots + heap payloads via `approx_row_bytes`)
956/// — the gap between the two is the representation multiplier the
957/// round-26 report measured at ~11× end-to-end.
958#[derive(Debug, Clone)]
959pub struct TableMemoryStats {
960    pub name: String,
961    pub hot_rows: u64,
962    /// Cached cold-row count (refreshed by ANALYZE — see
963    /// `Table::cold_row_count`'s staleness contract).
964    pub cold_rows: u64,
965    pub hot_encoded_bytes: u64,
966    pub approx_resident_bytes: u64,
967    pub index_count: u64,
968    /// v7.31 C2 — sum of `IndexKind::approx_resident_bytes()` over the
969    /// table's indices: every variant (BTree / NSW / BRIN / GIN family)
970    /// walks its own structure, so the GIN posting lists and NSW layer
971    /// adjacency that dominate text/vector tables are counted honestly
972    /// instead of the old flat-token estimate.
973    pub approx_index_bytes: u64,
974}
975
976/// v7.31 — whole-engine memory snapshot: the polling form of the
977/// round-26 ask-4 watermark signal. Hosts compare
978/// `total_approx_resident_bytes` (+ their own WAL/file accounting)
979/// against their deployment ceiling and shed/shrink before the
980/// kernel does it for them.
981#[derive(Debug, Clone)]
982pub struct MemoryStats {
983    pub tables: Vec<TableMemoryStats>,
984    pub total_hot_encoded_bytes: u64,
985    pub total_approx_resident_bytes: u64,
986    pub total_approx_index_bytes: u64,
987    /// The active per-query materialisation budget (bucket A), so a
988    /// monitoring host sees ceiling and usage through one call.
989    pub max_query_bytes: Option<usize>,
990    /// v7.31 C2 — bucket D: live WAL bytes (active chunk + buffered,
991    /// uncheckpointed). `None` from the engine itself — it has no WAL;
992    /// the durable hosts (embed `Database`, server) fill it in from
993    /// their own WAL accounting. `Some(0)` means "host has a WAL and
994    /// it is empty"; `None` means "no WAL on this path" (in-memory).
995    pub wal_bytes: Option<u64>,
996}
997
998/// v6.2.0 — true for engine-managed catalog tables that the bare
999/// `ANALYZE` (no target) should skip. v6.2.0 has no internal
1000/// tables yet (publications / subscriptions / users / statistics
1001/// all live as engine fields, not catalog tables), so this is a
1002/// reserved future-proofing hook — every existing user table is
1003/// analysed.
1004const fn is_internal_table_name(_name: &str) -> bool {
1005    false
1006}
1007
1008#[cfg(test)]
1009mod tests;