Skip to main content

spg_engine/
lib.rs

1//! SPG execution engine — v0.3 wires the SQL front-end to the in-memory
2//! storage layer. Implements `CREATE TABLE`, single-row `INSERT VALUES`, and
3//! `SELECT * FROM <table>` (no WHERE yet — that lands in v0.4 alongside
4//! expression evaluation against rows).
5#![no_std]
6
7extern crate alloc;
8
9pub mod aggregate;
10mod bytebudget;
11mod cancel;
12mod clock;
13mod constraints;
14mod conversions;
15pub mod copy;
16mod ddl;
17pub mod describe;
18mod dml;
19mod envelope;
20pub mod eval;
21mod execute;
22mod explain;
23mod expr_analysis;
24pub mod fts;
25mod index_access;
26mod join;
27mod joinfold;
28pub mod json;
29mod maintenance;
30pub mod memoize;
31mod numeric;
32mod orderby;
33pub mod plan_cache;
34mod plpgsql;
35pub mod publications;
36pub mod query_stats;
37mod readonly;
38pub mod reorder;
39mod select;
40pub mod selectivity;
41mod sequence;
42mod session;
43mod show;
44mod spg_admin;
45pub mod statistics;
46mod subquery;
47pub mod subscriptions;
48mod substitute;
49mod system_catalog;
50mod table_access;
51mod transaction;
52pub mod triggers;
53pub mod users;
54mod window;
55
56pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
57pub use cancel::{CancelToken, MonotonicNowFn};
58pub use execute::StreamItem;
59
60use bytebudget::*;
61pub(crate) use clock::{rewrite_clock_calls, value_to_literal};
62use constraints::*;
63use conversions::*;
64pub use conversions::{
65    format_bigint_2d_text_pub, format_hstore_text, format_int_2d_text_pub, format_range_text,
66    format_text_2d_text_pub,
67};
68pub(crate) use ddl::{
69    canonicalize_set_value, enforce_enum_label, eval_runtime_default_free,
70    resolve_column_default_free,
71};
72pub(crate) use envelope::{EnvelopeParse, build_envelope, split_envelope};
73use expr_analysis::*;
74use index_access::*;
75pub(crate) use orderby::{
76    apply_offset_and_limit, apply_offset_and_limit_tagged, build_order_keys, canonical_value_repr,
77    expand_group_by_all, order_by_value_cmp, partial_sort_tagged, render_histogram_bounds,
78    resolve_order_by_position, sort_by_keys, sort_values_for_histogram, value_cmp, value_to_f64,
79};
80pub(crate) use select::{build_projection, infer_column_types, value_to_order_key};
81pub(crate) use show::render_create_table;
82pub(crate) use subquery::{
83    build_in_list_set, collect_scalar_subqueries, expr_has_subquery, expr_tree_has_subquery,
84};
85pub use substitute::substitute_placeholders;
86use substitute::*;
87use system_catalog::*;
88use window::*;
89
90use alloc::collections::BTreeMap;
91use alloc::string::String;
92use alloc::vec::Vec;
93use core::fmt;
94
95// v7.16.0 — re-export the parsed-statement AST so downstream
96// crates (spg-embedded → spg-sqlx) don't need a direct dep on
97// spg-sql for the prepare/bind handle.
98pub use spg_sql::ast::Statement as ParsedStatement;
99use spg_sql::parser::ParseError;
100use spg_storage::{Catalog, ColumnSchema, Row, RowChange, StorageError};
101
102use crate::eval::EvalError;
103
104/// Result of executing one statement.
105#[derive(Debug, Clone, PartialEq)]
106#[non_exhaustive]
107pub enum QueryResult {
108    /// DDL or DML succeeded.
109    ///
110    /// `affected` is the row count for `INSERT` and 0 elsewhere.
111    /// `modified_catalog` tells the server whether this statement
112    /// caused the *committed* catalog to change — it's the signal to
113    /// snapshot/audit. False for `BEGIN`/`ROLLBACK`, false for writeful
114    /// statements executed inside a transaction (those only touch the
115    /// shadow), and true for `COMMIT` and for writes outside a TX.
116    CommandOk {
117        affected: usize,
118        modified_catalog: bool,
119    },
120    /// `SELECT` returned a (possibly empty) row set.
121    Rows {
122        columns: Vec<ColumnSchema>,
123        rows: Vec<Row>,
124    },
125}
126
127/// All errors the engine can return.
128///
129/// Marked `#[non_exhaustive]` from v7.5.0 onward: external `match`
130/// must include a `_` arm so new variants in subsequent v7.x releases
131/// are not breaking changes.
132#[derive(Debug, Clone, PartialEq)]
133#[non_exhaustive]
134pub enum EngineError {
135    Parse(ParseError),
136    Storage(StorageError),
137    Eval(EvalError),
138    /// Front-end accepted a construct that the v0.x executor doesn't support.
139    Unsupported(String),
140    /// `BEGIN` while another transaction is already open.
141    TransactionAlreadyOpen,
142    /// `COMMIT` / `ROLLBACK` with no active transaction.
143    NoActiveTransaction,
144    /// v4.0 sentinel: `execute_readonly` got a statement that
145    /// mutates engine state (INSERT / CREATE / BEGIN / COMMIT / …).
146    /// The caller should retake the write lock and dispatch through
147    /// `execute(&mut self)` instead.
148    WriteRequired,
149    /// v4.2: a SELECT would have returned more rows than the
150    /// configured `max_query_rows` cap. Carries the cap.
151    RowLimitExceeded(usize),
152    /// v7.30.3 (mailrs round-26): a SELECT's join/filter
153    /// materialisation would have held more (approximate) heap
154    /// bytes than the configured `max_query_bytes` cap. The row
155    /// cap above counts rows; this counts bytes, because one row
156    /// can be a multi-MB mail body — 1000 fat rows pressure the
157    /// host long before any row ceiling trips. Carries the cap.
158    QueryBytesExceeded(usize),
159    /// v4.5: cooperative cancellation — the host (server's
160    /// per-query watchdog) set the cancel flag while a long-running
161    /// SELECT / UPDATE / DELETE was scanning rows. The partial work
162    /// is discarded; the caller should surface this as a timeout
163    /// to the client.
164    Cancelled,
165}
166
167impl fmt::Display for EngineError {
168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169        match self {
170            Self::Parse(e) => write!(f, "parse: {e}"),
171            Self::Storage(e) => write!(f, "storage: {e}"),
172            Self::Eval(e) => write!(f, "eval: {e}"),
173            Self::Unsupported(s) => write!(f, "unsupported: {s}"),
174            Self::TransactionAlreadyOpen => f.write_str("a transaction is already open"),
175            Self::NoActiveTransaction => f.write_str("no active transaction"),
176            Self::WriteRequired => {
177                f.write_str("statement requires a write lock (use execute, not execute_readonly)")
178            }
179            Self::RowLimitExceeded(n) => {
180                write!(f, "query exceeded max_query_rows={n}")
181            }
182            Self::QueryBytesExceeded(n) => {
183                write!(
184                    f,
185                    "query materialisation exceeded max_query_bytes={n} (set SPG_MAX_QUERY_BYTES to raise, 0 to disable)"
186                )
187            }
188            Self::Cancelled => f.write_str("query cancelled (timeout or client request)"),
189        }
190    }
191}
192
193impl From<ParseError> for EngineError {
194    fn from(e: ParseError) -> Self {
195        Self::Parse(e)
196    }
197}
198impl From<StorageError> for EngineError {
199    fn from(e: StorageError) -> Self {
200        Self::Storage(e)
201    }
202}
203impl From<EvalError> for EngineError {
204    fn from(e: EvalError) -> Self {
205        Self::Eval(e)
206    }
207}
208
209/// The execution engine. Holds the catalog and (later) other server-scope
210/// state. `Engine::new()` is intentionally cheap so callers can construct one
211/// per database, per test.
212/// Function pointer that returns "now" as microseconds since Unix
213/// epoch. The engine is `no_std`, so it can't reach for `std::time`
214/// itself — callers (`spg-server`, the sqllogictest runner) inject a
215/// concrete implementation. `None` means `NOW()` / `CURRENT_*` raise
216/// `Unsupported`.
217pub type ClockFn = fn() -> i64;
218
219/// Function pointer that produces 16 cryptographically random bytes.
220/// Like `ClockFn`, the engine is `no_std` and can't reach for /dev/urandom
221/// itself — host (`spg-server`) injects an OS-backed source. `None`
222/// means SQL-driven `CREATE USER` falls back to a deterministic salt
223/// derived from the username (acceptable in tests; the server always
224/// installs a real RNG so production paths never see this).
225pub type SaltFn = fn() -> [u8; 16];
226
227/// v4.5 cooperative cancellation token. A long-running SELECT /
228/// UPDATE / DELETE checks `is_cancelled` at row-loop checkpoints
229/// and bails with `EngineError::Cancelled`. The host
230/// (`spg-server`) creates an `AtomicBool` per query, spawns a
231/// watchdog thread that sets it after `SPG_QUERY_TIMEOUT_MS`,
232/// and passes it via `execute_with_cancel` / `execute_readonly_with_cancel`.
233///
234/// `CancelToken::none()` is a no-op — used by the legacy `execute`
235/// and `execute_readonly` entry points so existing callers don't
236/// change.
237/// v4.41.1 opaque transaction handle. Returned by `Engine::alloc_tx_id`,
238/// threaded through `Engine::execute_in` so dispatch can identify which
239/// in-flight TX a statement belongs to. `IMPLICIT_TX` is the reserved
240/// slot every legacy caller — engine self-tests, spg-cli, spg-embedded,
241/// startup replay — implicitly uses through the unchanged
242/// `Engine::execute(sql)` API. v4.41.1 keeps at most one active slot at
243/// runtime (dispatch holds `engine.write()` across the wrap, same as
244/// v4.34); the map shape is here to let v4.42 turn on N in-flight
245/// implicit TXs without reshuffling the engine internals.
246#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
247pub struct TxId(pub u64);
248
249/// Reserved slot used by `Engine::execute(sql)` — the legacy single-
250/// global-shadow path. New `alloc_tx_id` handles start at 1.
251pub const IMPLICIT_TX: TxId = TxId(0);
252
253/// v6.7.3 — default segment-size threshold used by `COMPACT COLD
254/// SEGMENTS` when no explicit target is supplied. Segments whose
255/// `OwnedSegment::bytes().len()` is **strictly** less than this
256/// value are eligible to merge. spg-server reads
257/// `SPG_COMPACTION_TARGET_SEGMENT_BYTES` to override.
258pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
259
260/// Per-slot transaction state. Held inside `tx_catalogs[tx_id]` for the
261/// lifetime of a BEGIN..COMMIT (or BEGIN..ROLLBACK) window. Drops when
262/// the TX commits (its `catalog` is moved over `Engine.catalog`) or
263/// rolls back (slot removed, catalog discarded).
264#[derive(Debug, Default, Clone)]
265struct TxState {
266    /// The TX's shadow copy of the catalog. Started as a clone of
267    /// `Engine.catalog` at BEGIN time; writes flow into it; COMMIT
268    /// installs it over `Engine.catalog`. `Catalog::clone()` is O(1)
269    /// since v4.40 (`PersistentVec` rows + `PersistentBTreeMap` indices).
270    catalog: Catalog,
271    /// Per-TX savepoint stack. Each entry pairs the savepoint name with
272    /// a clone of `catalog` at the moment `SAVEPOINT <name>` fired.
273    /// `ROLLBACK TO <name>` restores from the entry and pops everything
274    /// after it; `RELEASE <name>` discards the entry and everything
275    /// after; COMMIT/ROLLBACK clears the whole stack.
276    savepoints: Vec<(String, Catalog)>,
277}
278
279/// v7.11.0 — frozen read-only view of the engine's committed state.
280/// Constructed via [`Engine::clone_snapshot`]. Holds clones of the
281/// catalog, statistics, clock function, and row-cap config — the
282/// four fields the `execute_readonly` path actually reads. Cheap to
283/// `Clone` (each clone shares the underlying `PersistentVec` row
284/// storage; only the trie root pointers copy). Send + Sync so a
285/// snapshot can be moved across `tokio::task::spawn_blocking`
286/// boundaries without coordination.
287///
288/// The contract: a snapshot reflects the engine's state at the
289/// moment `clone_snapshot()` returned. Subsequent writes to the
290/// engine are NOT visible. Callers who need fresher data take a
291/// new snapshot.
292#[derive(Debug, Clone)]
293pub struct CatalogSnapshot {
294    catalog: Catalog,
295    statistics: statistics::Statistics,
296    clock: Option<ClockFn>,
297    max_query_rows: Option<usize>,
298}
299
300/// CoW-1 (v7.34) — frozen view of the *persisted* committed engine
301/// state. Carries every field the `snapshot()` envelope serializes;
302/// `Clone` is O(1) on the catalog (Arc bump) and cheap typed-clones
303/// on the trailers. Decouples "capture state" from "serialize bytes"
304/// so the background-checkpoint worker can hold the snapshot and
305/// produce bytes off the engine write lock.
306#[derive(Debug, Clone)]
307pub struct EngineSnapshot {
308    catalog: Catalog,
309    users: UserStore,
310    publications: publications::Publications,
311    subscriptions: subscriptions::Subscriptions,
312    statistics: statistics::Statistics,
313}
314
315impl EngineSnapshot {
316    /// Same envelope rules as `Engine::snapshot()`: bare catalog when
317    /// every trailer is empty, full envelope otherwise.
318    pub fn serialize(&self) -> Vec<u8> {
319        if self.users.is_empty()
320            && self.publications.is_empty()
321            && self.subscriptions.is_empty()
322            && self.statistics.is_empty()
323        {
324            self.catalog.serialize()
325        } else {
326            build_envelope(
327                &self.catalog.serialize(),
328                &users::serialize_users(&self.users),
329                &self.publications.serialize(),
330                &self.subscriptions.serialize(),
331                &self.statistics.serialize(),
332            )
333        }
334    }
335}
336
337// The engine carries several independent session/capture flags (dialect,
338// FK-checks, meta-view materialisation, redo capture); they're orthogonal
339// switches, not a state enum begging to be modelled.
340#[allow(clippy::struct_excessive_bools)]
341#[derive(Debug, Default)]
342pub struct Engine {
343    /// Committed catalog — what survives `Engine::snapshot()` and what
344    /// outside-TX `SELECT`s read.
345    catalog: Catalog,
346    /// Active TX slots, keyed by `TxId`. Empty when no TX is in flight.
347    /// v4.41.1 runtime invariant: at most one entry (single-writer
348    /// model unchanged). v4.42 will let dispatch hold multiple entries
349    /// concurrently for group commit + engine MVCC.
350    tx_catalogs: BTreeMap<TxId, TxState>,
351    /// Which slot the next exec_* call should mutate. Set by
352    /// `execute_in(sql, tx_id)` at the entry point; legacy `execute(sql)`
353    /// sets it to `IMPLICIT_TX`. None when no TX is in flight (read /
354    /// write goes straight against `catalog`).
355    current_tx: Option<TxId>,
356    /// Monotonic counter for `alloc_tx_id`. Starts at 1 — slot 0 is
357    /// reserved for `IMPLICIT_TX`.
358    next_tx_id: u64,
359    /// v7.22 (round-13 T3) — session string-literal dialect. `false`
360    /// (default) = PG semantics (backslash literal, `''` escape);
361    /// `true` = MySQL semantics (`\'` etc.). Flipped by the
362    /// deterministic session signals each dump emits: `SET sql_mode`
363    /// (only MySQL clients/dumps send it) turns it on,
364    /// `SET standard_conforming_strings = on` (every pg_dump
365    /// preamble) turns it off. The plan cache is cleared on every
366    /// flip — the same SQL text lexes differently per dialect.
367    backslash_escapes: bool,
368    /// Optional wall clock used to satisfy `NOW()` / `CURRENT_TIMESTAMP`
369    /// / `CURRENT_DATE`. Set by the host environment.
370    clock: Option<ClockFn>,
371    /// v4.1 cryptographic RNG for per-user password salt. Set by the
372    /// host. `None` means SQL-driven `CREATE USER` uses a
373    /// deterministic fallback — see `SaltFn`.
374    salt_fn: Option<SaltFn>,
375    /// v4.2 per-query row cap. `None` = unlimited. When set, a
376    /// SELECT that materialises more than `n` rows returns
377    /// `EngineError::RowLimitExceeded`. Enforced before the result
378    /// is shaped into wire frames so a runaway scan can't blow the
379    /// server's heap.
380    max_query_rows: Option<usize>,
381    /// v7.30.3 (mailrs round-26) per-query byte cap on join/filter
382    /// materialisation. `None` = unlimited. Approximate net
383    /// accounting (Value heap payloads + per-cell enum overhead)
384    /// charged at every point the join pipeline clones rows;
385    /// crossing the cap raises `EngineError::QueryBytesExceeded`
386    /// instead of pressuring the host into reclaim livelock. The
387    /// host wires this to `SPG_MAX_QUERY_BYTES` (embed defaults it
388    /// ON; the server keeps its allocator-precise budget as the
389    /// outer layer).
390    pub(crate) max_query_bytes: Option<usize>,
391    /// v4.1 RBAC user table. Empty means "no RBAC configured yet" —
392    /// the server decides what that means at the auth boundary
393    /// (open mode vs legacy single-password mode). User CRUD goes
394    /// through `create_user`/`drop_user`/`verify_user`; persistence
395    /// rides the snapshot envelope alongside the catalog.
396    pub(crate) users: UserStore,
397    /// v6.1.2 logical-replication publication catalog. Empty until
398    /// `CREATE PUBLICATION` runs. Persistence rides the v3 envelope
399    /// trailer (see `build_envelope`).
400    publications: publications::Publications,
401    /// v6.1.4 logical-replication subscription catalog. Empty until
402    /// `CREATE SUBSCRIPTION` runs. Persistence rides the v4 envelope
403    /// trailer.
404    subscriptions: subscriptions::Subscriptions,
405    /// v6.2.0 — per-column statistics for the cost-based optimizer.
406    /// Populated by `ANALYZE`; queried via `spg_statistic` virtual
407    /// table. Persistence rides the v5 envelope trailer.
408    statistics: statistics::Statistics,
409    /// v6.3.0 — engine-level plan cache. Caches the post-`prepare()`
410    /// `Statement` keyed on SQL text. In-memory only — does NOT ride
411    /// the snapshot envelope (rebuilt on demand after restart).
412    plan_cache: plan_cache::PlanCache,
413    /// v6.5.1 — per-distinct-SQL execution stats. In-memory only,
414    /// surfaced via `spg_stat_query` virtual table. Updated by the
415    /// `execute_*` paths after a successful execute.
416    query_stats: query_stats::QueryStats,
417    /// v6.5.2 — connection-state provider callback. spg-server
418    /// registers a function at startup that snapshots its
419    /// per-pgwire-connection registry into `ActivityRow`s; engine
420    /// reads through it on every `SELECT * FROM spg_stat_activity`.
421    /// `None` ⇒ no-data (returns empty rows; matches the no_std
422    /// embedded callers that don't run pgwire).
423    activity_provider: Option<ActivityProvider>,
424    /// v6.5.3 — audit-chain provider + verifier. Same pattern as
425    /// activity_provider: spg-server registers both at startup;
426    /// engine reads through on `SELECT * FROM spg_audit_chain` and
427    /// `SELECT * FROM spg_audit_verify`. `None` ⇒ no-data.
428    audit_chain_provider: Option<AuditChainProvider>,
429    audit_verifier: Option<AuditVerifier>,
430    /// v6.5.6 — slow-query log threshold in microseconds. When set,
431    /// every successful execute whose elapsed exceeds the threshold
432    /// gets fed to the registered slow-query log callback (so
433    /// spg-server can emit a structured log line). Default `None`
434    /// = no slow-query logging.
435    slow_query_threshold_us: Option<u64>,
436    slow_query_logger: Option<SlowQueryLogger>,
437    /// v7.12.1 — session parameters set via `SET <name> = <value>`.
438    /// Only `default_text_search_config` is consumed by the engine
439    /// today (the FTS function dispatcher reads it when
440    /// `to_tsvector(text)` is called without an explicit config).
441    /// All other names are accepted + recorded so PG-dump output
442    /// loads, but have no behavioural effect.
443    pub(crate) session_params: BTreeMap<String, String>,
444    /// v7.12.7 — depth counter for trigger-emitted embedded SQL.
445    /// Each time the engine executes a `DeferredEmbeddedStmt` it
446    /// increments this; the recursive `execute_stmt_with_cancel`
447    /// inside that path checks against [`MAX_TRIGGER_RECURSION`]
448    /// to bound runaway cascades (trigger A's UPDATE on table B
449    /// fires trigger B which UPDATEs table A which fires trigger
450    /// A again…). Reset to 0 once the original DML returns.
451    trigger_recursion_depth: u32,
452    /// v7.14.0 — when `SET FOREIGN_KEY_CHECKS=0` is in effect
453    /// (mysqldump preamble), the FK existence + arity check at
454    /// CREATE TABLE time is deferred. FKs referencing a
455    /// not-yet-existing parent land in `pending_foreign_keys`
456    /// keyed by child table; `SET FOREIGN_KEY_CHECKS=1` drains
457    /// the queue and resolves each FK against the now-complete
458    /// catalog. Empty by default; the queue is drained on every
459    /// `RESET ALL` too.
460    foreign_key_checks: bool,
461    /// v7.16.2 — true on the temp Engine an outer
462    /// `exec_select_with_meta_views` builds, telling that
463    /// temp engine "stop short-circuiting into the meta-view
464    /// path — your catalog already has the materialised
465    /// tables; just run the regular SELECT." Without this we'd
466    /// infinite-loop since the meta-view name (e.g.
467    /// `__spg_info_columns`) still triggers
468    /// `select_references_meta_view`.
469    meta_views_materialised: bool,
470    pending_foreign_keys: Vec<(alloc::string::String, spg_sql::ast::ForeignKeyConstraint)>,
471    /// v7.34 (crash-recovery P0 #2) — row-level redo capture. When the
472    /// embedding layer turns this on (persistence enabled), each mutating
473    /// `execute` records the physical [`RowChange`]s it applied; the
474    /// engine drains them into `last_redo` on success, and the embedded
475    /// layer reads them via [`Engine::take_redo`] to write the WAL in
476    /// place of the SQL text. Off (default) = zero capture overhead.
477    redo_capture: bool,
478    /// Redo captured by the most recent successful mutating `execute`,
479    /// awaiting drain by the embedding layer. Cleared on each capture.
480    last_redo: Vec<RowChange>,
481}
482
483/// v7.12.7 — hard cap on nested trigger-emitted embedded SQL
484/// fires. 16 deep is well past anything a normal trigger graph
485/// uses while still preventing infinite-loop wedging.
486const MAX_TRIGGER_RECURSION: u32 = 16;
487
488/// v6.5.6 — callback signature for slow-query log emission. Called
489/// with `(sql, elapsed_us)` once per successful execute that crosses
490/// the threshold.
491pub type SlowQueryLogger = fn(&str, u64);
492
493/// v6.5.2 — one row of `spg_stat_activity`. Engine-public so
494/// spg-server can construct rows without re-exporting internal
495/// dispatch types.
496#[derive(Debug, Clone)]
497pub struct ActivityRow {
498    pub pid: u32,
499    pub user: String,
500    pub started_at_us: i64,
501    pub current_sql: String,
502    pub wait_event: String,
503    pub elapsed_us: i64,
504    pub in_transaction: bool,
505    /// v7.17 Phase 2.4 — startup-param `application_name` (or the
506    /// last value the client sent via `SET application_name = '...'`).
507    /// Empty when the client never declared one.
508    pub application_name: String,
509}
510
511/// v6.5.2 — provider callback type. Fresh snapshot returned each
512/// call; engine doesn't cache the slice.
513pub type ActivityProvider = fn() -> Vec<ActivityRow>;
514
515/// v6.5.3 — one row of `spg_audit_chain`. Engine-public so
516/// spg-server can construct rows directly from `AuditEntry`.
517#[derive(Debug, Clone)]
518pub struct AuditRow {
519    pub seq: i64,
520    pub ts_ms: i64,
521    pub prev_hash_hex: String,
522    pub entry_hash_hex: String,
523    pub sql: String,
524}
525
526/// v6.5.3 — chain-table provider + verifier. spg-server registers
527/// fn pointers that snapshot / verify the audit log. `verify`
528/// returns `(verified_count, broken_at_seq)` — `broken_at_seq` is
529/// `-1` on a clean chain.
530pub type AuditChainProvider = fn() -> Vec<AuditRow>;
531pub type AuditVerifier = fn() -> (i64, i64);
532
533impl Engine {
534    pub fn new() -> Self {
535        Self {
536            catalog: Catalog::new(),
537            tx_catalogs: BTreeMap::new(),
538            current_tx: None,
539            backslash_escapes: false,
540            next_tx_id: 1,
541            clock: None,
542            salt_fn: None,
543            max_query_rows: None,
544            max_query_bytes: None,
545            users: UserStore::new(),
546            publications: publications::Publications::new(),
547            subscriptions: subscriptions::Subscriptions::new(),
548            statistics: statistics::Statistics::new(),
549            plan_cache: plan_cache::PlanCache::new(),
550            query_stats: query_stats::QueryStats::new(),
551            activity_provider: None,
552            audit_chain_provider: None,
553            audit_verifier: None,
554            slow_query_threshold_us: None,
555            slow_query_logger: None,
556            session_params: BTreeMap::new(),
557            trigger_recursion_depth: 0,
558            foreign_key_checks: true,
559            meta_views_materialised: false,
560            pending_foreign_keys: Vec::new(),
561            redo_capture: false,
562            last_redo: Vec::new(),
563        }
564    }
565
566    /// v7.11.0 — clone the engine's committed catalog + read-time
567    /// state into a frozen `CatalogSnapshot`. Cheap (`Catalog` is
568    /// backed by `PersistentVec`; cloning is O(log n) per table).
569    /// Subsequent writes to this engine are invisible to the
570    /// snapshot; the snapshot is self-contained and can be moved
571    /// to another thread for concurrent `execute_readonly_on_snapshot`
572    /// calls. The basis for [`AsyncReadHandle`] in spg-embedded-tokio
573    /// and any other read-fanout pattern.
574    #[must_use]
575    pub fn clone_snapshot(&self) -> CatalogSnapshot {
576        CatalogSnapshot {
577            catalog: self.active_catalog().clone(),
578            statistics: self.statistics.clone(),
579            clock: self.clock,
580            max_query_rows: self.max_query_rows,
581        }
582    }
583
584    /// Construct an engine restored from a previously-snapshotted catalog
585    /// (see `snapshot()`).
586    pub fn restore(catalog: Catalog) -> Self {
587        Self {
588            catalog,
589            tx_catalogs: BTreeMap::new(),
590            current_tx: None,
591            backslash_escapes: false,
592            next_tx_id: 1,
593            clock: None,
594            salt_fn: None,
595            max_query_rows: None,
596            max_query_bytes: None,
597            users: UserStore::new(),
598            publications: publications::Publications::new(),
599            subscriptions: subscriptions::Subscriptions::new(),
600            statistics: statistics::Statistics::new(),
601            plan_cache: plan_cache::PlanCache::new(),
602            query_stats: query_stats::QueryStats::new(),
603            activity_provider: None,
604            audit_chain_provider: None,
605            audit_verifier: None,
606            slow_query_threshold_us: None,
607            slow_query_logger: None,
608            session_params: BTreeMap::new(),
609            trigger_recursion_depth: 0,
610            foreign_key_checks: true,
611            meta_views_materialised: false,
612            pending_foreign_keys: Vec::new(),
613            redo_capture: false,
614            last_redo: Vec::new(),
615        }
616    }
617
618    /// Restore an engine + user table from a v4.1 envelope produced
619    /// by `snapshot_with_users()`. Falls back to plain catalog-only
620    /// restore if the envelope magic isn't present (so v3.x snapshot
621    /// files still load). v6.1.2 adds the optional publications
622    /// trailer (envelope v3); a v1/v2 envelope deserialises to an
623    /// empty publication table.
624    pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
625        match split_envelope(buf) {
626            EnvelopeParse::Pair {
627                catalog: catalog_bytes,
628                users: user_bytes,
629                publications: pub_bytes,
630                subscriptions: sub_bytes,
631                statistics: stats_bytes,
632            } => {
633                let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
634                let users = users::deserialize_users(user_bytes)
635                    .map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
636                let publications = match pub_bytes {
637                    Some(b) => publications::Publications::deserialize(b).map_err(|e| {
638                        EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
639                    })?,
640                    None => publications::Publications::new(),
641                };
642                let subscriptions = match sub_bytes {
643                    Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
644                        EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
645                    })?,
646                    None => subscriptions::Subscriptions::new(),
647                };
648                let statistics = match stats_bytes {
649                    Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
650                        EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
651                    })?,
652                    None => statistics::Statistics::new(),
653                };
654                Ok(Self {
655                    catalog,
656                    tx_catalogs: BTreeMap::new(),
657                    current_tx: None,
658                    backslash_escapes: false,
659                    next_tx_id: 1,
660                    clock: None,
661                    salt_fn: None,
662                    max_query_rows: None,
663                    max_query_bytes: None,
664                    users,
665                    publications,
666                    subscriptions,
667                    statistics,
668                    plan_cache: plan_cache::PlanCache::new(),
669                    query_stats: query_stats::QueryStats::new(),
670                    activity_provider: None,
671                    audit_chain_provider: None,
672                    audit_verifier: None,
673                    slow_query_threshold_us: None,
674                    slow_query_logger: None,
675                    session_params: BTreeMap::new(),
676                    trigger_recursion_depth: 0,
677                    foreign_key_checks: true,
678                    meta_views_materialised: false,
679                    pending_foreign_keys: Vec::new(),
680                    redo_capture: false,
681                    last_redo: Vec::new(),
682                })
683            }
684            EnvelopeParse::CrcMismatch { expected, computed } => {
685                Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
686                    "snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
687                ))))
688            }
689            EnvelopeParse::Bare => {
690                let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
691                Ok(Self::restore(catalog))
692            }
693        }
694    }
695
696    pub const fn users(&self) -> &UserStore {
697        &self.users
698    }
699
700    /// Builder: attach a wall clock so `NOW()` / `CURRENT_TIMESTAMP` /
701    /// `CURRENT_DATE` evaluate to a real value instead of erroring out.
702    #[must_use]
703    pub const fn with_clock(mut self, clock: ClockFn) -> Self {
704        self.clock = Some(clock);
705        self
706    }
707
708    /// Builder: attach an OS-backed RNG for per-user password salts.
709    /// The host (`spg-server`) typically wires this to `/dev/urandom`.
710    #[must_use]
711    pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
712        self.salt_fn = Some(f);
713        self
714    }
715
716    /// Builder: cap the number of rows a single SELECT may return.
717    /// Exceeding the cap raises `EngineError::RowLimitExceeded` —
718    /// the bound is checked inside the executor so a runaway
719    /// catalog scan can't allocate millions of rows before the
720    /// server gets a chance to reject the result.
721    #[must_use]
722    pub const fn with_max_query_rows(mut self, n: usize) -> Self {
723        self.max_query_rows = Some(n);
724        self
725    }
726
727    /// Builder: cap the approximate heap bytes a single SELECT's
728    /// join/filter materialisation may hold. Exceeding the cap
729    /// raises `EngineError::QueryBytesExceeded`. Rows are the wrong
730    /// unit when one row carries a multi-MB body (mailrs round-26:
731    /// 1000-row batches of full mail text walked a 15 GiB host into
732    /// reclaim livelock without ever tripping a row ceiling).
733    #[must_use]
734    pub const fn with_max_query_bytes(mut self, n: usize) -> Self {
735        self.max_query_bytes = Some(n);
736        self
737    }
738
739    /// The *committed* catalog. Note: during a transaction this returns the
740    /// pre-TX state — `SELECT` inside a TX goes through `execute()` and reads
741    /// the shadow. Tests that inspect outside-TX state should use this.
742    pub const fn catalog(&self) -> &Catalog {
743        &self.catalog
744    }
745
746    /// Capture a frozen view of the committed engine state. Catalog
747    /// is O(1) Arc bump; trailers are cheap clones. Decouples "capture"
748    /// (needs &Engine) from "serialize" (CPU, no engine access) — the
749    /// seam the background-checkpoint worker rides in CoW-2.
750    pub fn snapshot_data(&self) -> EngineSnapshot {
751        EngineSnapshot {
752            catalog: self.catalog.clone(),
753            users: self.users.clone(),
754            publications: self.publications.clone(),
755            subscriptions: self.subscriptions.clone(),
756            statistics: self.statistics.clone(),
757        }
758    }
759
760    /// Serialize the *committed* catalog to bytes. v0.6 was full-snapshot; v0.9
761    /// adds the rule that an open TX's shadow is never snapshotted — only the
762    /// post-COMMIT state is persisted. v4.1 wraps the catalog in an envelope
763    /// when there are users to persist; an empty user table snapshots as the
764    /// bare catalog format (backwards-compat with v3.x readers). v6.1.2
765    /// adds publications to the envelope condition: either non-empty
766    /// users OR non-empty publications now triggers the envelope path.
767    pub fn snapshot(&self) -> Vec<u8> {
768        self.snapshot_data().serialize()
769    }
770
771    /// True when at least one TX slot is in flight. v4.41.1 runtime
772    /// invariant: at most one slot active at a time (dispatch holds
773    /// `engine.write()` across the entire wrap). v4.42 will let this
774    /// return true with multiple slots concurrently.
775    pub fn in_transaction(&self) -> bool {
776        !self.tx_catalogs.is_empty()
777    }
778
779    /// v4.41.1 allocate a fresh TX handle. Used by spg-server dispatch
780    /// to scope each implicit-wrap BEGIN..stmt..COMMIT to its own slot
781    /// in `tx_catalogs`. v4.42 — the commit-barrier leader allocates
782    /// one of these per task in its group, runs `BEGIN`+sql+`COMMIT`
783    /// sequentially under a single `engine.write()` so each task's
784    /// mutations accumulate into shared state, then either keeps the
785    /// accumulated state (fsync OK) or restores the pre-image via
786    /// `replace_catalog` (fsync err).
787    pub fn alloc_tx_id(&mut self) -> TxId {
788        let id = TxId(self.next_tx_id);
789        self.next_tx_id = self.next_tx_id.saturating_add(1);
790        id
791    }
792
793    /// v4.42 — atomically replace the live catalog. Used by the
794    /// commit-barrier leader to roll back a group whose batched
795    /// fsync failed: the leader snapshots `engine.catalog().clone()`
796    /// (O(1) Arc bump after the v4.39/v4.40 persistent migration)
797    /// at group start, sequentially applies each task's BEGIN+sql+
798    /// COMMIT under the same write lock to accumulate mutations
799    /// into shared state, batches the WAL bytes, fsyncs once, and
800    /// on failure calls this with the pre-image to undo every
801    /// task in the group at once.
802    ///
803    /// **Does NOT touch `tx_catalogs` / `current_tx`.** Any
804    /// explicit-TX slot from a concurrent client (created via the
805    /// legacy `IMPLICIT_TX`-less dispatch path or via the future
806    /// MVCC-readers v5+ work) has its own snapshot baked into the
807    /// slot — restoring `self.catalog` to the pre-image leaves
808    /// those slots untouched, exactly as they were when the leader
809    /// took the lock. The leader's own implicit-TX slots are all
810    /// already discarded (`exec_commit` removed them as each
811    /// task's COMMIT ran) by the time this is reached.
812    pub fn replace_catalog(&mut self, catalog: Catalog) {
813        self.catalog = catalog;
814    }
815
816    /// v6.7.0 — public shim around `Catalog::freeze_oldest_to_cold`
817    /// so tests + the spg-server freezer can drive a freeze without
818    /// reaching into the private `active_catalog_mut`. v6.7.4
819    /// parallel freezer will build on this surface.
820    ///
821    /// Marks the table's cached `cold_row_count` stale because the
822    /// freeze added cold locators that ANALYZE hasn't yet refreshed.
823    pub fn freeze_oldest_to_cold(
824        &mut self,
825        table_name: &str,
826        index_name: &str,
827        max_rows: usize,
828    ) -> Result<spg_storage::FreezeReport, EngineError> {
829        let report = self
830            .active_catalog_mut()
831            .freeze_oldest_to_cold(table_name, index_name, max_rows)
832            .map_err(EngineError::Storage)?;
833        if let Some(t) = self.active_catalog_mut().get_mut(table_name) {
834            t.mark_cold_row_count_stale();
835        }
836        Ok(report)
837    }
838
839    /// v6.7.5 — public shim used by the spg-server follower's
840    /// segment-forwarding receiver. Registers a cold-tier segment
841    /// at a specific id (the master's id, as transmitted on the
842    /// wire) so the follower's BTree-Cold locators stay byte-
843    /// identical with the master's. Wraps
844    /// `Catalog::load_segment_bytes_at` under the standard
845    /// clone-mutate-replace pattern.
846    ///
847    /// Returns `Ok(())` on success **and** on the "slot already
848    /// occupied" case — a follower mid-reconnect may receive a
849    /// segment chunk for a segment_id it already has on disk
850    /// (forwarded last session); the caller should treat that
851    /// path as a no-op rather than a fatal error.
852    pub fn receive_cold_segment(
853        &mut self,
854        segment_id: u32,
855        bytes: Vec<u8>,
856    ) -> Result<(), EngineError> {
857        let mut new_cat = self.catalog.clone();
858        match new_cat.load_segment_bytes_at(segment_id, bytes) {
859            Ok(()) => {
860                self.replace_catalog(new_cat);
861                Ok(())
862            }
863            Err(StorageError::Corrupt(msg)) if msg.contains("already occupied") => Ok(()),
864            Err(e) => Err(EngineError::Storage(e)),
865        }
866    }
867
868    pub(crate) fn active_catalog(&self) -> &Catalog {
869        match self.current_tx {
870            Some(t) => self
871                .tx_catalogs
872                .get(&t)
873                .map_or(&self.catalog, |s| &s.catalog),
874            None => &self.catalog,
875        }
876    }
877
878    fn active_catalog_mut(&mut self) -> &mut Catalog {
879        let tx = self.current_tx;
880        match tx {
881            Some(t) => match self.tx_catalogs.get_mut(&t) {
882                Some(s) => &mut s.catalog,
883                None => &mut self.catalog,
884            },
885            None => &mut self.catalog,
886        }
887    }
888
889    /// v7.34 (crash-recovery P0 #2) — turn row-level redo capture on/off.
890    /// The embedding layer enables it when persistence is on so each
891    /// mutating `execute` records the physical [`RowChange`]s it applied
892    /// (drained via [`Engine::take_redo`]). Off = zero capture overhead.
893    pub fn set_redo_capture(&mut self, on: bool) {
894        self.redo_capture = on;
895    }
896
897    /// v7.34 — take the redo captured by the most recent successful
898    /// mutating `execute` (empty when capture is off, the statement was a
899    /// read, or it changed nothing). The embedding layer writes these to
900    /// the WAL in place of the SQL text.
901    pub fn take_redo(&mut self) -> Vec<RowChange> {
902        core::mem::take(&mut self.last_redo)
903    }
904
905    /// v7.34 (crash-recovery P0 #2) — replay a row-level redo log onto the
906    /// committed catalog (the row-level WAL recovery primitive: apply the
907    /// captured physical changes from a checkpoint baseline, in place of
908    /// re-executing the SQL). Trusts the log — no uniqueness/FK/parse.
909    pub fn apply_redo(&mut self, changes: &[RowChange]) -> Result<(), EngineError> {
910        self.catalog
911            .apply_redo(changes)
912            .map_err(EngineError::Storage)
913    }
914
915    /// Read-only execute path. Succeeds for `SELECT` / `SHOW TABLES`
916    /// / `SHOW COLUMNS`; returns `EngineError::WriteRequired` for
917    /// every other statement, so the caller can fall through to the
918    /// `&mut self` `execute` path under a write lock. Engine state is
919    /// not mutated even on the success path (`rewrite_clock_calls`
920    /// and `resolve_order_by_position` both mutate the locally-owned
921    /// AST, not `self`).
922    ///
923    /// v4.2: cap result-set size. Applied after the executor
924    /// materialises rows but before they leave the engine — wrapping
925    /// every Rows-returning exec_* function would scatter the check.
926    ///
927    /// v7.31 (memory campaign, bucket A) — the same choke point now
928    /// also enforces the BYTE budget on the final result set, so
929    /// single-table and aggregate paths (which don't route through
930    /// the join materialiser's incremental accounting) still cannot
931    /// hand the host an unbounded result. Intermediate single-table
932    /// clones are the 7.31.x follow-up (design doc, bucket A).
933    fn enforce_row_limit(
934        &self,
935        result: Result<QueryResult, EngineError>,
936    ) -> Result<QueryResult, EngineError> {
937        if let Ok(QueryResult::Rows { rows, .. }) = &result {
938            if let Some(cap) = self.max_query_rows
939                && rows.len() > cap
940            {
941                return Err(EngineError::RowLimitExceeded(cap));
942            }
943            if let Some(byte_cap) = self.max_query_bytes
944                && approx_rows_bytes(rows) > byte_cap
945            {
946                return Err(EngineError::QueryBytesExceeded(byte_cap));
947            }
948        }
949        result
950    }
951}
952
953/// v7.31 (memory campaign — ceiling-first / never-die, design v1) —
954/// per-table slice of the engine's resident-memory accounting.
955/// `hot_encoded_bytes` is the storage layer's maintained meter (what
956/// the rows encode to); `approx_resident_bytes` is what they COST in
957/// RAM (per-cell enum slots + heap payloads via `approx_row_bytes`)
958/// — the gap between the two is the representation multiplier the
959/// round-26 report measured at ~11× end-to-end.
960#[derive(Debug, Clone)]
961pub struct TableMemoryStats {
962    pub name: String,
963    pub hot_rows: u64,
964    /// Cached cold-row count (refreshed by ANALYZE — see
965    /// `Table::cold_row_count`'s staleness contract).
966    pub cold_rows: u64,
967    pub hot_encoded_bytes: u64,
968    pub approx_resident_bytes: u64,
969    pub index_count: u64,
970    /// v7.31 C2 — sum of `IndexKind::approx_resident_bytes()` over the
971    /// table's indices: every variant (BTree / NSW / BRIN / GIN family)
972    /// walks its own structure, so the GIN posting lists and NSW layer
973    /// adjacency that dominate text/vector tables are counted honestly
974    /// instead of the old flat-token estimate.
975    pub approx_index_bytes: u64,
976}
977
978/// v7.31 — whole-engine memory snapshot: the polling form of the
979/// round-26 ask-4 watermark signal. Hosts compare
980/// `total_approx_resident_bytes` (+ their own WAL/file accounting)
981/// against their deployment ceiling and shed/shrink before the
982/// kernel does it for them.
983#[derive(Debug, Clone)]
984pub struct MemoryStats {
985    pub tables: Vec<TableMemoryStats>,
986    pub total_hot_encoded_bytes: u64,
987    pub total_approx_resident_bytes: u64,
988    pub total_approx_index_bytes: u64,
989    /// The active per-query materialisation budget (bucket A), so a
990    /// monitoring host sees ceiling and usage through one call.
991    pub max_query_bytes: Option<usize>,
992    /// v7.31 C2 — bucket D: live WAL bytes (active chunk + buffered,
993    /// uncheckpointed). `None` from the engine itself — it has no WAL;
994    /// the durable hosts (embed `Database`, server) fill it in from
995    /// their own WAL accounting. `Some(0)` means "host has a WAL and
996    /// it is empty"; `None` means "no WAL on this path" (in-memory).
997    pub wal_bytes: Option<u64>,
998}
999
1000/// v6.2.0 — true for engine-managed catalog tables that the bare
1001/// `ANALYZE` (no target) should skip. v6.2.0 has no internal
1002/// tables yet (publications / subscriptions / users / statistics
1003/// all live as engine fields, not catalog tables), so this is a
1004/// reserved future-proofing hook — every existing user table is
1005/// analysed.
1006const fn is_internal_table_name(_name: &str) -> bool {
1007    false
1008}
1009
1010#[cfg(test)]
1011mod tests;