spg_engine/lib.rs
1//! SPG execution engine — v0.3 wires the SQL front-end to the in-memory
2//! storage layer. Implements `CREATE TABLE`, single-row `INSERT VALUES`, and
3//! `SELECT * FROM <table>` (no WHERE yet — that lands in v0.4 alongside
4//! expression evaluation against rows).
5#![no_std]
6
7extern crate alloc;
8
9pub mod aggregate;
10mod bytebudget;
11mod cancel;
12mod clock;
13mod constraints;
14mod conversions;
15pub mod copy;
16mod ddl;
17pub mod describe;
18mod dml;
19mod envelope;
20pub mod eval;
21mod execute;
22mod explain;
23mod expr_analysis;
24pub mod fts;
25mod index_access;
26mod join;
27mod joinfold;
28pub mod json;
29mod maintenance;
30pub mod memoize;
31mod numeric;
32mod orderby;
33pub mod plan_cache;
34mod plpgsql;
35pub mod publications;
36pub mod query_stats;
37mod readonly;
38pub mod reorder;
39mod select;
40pub mod selectivity;
41mod sequence;
42mod session;
43mod show;
44mod spg_admin;
45pub mod statistics;
46mod subquery;
47pub mod subscriptions;
48mod substitute;
49mod system_catalog;
50mod table_access;
51mod transaction;
52pub mod triggers;
53pub mod users;
54mod window;
55
56pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
57pub use cancel::{CancelToken, MonotonicNowFn};
58pub use execute::StreamItem;
59
60use bytebudget::*;
61pub(crate) use clock::{rewrite_clock_calls, value_to_literal};
62use constraints::*;
63use conversions::*;
64pub use conversions::{
65 format_bigint_2d_text_pub, format_hstore_text, format_int_2d_text_pub, format_range_text,
66 format_text_2d_text_pub,
67};
68pub(crate) use ddl::{
69 canonicalize_set_value, enforce_enum_label, eval_runtime_default_free,
70 resolve_column_default_free,
71};
72pub(crate) use envelope::{EnvelopeParse, build_envelope, split_envelope};
73use expr_analysis::*;
74use index_access::*;
75pub(crate) use orderby::{
76 apply_offset_and_limit, apply_offset_and_limit_tagged, build_order_keys, canonical_value_repr,
77 expand_group_by_all, order_by_value_cmp, partial_sort_tagged, render_histogram_bounds,
78 resolve_order_by_position, sort_by_keys, sort_values_for_histogram, value_cmp, value_to_f64,
79};
80pub(crate) use select::{build_projection, infer_column_types, value_to_order_key};
81pub(crate) use show::render_create_table;
82pub(crate) use subquery::{
83 build_in_list_set, collect_scalar_subqueries, expr_has_subquery, expr_tree_has_subquery,
84};
85pub use substitute::substitute_placeholders;
86use substitute::*;
87use system_catalog::*;
88use window::*;
89
90use alloc::collections::BTreeMap;
91use alloc::string::String;
92use alloc::vec::Vec;
93use core::fmt;
94
95// v7.16.0 — re-export the parsed-statement AST so downstream
96// crates (spg-embedded → spg-sqlx) don't need a direct dep on
97// spg-sql for the prepare/bind handle.
98pub use spg_sql::ast::Statement as ParsedStatement;
99use spg_sql::parser::ParseError;
100use spg_storage::{Catalog, ColumnSchema, Row, RowChange, StorageError};
101
102use crate::eval::EvalError;
103
/// Result of executing one statement.
///
/// Marked `#[non_exhaustive]`, so a `match` outside this crate needs a
/// wildcard arm.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum QueryResult {
    /// DDL or DML succeeded.
    ///
    /// `affected` is the row count for `INSERT` and 0 elsewhere.
    /// `modified_catalog` tells the server whether this statement
    /// caused the *committed* catalog to change — it's the signal to
    /// snapshot/audit. False for `BEGIN`/`ROLLBACK`, false for writeful
    /// statements executed inside a transaction (those only touch the
    /// shadow), and true for `COMMIT` and for writes outside a TX.
    CommandOk {
        affected: usize,
        modified_catalog: bool,
    },
    /// `SELECT` returned a (possibly empty) row set: the column schemas
    /// in `columns` and the result rows in `rows`.
    Rows {
        columns: Vec<ColumnSchema>,
        rows: Vec<Row>,
    },
}
126
/// All errors the engine can return.
///
/// Marked `#[non_exhaustive]` from v7.5.0 onward: external `match`
/// must include a `_` arm so new variants in subsequent v7.x releases
/// are not breaking changes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum EngineError {
    /// Wraps a [`ParseError`] from the SQL parser (`spg_sql::parser`).
    Parse(ParseError),
    /// Wraps a [`StorageError`] from the storage layer (`spg_storage`).
    Storage(StorageError),
    /// Wraps an [`EvalError`] from expression evaluation (`crate::eval`).
    Eval(EvalError),
    /// Front-end accepted a construct that the v0.x executor doesn't support.
    Unsupported(String),
    /// `BEGIN` while another transaction is already open.
    TransactionAlreadyOpen,
    /// `COMMIT` / `ROLLBACK` with no active transaction.
    NoActiveTransaction,
    /// v4.0 sentinel: `execute_readonly` got a statement that
    /// mutates engine state (INSERT / CREATE / BEGIN / COMMIT / …).
    /// The caller should retake the write lock and dispatch through
    /// `execute(&mut self)` instead.
    WriteRequired,
    /// v4.2: a SELECT would have returned more rows than the
    /// configured `max_query_rows` cap. Carries the cap.
    RowLimitExceeded(usize),
    /// v7.30.3 (mailrs round-26): a SELECT's join/filter
    /// materialisation would have held more (approximate) heap
    /// bytes than the configured `max_query_bytes` cap. The row
    /// cap above counts rows; this counts bytes, because one row
    /// can be a multi-MB mail body — 1000 fat rows pressure the
    /// host long before any row ceiling trips. Carries the cap.
    QueryBytesExceeded(usize),
    /// v4.5: cooperative cancellation — the host (server's
    /// per-query watchdog) set the cancel flag while a long-running
    /// SELECT / UPDATE / DELETE was scanning rows. The partial work
    /// is discarded; the caller should surface this as a timeout
    /// to the client.
    Cancelled,
}
166
167impl fmt::Display for EngineError {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 match self {
170 Self::Parse(e) => write!(f, "parse: {e}"),
171 Self::Storage(e) => write!(f, "storage: {e}"),
172 Self::Eval(e) => write!(f, "eval: {e}"),
173 Self::Unsupported(s) => write!(f, "unsupported: {s}"),
174 Self::TransactionAlreadyOpen => f.write_str("a transaction is already open"),
175 Self::NoActiveTransaction => f.write_str("no active transaction"),
176 Self::WriteRequired => {
177 f.write_str("statement requires a write lock (use execute, not execute_readonly)")
178 }
179 Self::RowLimitExceeded(n) => {
180 write!(f, "query exceeded max_query_rows={n}")
181 }
182 Self::QueryBytesExceeded(n) => {
183 write!(
184 f,
185 "query materialisation exceeded max_query_bytes={n} (set SPG_MAX_QUERY_BYTES to raise, 0 to disable)"
186 )
187 }
188 Self::Cancelled => f.write_str("query cancelled (timeout or client request)"),
189 }
190 }
191}
192
193impl From<ParseError> for EngineError {
194 fn from(e: ParseError) -> Self {
195 Self::Parse(e)
196 }
197}
198impl From<StorageError> for EngineError {
199 fn from(e: StorageError) -> Self {
200 Self::Storage(e)
201 }
202}
203impl From<EvalError> for EngineError {
204 fn from(e: EvalError) -> Self {
205 Self::Eval(e)
206 }
207}
208
// NOTE(review): the paragraph below reads as the description of the
// `Engine` struct, not of `ClockFn`. It is kept as a plain comment so
// rustdoc no longer attaches it to the type alias.
//
// The execution engine. Holds the catalog and (later) other server-scope
// state. `Engine::new()` is intentionally cheap so callers can construct one
// per database, per test.

/// Function pointer that returns "now" as microseconds since Unix
/// epoch. The engine is `no_std`, so it can't reach for `std::time`
/// itself — callers (`spg-server`, the sqllogictest runner) inject a
/// concrete implementation. The engine stores it as `Option<ClockFn>`;
/// `None` means `NOW()` / `CURRENT_*` raise `Unsupported`.
pub type ClockFn = fn() -> i64;
218
/// Function pointer that produces 16 cryptographically random bytes.
/// Like `ClockFn`, the engine is `no_std` and can't reach for /dev/urandom
/// itself — host (`spg-server`) injects an OS-backed source. The engine
/// stores it as `Option<SaltFn>`; `None` means SQL-driven `CREATE USER`
/// falls back to a deterministic salt derived from the username
/// (acceptable in tests; the server always installs a real RNG so
/// production paths never see this).
pub type SaltFn = fn() -> [u8; 16];
226
// NOTE(review): the paragraph below documents `CancelToken` (re-exported
// from the `cancel` module near the top of this file), not `TxId`. It is
// kept as a plain comment so rustdoc no longer fuses it into `TxId`'s docs.
//
// v4.5 cooperative cancellation token. A long-running SELECT /
// UPDATE / DELETE checks `is_cancelled` at row-loop checkpoints
// and bails with `EngineError::Cancelled`. The host
// (`spg-server`) creates an `AtomicBool` per query, spawns a
// watchdog thread that sets it after `SPG_QUERY_TIMEOUT_MS`,
// and passes it via `execute_with_cancel` / `execute_readonly_with_cancel`.
//
// `CancelToken::none()` is a no-op — used by the legacy `execute`
// and `execute_readonly` entry points so existing callers don't
// change.

/// v4.41.1 opaque transaction handle. Returned by `Engine::alloc_tx_id`,
/// threaded through `Engine::execute_in` so dispatch can identify which
/// in-flight TX a statement belongs to. `IMPLICIT_TX` is the reserved
/// slot every legacy caller — engine self-tests, spg-cli, spg-embedded,
/// startup replay — implicitly uses through the unchanged
/// `Engine::execute(sql)` API. v4.41.1 keeps at most one active slot at
/// runtime (dispatch holds `engine.write()` across the wrap, same as
/// v4.34); the map shape is here to let v4.42 turn on N in-flight
/// implicit TXs without reshuffling the engine internals.
///
/// The derived `Ord` is what lets `TxId` key the engine's `BTreeMap` of
/// transaction slots.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxId(pub u64);
248
/// Reserved slot used by `Engine::execute(sql)` — the legacy single-
/// global-shadow path. New `alloc_tx_id` handles start at 1, so they
/// never collide with this id (`0`).
pub const IMPLICIT_TX: TxId = TxId(0);
252
/// v6.7.3 — default segment-size threshold used by `COMPACT COLD
/// SEGMENTS` when no explicit target is supplied. Segments whose
/// `OwnedSegment::bytes().len()` is **strictly** less than this
/// value are eligible to merge. spg-server reads
/// `SPG_COMPACTION_TARGET_SEGMENT_BYTES` to override.
///
/// The value is 4 MiB (`4 * 1024 * 1024` bytes).
pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
259
/// Per-slot transaction state. Held inside `tx_catalogs[tx_id]` for the
/// lifetime of a BEGIN..COMMIT (or BEGIN..ROLLBACK) window. Drops when
/// the TX commits (its `catalog` is moved over `Engine.catalog`) or
/// rolls back (slot removed, catalog discarded).
///
/// Private to this crate root; the BEGIN / COMMIT / SAVEPOINT handling
/// that reads and writes it is not in this part of the file.
#[derive(Debug, Default, Clone)]
struct TxState {
    /// The TX's shadow copy of the catalog. Started as a clone of
    /// `Engine.catalog` at BEGIN time; writes flow into it; COMMIT
    /// installs it over `Engine.catalog`. `Catalog::clone()` is O(1)
    /// since v4.40 (`PersistentVec` rows + `PersistentBTreeMap` indices).
    catalog: Catalog,
    /// Per-TX savepoint stack. Each entry pairs the savepoint name with
    /// a clone of `catalog` at the moment `SAVEPOINT <name>` fired.
    /// `ROLLBACK TO <name>` restores from the entry and pops everything
    /// after it; `RELEASE <name>` discards the entry and everything
    /// after; COMMIT/ROLLBACK clears the whole stack.
    savepoints: Vec<(String, Catalog)>,
}
278
/// v7.11.0 — frozen read-only view of the engine's committed state.
/// Constructed via [`Engine::clone_snapshot`]. Holds clones of the
/// catalog, statistics, clock function, and row-cap config — the
/// four fields the `execute_readonly` path actually reads. Cheap to
/// `Clone` (each clone shares the underlying `PersistentVec` row
/// storage; only the trie root pointers copy). Send + Sync so a
/// snapshot can be moved across `tokio::task::spawn_blocking`
/// boundaries without coordination.
///
/// The contract: a snapshot reflects the engine's state at the
/// moment `clone_snapshot()` returned. Subsequent writes to the
/// engine are NOT visible. Callers who need fresher data take a
/// new snapshot.
#[derive(Debug, Clone)]
pub struct CatalogSnapshot {
    /// Catalog clone taken when the snapshot was built.
    catalog: Catalog,
    /// Optimizer statistics as of snapshot time.
    statistics: statistics::Statistics,
    /// Host-injected wall clock, copied from the engine (`None` if unset).
    clock: Option<ClockFn>,
    /// Per-query row cap, copied from the engine (`None` = unlimited).
    max_query_rows: Option<usize>,
}
299
/// CoW-1 (v7.34) — frozen view of the *persisted* committed engine
/// state. Carries every field the `snapshot()` envelope serializes;
/// `Clone` is O(1) on the catalog (Arc bump) and cheap typed-clones
/// on the trailers. Decouples "capture state" from "serialize bytes"
/// so the background-checkpoint worker can hold the snapshot and
/// produce bytes off the engine write lock.
///
/// Built by `Engine::snapshot_data`; turned into bytes by
/// `EngineSnapshot::serialize`.
#[derive(Debug, Clone)]
pub struct EngineSnapshot {
    /// Committed catalog at capture time.
    catalog: Catalog,
    /// RBAC user table (envelope trailer).
    users: UserStore,
    /// Logical-replication publications (envelope trailer).
    publications: publications::Publications,
    /// Logical-replication subscriptions (envelope trailer).
    subscriptions: subscriptions::Subscriptions,
    /// Per-column optimizer statistics (envelope trailer).
    statistics: statistics::Statistics,
}
314
315impl EngineSnapshot {
316 /// Same envelope rules as `Engine::snapshot()`: bare catalog when
317 /// every trailer is empty, full envelope otherwise.
318 pub fn serialize(&self) -> Vec<u8> {
319 if self.users.is_empty()
320 && self.publications.is_empty()
321 && self.subscriptions.is_empty()
322 && self.statistics.is_empty()
323 {
324 self.catalog.serialize()
325 } else {
326 build_envelope(
327 &self.catalog.serialize(),
328 &users::serialize_users(&self.users),
329 &self.publications.serialize(),
330 &self.subscriptions.serialize(),
331 &self.statistics.serialize(),
332 )
333 }
334 }
335}
336
337// The engine carries several independent session/capture flags (dialect,
338// FK-checks, meta-view materialisation, redo capture); they're orthogonal
339// switches, not a state enum begging to be modelled.
340#[allow(clippy::struct_excessive_bools)]
341#[derive(Debug, Default)]
342pub struct Engine {
343 /// Committed catalog — what survives `Engine::snapshot()` and what
344 /// outside-TX `SELECT`s read.
345 catalog: Catalog,
346 /// Active TX slots, keyed by `TxId`. Empty when no TX is in flight.
347 /// v4.41.1 runtime invariant: at most one entry (single-writer
348 /// model unchanged). v4.42 will let dispatch hold multiple entries
349 /// concurrently for group commit + engine MVCC.
350 tx_catalogs: BTreeMap<TxId, TxState>,
351 /// Which slot the next exec_* call should mutate. Set by
352 /// `execute_in(sql, tx_id)` at the entry point; legacy `execute(sql)`
353 /// sets it to `IMPLICIT_TX`. None when no TX is in flight (read /
354 /// write goes straight against `catalog`).
355 current_tx: Option<TxId>,
356 /// Monotonic counter for `alloc_tx_id`. Starts at 1 — slot 0 is
357 /// reserved for `IMPLICIT_TX`.
358 next_tx_id: u64,
359 /// v7.22 (round-13 T3) — session string-literal dialect. `false`
360 /// (default) = PG semantics (backslash literal, `''` escape);
361 /// `true` = MySQL semantics (`\'` etc.). Flipped by the
362 /// deterministic session signals each dump emits: `SET sql_mode`
363 /// (only MySQL clients/dumps send it) turns it on,
364 /// `SET standard_conforming_strings = on` (every pg_dump
365 /// preamble) turns it off. The plan cache is cleared on every
366 /// flip — the same SQL text lexes differently per dialect.
367 backslash_escapes: bool,
368 /// Optional wall clock used to satisfy `NOW()` / `CURRENT_TIMESTAMP`
369 /// / `CURRENT_DATE`. Set by the host environment.
370 clock: Option<ClockFn>,
371 /// v4.1 cryptographic RNG for per-user password salt. Set by the
372 /// host. `None` means SQL-driven `CREATE USER` uses a
373 /// deterministic fallback — see `SaltFn`.
374 salt_fn: Option<SaltFn>,
375 /// v4.2 per-query row cap. `None` = unlimited. When set, a
376 /// SELECT that materialises more than `n` rows returns
377 /// `EngineError::RowLimitExceeded`. Enforced before the result
378 /// is shaped into wire frames so a runaway scan can't blow the
379 /// server's heap.
380 max_query_rows: Option<usize>,
381 /// v7.30.3 (mailrs round-26) per-query byte cap on join/filter
382 /// materialisation. `None` = unlimited. Approximate net
383 /// accounting (Value heap payloads + per-cell enum overhead)
384 /// charged at every point the join pipeline clones rows;
385 /// crossing the cap raises `EngineError::QueryBytesExceeded`
386 /// instead of pressuring the host into reclaim livelock. The
387 /// host wires this to `SPG_MAX_QUERY_BYTES` (embed defaults it
388 /// ON; the server keeps its allocator-precise budget as the
389 /// outer layer).
390 pub(crate) max_query_bytes: Option<usize>,
391 /// v4.1 RBAC user table. Empty means "no RBAC configured yet" —
392 /// the server decides what that means at the auth boundary
393 /// (open mode vs legacy single-password mode). User CRUD goes
394 /// through `create_user`/`drop_user`/`verify_user`; persistence
395 /// rides the snapshot envelope alongside the catalog.
396 pub(crate) users: UserStore,
397 /// v6.1.2 logical-replication publication catalog. Empty until
398 /// `CREATE PUBLICATION` runs. Persistence rides the v3 envelope
399 /// trailer (see `build_envelope`).
400 publications: publications::Publications,
401 /// v6.1.4 logical-replication subscription catalog. Empty until
402 /// `CREATE SUBSCRIPTION` runs. Persistence rides the v4 envelope
403 /// trailer.
404 subscriptions: subscriptions::Subscriptions,
405 /// v6.2.0 — per-column statistics for the cost-based optimizer.
406 /// Populated by `ANALYZE`; queried via `spg_statistic` virtual
407 /// table. Persistence rides the v5 envelope trailer.
408 statistics: statistics::Statistics,
409 /// v6.3.0 — engine-level plan cache. Caches the post-`prepare()`
410 /// `Statement` keyed on SQL text. In-memory only — does NOT ride
411 /// the snapshot envelope (rebuilt on demand after restart).
412 plan_cache: plan_cache::PlanCache,
413 /// v6.5.1 — per-distinct-SQL execution stats. In-memory only,
414 /// surfaced via `spg_stat_query` virtual table. Updated by the
415 /// `execute_*` paths after a successful execute.
416 query_stats: query_stats::QueryStats,
417 /// v6.5.2 — connection-state provider callback. spg-server
418 /// registers a function at startup that snapshots its
419 /// per-pgwire-connection registry into `ActivityRow`s; engine
420 /// reads through it on every `SELECT * FROM spg_stat_activity`.
421 /// `None` ⇒ no-data (returns empty rows; matches the no_std
422 /// embedded callers that don't run pgwire).
423 activity_provider: Option<ActivityProvider>,
424 /// v6.5.3 — audit-chain provider + verifier. Same pattern as
425 /// activity_provider: spg-server registers both at startup;
426 /// engine reads through on `SELECT * FROM spg_audit_chain` and
427 /// `SELECT * FROM spg_audit_verify`. `None` ⇒ no-data.
428 audit_chain_provider: Option<AuditChainProvider>,
429 audit_verifier: Option<AuditVerifier>,
430 /// v6.5.6 — slow-query log threshold in microseconds. When set,
431 /// every successful execute whose elapsed exceeds the threshold
432 /// gets fed to the registered slow-query log callback (so
433 /// spg-server can emit a structured log line). Default `None`
434 /// = no slow-query logging.
435 slow_query_threshold_us: Option<u64>,
436 slow_query_logger: Option<SlowQueryLogger>,
437 /// v7.12.1 — session parameters set via `SET <name> = <value>`.
438 /// Only `default_text_search_config` is consumed by the engine
439 /// today (the FTS function dispatcher reads it when
440 /// `to_tsvector(text)` is called without an explicit config).
441 /// All other names are accepted + recorded so PG-dump output
442 /// loads, but have no behavioural effect.
443 pub(crate) session_params: BTreeMap<String, String>,
444 /// v7.12.7 — depth counter for trigger-emitted embedded SQL.
445 /// Each time the engine executes a `DeferredEmbeddedStmt` it
446 /// increments this; the recursive `execute_stmt_with_cancel`
447 /// inside that path checks against [`MAX_TRIGGER_RECURSION`]
448 /// to bound runaway cascades (trigger A's UPDATE on table B
449 /// fires trigger B which UPDATEs table A which fires trigger
450 /// A again…). Reset to 0 once the original DML returns.
451 trigger_recursion_depth: u32,
452 /// v7.14.0 — when `SET FOREIGN_KEY_CHECKS=0` is in effect
453 /// (mysqldump preamble), the FK existence + arity check at
454 /// CREATE TABLE time is deferred. FKs referencing a
455 /// not-yet-existing parent land in `pending_foreign_keys`
456 /// keyed by child table; `SET FOREIGN_KEY_CHECKS=1` drains
457 /// the queue and resolves each FK against the now-complete
458 /// catalog. Empty by default; the queue is drained on every
459 /// `RESET ALL` too.
460 foreign_key_checks: bool,
461 /// v7.16.2 — true on the temp Engine an outer
462 /// `exec_select_with_meta_views` builds, telling that
463 /// temp engine "stop short-circuiting into the meta-view
464 /// path — your catalog already has the materialised
465 /// tables; just run the regular SELECT." Without this we'd
466 /// infinite-loop since the meta-view name (e.g.
467 /// `__spg_info_columns`) still triggers
468 /// `select_references_meta_view`.
469 meta_views_materialised: bool,
470 pending_foreign_keys: Vec<(alloc::string::String, spg_sql::ast::ForeignKeyConstraint)>,
471 /// v7.34 (crash-recovery P0 #2) — row-level redo capture. When the
472 /// embedding layer turns this on (persistence enabled), each mutating
473 /// `execute` records the physical [`RowChange`]s it applied; the
474 /// engine drains them into `last_redo` on success, and the embedded
475 /// layer reads them via [`Engine::take_redo`] to write the WAL in
476 /// place of the SQL text. Off (default) = zero capture overhead.
477 redo_capture: bool,
478 /// Redo captured by the most recent successful mutating `execute`,
479 /// awaiting drain by the embedding layer. Cleared on each capture.
480 last_redo: Vec<RowChange>,
481}
482
/// v7.12.7 — hard cap on nested trigger-emitted embedded SQL
/// fires. 16 deep is well past anything a normal trigger graph
/// uses while still preventing infinite-loop wedging.
///
/// Compared against `Engine::trigger_recursion_depth`; the check itself
/// is outside this part of the file.
const MAX_TRIGGER_RECURSION: u32 = 16;
487
/// v6.5.6 — callback signature for slow-query log emission. Called
/// with `(sql, elapsed_us)` once per successful execute that crosses
/// the threshold. A plain `fn` pointer, so it cannot capture state.
pub type SlowQueryLogger = fn(&str, u64);
492
/// v6.5.2 — one row of `spg_stat_activity`. Engine-public so
/// spg-server can construct rows without re-exporting internal
/// dispatch types.
///
/// Field meanings below are inferred from the names and the `_us`
/// suffix unless stated in the original docs — TODO confirm against the
/// spg-server provider that fills them in.
#[derive(Debug, Clone)]
pub struct ActivityRow {
    /// Connection / backend identifier (presumably).
    pub pid: u32,
    /// User name the connection runs as (presumably).
    pub user: String,
    /// Start timestamp; the suffix suggests microseconds.
    pub started_at_us: i64,
    /// SQL text currently associated with the connection (presumably).
    pub current_sql: String,
    /// What the connection is waiting on, if anything (presumably).
    pub wait_event: String,
    /// Elapsed time; the suffix suggests microseconds.
    pub elapsed_us: i64,
    /// Whether the connection has a transaction open (presumably).
    pub in_transaction: bool,
    /// v7.17 Phase 2.4 — startup-param `application_name` (or the
    /// last value the client sent via `SET application_name = '...'`).
    /// Empty when the client never declared one.
    pub application_name: String,
}
510
/// v6.5.2 — provider callback type. Fresh snapshot returned each
/// call (an owned `Vec<ActivityRow>`); engine doesn't cache the slice.
pub type ActivityProvider = fn() -> Vec<ActivityRow>;
514
/// v6.5.3 — one row of `spg_audit_chain`. Engine-public so
/// spg-server can construct rows directly from `AuditEntry`.
///
/// Field meanings below are inferred from the names and suffixes —
/// TODO confirm against the spg-server audit log.
#[derive(Debug, Clone)]
pub struct AuditRow {
    /// Position of the entry in the chain (presumably).
    pub seq: i64,
    /// Entry timestamp; the suffix suggests milliseconds.
    pub ts_ms: i64,
    /// Previous entry's hash; the suffix suggests hex text.
    pub prev_hash_hex: String,
    /// This entry's hash; the suffix suggests hex text.
    pub entry_hash_hex: String,
    /// SQL text recorded for the entry (presumably).
    pub sql: String,
}
525
/// v6.5.3 — chain-table provider. spg-server registers a fn pointer
/// that snapshots the audit log into owned [`AuditRow`]s.
pub type AuditChainProvider = fn() -> Vec<AuditRow>;
/// v6.5.3 — chain verifier. spg-server registers a fn pointer that
/// verifies the audit log and returns `(verified_count, broken_at_seq)`
/// — `broken_at_seq` is `-1` on a clean chain.
pub type AuditVerifier = fn() -> (i64, i64);
532
533impl Engine {
534 pub fn new() -> Self {
535 Self {
536 catalog: Catalog::new(),
537 tx_catalogs: BTreeMap::new(),
538 current_tx: None,
539 backslash_escapes: false,
540 next_tx_id: 1,
541 clock: None,
542 salt_fn: None,
543 max_query_rows: None,
544 max_query_bytes: None,
545 users: UserStore::new(),
546 publications: publications::Publications::new(),
547 subscriptions: subscriptions::Subscriptions::new(),
548 statistics: statistics::Statistics::new(),
549 plan_cache: plan_cache::PlanCache::new(),
550 query_stats: query_stats::QueryStats::new(),
551 activity_provider: None,
552 audit_chain_provider: None,
553 audit_verifier: None,
554 slow_query_threshold_us: None,
555 slow_query_logger: None,
556 session_params: BTreeMap::new(),
557 trigger_recursion_depth: 0,
558 foreign_key_checks: true,
559 meta_views_materialised: false,
560 pending_foreign_keys: Vec::new(),
561 redo_capture: false,
562 last_redo: Vec::new(),
563 }
564 }
565
566 /// v7.11.0 — clone the engine's committed catalog + read-time
567 /// state into a frozen `CatalogSnapshot`. Cheap (`Catalog` is
568 /// backed by `PersistentVec`; cloning is O(log n) per table).
569 /// Subsequent writes to this engine are invisible to the
570 /// snapshot; the snapshot is self-contained and can be moved
571 /// to another thread for concurrent `execute_readonly_on_snapshot`
572 /// calls. The basis for [`AsyncReadHandle`] in spg-embedded-tokio
573 /// and any other read-fanout pattern.
574 #[must_use]
575 pub fn clone_snapshot(&self) -> CatalogSnapshot {
576 CatalogSnapshot {
577 catalog: self.active_catalog().clone(),
578 statistics: self.statistics.clone(),
579 clock: self.clock,
580 max_query_rows: self.max_query_rows,
581 }
582 }
583
/// Construct an engine restored from a previously-snapshotted catalog
/// (see `snapshot()`).
///
/// Only `catalog` comes from the caller. Everything else starts from
/// the same values `Engine::new()` uses: no transaction in flight,
/// `next_tx_id` at 1 (slot 0 is `IMPLICIT_TX`), PG string-literal
/// dialect, no clock / salt source / provider callbacks, no row or byte
/// cap, empty user, publication, subscription and statistics tables,
/// fresh plan cache and query stats, FK checks on, redo capture off.
pub fn restore(catalog: Catalog) -> Self {
    Self {
        catalog,
        tx_catalogs: BTreeMap::new(),
        current_tx: None,
        backslash_escapes: false,
        next_tx_id: 1,
        clock: None,
        salt_fn: None,
        max_query_rows: None,
        max_query_bytes: None,
        users: UserStore::new(),
        publications: publications::Publications::new(),
        subscriptions: subscriptions::Subscriptions::new(),
        statistics: statistics::Statistics::new(),
        plan_cache: plan_cache::PlanCache::new(),
        query_stats: query_stats::QueryStats::new(),
        activity_provider: None,
        audit_chain_provider: None,
        audit_verifier: None,
        slow_query_threshold_us: None,
        slow_query_logger: None,
        session_params: BTreeMap::new(),
        trigger_recursion_depth: 0,
        foreign_key_checks: true,
        meta_views_materialised: false,
        pending_foreign_keys: Vec::new(),
        redo_capture: false,
        last_redo: Vec::new(),
    }
}
617
618 /// Restore an engine + user table from a v4.1 envelope produced
619 /// by `snapshot_with_users()`. Falls back to plain catalog-only
620 /// restore if the envelope magic isn't present (so v3.x snapshot
621 /// files still load). v6.1.2 adds the optional publications
622 /// trailer (envelope v3); a v1/v2 envelope deserialises to an
623 /// empty publication table.
624 pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
625 match split_envelope(buf) {
626 EnvelopeParse::Pair {
627 catalog: catalog_bytes,
628 users: user_bytes,
629 publications: pub_bytes,
630 subscriptions: sub_bytes,
631 statistics: stats_bytes,
632 } => {
633 let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
634 let users = users::deserialize_users(user_bytes)
635 .map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
636 let publications = match pub_bytes {
637 Some(b) => publications::Publications::deserialize(b).map_err(|e| {
638 EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
639 })?,
640 None => publications::Publications::new(),
641 };
642 let subscriptions = match sub_bytes {
643 Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
644 EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
645 })?,
646 None => subscriptions::Subscriptions::new(),
647 };
648 let statistics = match stats_bytes {
649 Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
650 EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
651 })?,
652 None => statistics::Statistics::new(),
653 };
654 Ok(Self {
655 catalog,
656 tx_catalogs: BTreeMap::new(),
657 current_tx: None,
658 backslash_escapes: false,
659 next_tx_id: 1,
660 clock: None,
661 salt_fn: None,
662 max_query_rows: None,
663 max_query_bytes: None,
664 users,
665 publications,
666 subscriptions,
667 statistics,
668 plan_cache: plan_cache::PlanCache::new(),
669 query_stats: query_stats::QueryStats::new(),
670 activity_provider: None,
671 audit_chain_provider: None,
672 audit_verifier: None,
673 slow_query_threshold_us: None,
674 slow_query_logger: None,
675 session_params: BTreeMap::new(),
676 trigger_recursion_depth: 0,
677 foreign_key_checks: true,
678 meta_views_materialised: false,
679 pending_foreign_keys: Vec::new(),
680 redo_capture: false,
681 last_redo: Vec::new(),
682 })
683 }
684 EnvelopeParse::CrcMismatch { expected, computed } => {
685 Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
686 "snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
687 ))))
688 }
689 EnvelopeParse::Bare => {
690 let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
691 Ok(Self::restore(catalog))
692 }
693 }
694 }
695
/// Borrow the engine's RBAC user table.
pub const fn users(&self) -> &UserStore {
    &self.users
}
699
/// Builder: attach a wall clock so `NOW()` / `CURRENT_TIMESTAMP` /
/// `CURRENT_DATE` evaluate to a real value instead of erroring out.
/// Consumes and returns the engine; any previously installed clock is
/// replaced.
#[must_use]
pub const fn with_clock(mut self, clock: ClockFn) -> Self {
    self.clock = Some(clock);
    self
}
707
/// Builder: attach an OS-backed RNG for per-user password salts.
/// The host (`spg-server`) typically wires this to `/dev/urandom`.
/// Consumes and returns the engine; any previously installed source is
/// replaced.
#[must_use]
pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
    self.salt_fn = Some(f);
    self
}
715
/// Builder: cap the number of rows a single SELECT may return.
/// Exceeding the cap raises `EngineError::RowLimitExceeded` —
/// the bound is checked inside the executor so a runaway
/// catalog scan can't allocate millions of rows before the
/// server gets a chance to reject the result.
///
/// Stores `Some(n)` as given; this builder has no way to express
/// "unlimited" (`None`) — that is simply the state before calling it.
#[must_use]
pub const fn with_max_query_rows(mut self, n: usize) -> Self {
    self.max_query_rows = Some(n);
    self
}
726
/// Builder: cap the approximate heap bytes a single SELECT's
/// join/filter materialisation may hold. Exceeding the cap
/// raises `EngineError::QueryBytesExceeded`. Rows are the wrong
/// unit when one row carries a multi-MB body (mailrs round-26:
/// 1000-row batches of full mail text walked a 15 GiB host into
/// reclaim livelock without ever tripping a row ceiling).
///
/// Stores `Some(n)` as given. NOTE(review): the error text mentions
/// "0 to disable" for `SPG_MAX_QUERY_BYTES`; whether the host maps 0
/// to "don't call this builder" is not visible here — confirm.
#[must_use]
pub const fn with_max_query_bytes(mut self, n: usize) -> Self {
    self.max_query_bytes = Some(n);
    self
}
738
/// The *committed* catalog. Note: during a transaction this returns the
/// pre-TX state — `SELECT` inside a TX goes through `execute()` and reads
/// the shadow. Tests that inspect outside-TX state should use this.
///
/// Returns a borrow of the `catalog` field; the TX slots in
/// `tx_catalogs` are never consulted.
pub const fn catalog(&self) -> &Catalog {
    &self.catalog
}
745
746 /// Capture a frozen view of the committed engine state. Catalog
747 /// is O(1) Arc bump; trailers are cheap clones. Decouples "capture"
748 /// (needs &Engine) from "serialize" (CPU, no engine access) — the
749 /// seam the background-checkpoint worker rides in CoW-2.
750 pub fn snapshot_data(&self) -> EngineSnapshot {
751 EngineSnapshot {
752 catalog: self.catalog.clone(),
753 users: self.users.clone(),
754 publications: self.publications.clone(),
755 subscriptions: self.subscriptions.clone(),
756 statistics: self.statistics.clone(),
757 }
758 }
759
/// Serialize the *committed* state to bytes. Shorthand for
/// `self.snapshot_data().serialize()`.
///
/// v0.6 was full-snapshot; v0.9 adds the rule that an open TX's shadow
/// is never snapshotted — only the post-COMMIT state is persisted.
/// v4.1 wraps the catalog in an envelope when there are users to
/// persist; with nothing extra to persist the output is the bare
/// catalog format (backwards-compat with v3.x readers). v6.1.2 added
/// publications to the envelope condition. As implemented by
/// `EngineSnapshot::serialize`, the envelope path is taken when any of
/// users, publications, subscriptions or statistics is non-empty.
pub fn snapshot(&self) -> Vec<u8> {
    self.snapshot_data().serialize()
}
770
/// True when at least one TX slot is in flight, i.e. `tx_catalogs` is
/// non-empty. v4.41.1 runtime invariant: at most one slot active at a
/// time (dispatch holds `engine.write()` across the entire wrap).
/// v4.42 will let this return true with multiple slots concurrently.
pub fn in_transaction(&self) -> bool {
    !self.tx_catalogs.is_empty()
}
778
779 /// v4.41.1 allocate a fresh TX handle. Used by spg-server dispatch
780 /// to scope each implicit-wrap BEGIN..stmt..COMMIT to its own slot
781 /// in `tx_catalogs`. v4.42 — the commit-barrier leader allocates
782 /// one of these per task in its group, runs `BEGIN`+sql+`COMMIT`
783 /// sequentially under a single `engine.write()` so each task's
784 /// mutations accumulate into shared state, then either keeps the
785 /// accumulated state (fsync OK) or restores the pre-image via
786 /// `replace_catalog` (fsync err).
787 pub fn alloc_tx_id(&mut self) -> TxId {
788 let id = TxId(self.next_tx_id);
789 self.next_tx_id = self.next_tx_id.saturating_add(1);
790 id
791 }
792
/// v4.42 — replace the live (committed) catalog with `catalog`.
///
/// Used by the commit-barrier leader to roll back a group whose
/// batched fsync failed: the leader snapshots
/// `engine.catalog().clone()` (O(1) Arc bump after the v4.39/v4.40
/// persistent migration) at group start, sequentially applies each
/// task's BEGIN+sql+COMMIT under the same write lock to accumulate
/// mutations into shared state, batches the WAL bytes, fsyncs once,
/// and on failure calls this with the pre-image to undo every task
/// in the group at once.
///
/// **Does NOT touch `tx_catalogs` / `current_tx`.** Any explicit-TX
/// slot from a concurrent client (created via the legacy
/// `IMPLICIT_TX`-less dispatch path or via the future MVCC-readers
/// v5+ work) has its own snapshot baked into the slot — restoring
/// `self.catalog` to the pre-image leaves those slots untouched,
/// exactly as they were when the leader took the lock. The leader's
/// own implicit-TX slots are all already discarded (`exec_commit`
/// removed them as each task's COMMIT ran) by the time this is
/// reached.
pub fn replace_catalog(&mut self, catalog: Catalog) {
    // Only the committed catalog field is overwritten here.
    self.catalog = catalog;
}
815
816 /// v6.7.0 — public shim around `Catalog::freeze_oldest_to_cold`
817 /// so tests + the spg-server freezer can drive a freeze without
818 /// reaching into the private `active_catalog_mut`. v6.7.4
819 /// parallel freezer will build on this surface.
820 ///
821 /// Marks the table's cached `cold_row_count` stale because the
822 /// freeze added cold locators that ANALYZE hasn't yet refreshed.
823 pub fn freeze_oldest_to_cold(
824 &mut self,
825 table_name: &str,
826 index_name: &str,
827 max_rows: usize,
828 ) -> Result<spg_storage::FreezeReport, EngineError> {
829 let report = self
830 .active_catalog_mut()
831 .freeze_oldest_to_cold(table_name, index_name, max_rows)
832 .map_err(EngineError::Storage)?;
833 if let Some(t) = self.active_catalog_mut().get_mut(table_name) {
834 t.mark_cold_row_count_stale();
835 }
836 Ok(report)
837 }
838
839 /// v6.7.5 — public shim used by the spg-server follower's
840 /// segment-forwarding receiver. Registers a cold-tier segment
841 /// at a specific id (the master's id, as transmitted on the
842 /// wire) so the follower's BTree-Cold locators stay byte-
843 /// identical with the master's. Wraps
844 /// `Catalog::load_segment_bytes_at` under the standard
845 /// clone-mutate-replace pattern.
846 ///
847 /// Returns `Ok(())` on success **and** on the "slot already
848 /// occupied" case — a follower mid-reconnect may receive a
849 /// segment chunk for a segment_id it already has on disk
850 /// (forwarded last session); the caller should treat that
851 /// path as a no-op rather than a fatal error.
852 pub fn receive_cold_segment(
853 &mut self,
854 segment_id: u32,
855 bytes: Vec<u8>,
856 ) -> Result<(), EngineError> {
857 let mut new_cat = self.catalog.clone();
858 match new_cat.load_segment_bytes_at(segment_id, bytes) {
859 Ok(()) => {
860 self.replace_catalog(new_cat);
861 Ok(())
862 }
863 Err(StorageError::Corrupt(msg)) if msg.contains("already occupied") => Ok(()),
864 Err(e) => Err(EngineError::Storage(e)),
865 }
866 }
867
868 pub(crate) fn active_catalog(&self) -> &Catalog {
869 match self.current_tx {
870 Some(t) => self
871 .tx_catalogs
872 .get(&t)
873 .map_or(&self.catalog, |s| &s.catalog),
874 None => &self.catalog,
875 }
876 }
877
878 fn active_catalog_mut(&mut self) -> &mut Catalog {
879 let tx = self.current_tx;
880 match tx {
881 Some(t) => match self.tx_catalogs.get_mut(&t) {
882 Some(s) => &mut s.catalog,
883 None => &mut self.catalog,
884 },
885 None => &mut self.catalog,
886 }
887 }
888
/// v7.34 (crash-recovery P0 #2) — turn row-level redo capture on/off.
///
/// The embedding layer enables it when persistence is on so each
/// mutating `execute` records the physical [`RowChange`]s it applied
/// (drained via [`Engine::take_redo`]). Off = zero capture overhead.
pub fn set_redo_capture(&mut self, on: bool) {
    // Only flips the flag; redo already captured is left untouched.
    self.redo_capture = on;
}
896
/// v7.34 — take the redo captured by the most recent successful
/// mutating `execute`.
///
/// The result is empty when capture is off, the statement was a
/// read, or it changed nothing. The embedding layer writes these to
/// the WAL in place of the SQL text.
///
/// Draining: `last_redo` is left empty afterwards, so calling this
/// again without a new capture in between yields an empty vector.
pub fn take_redo(&mut self) -> Vec<RowChange> {
    core::mem::take(&mut self.last_redo)
}
904
905 /// v7.34 (crash-recovery P0 #2) — replay a row-level redo log onto the
906 /// committed catalog (the row-level WAL recovery primitive: apply the
907 /// captured physical changes from a checkpoint baseline, in place of
908 /// re-executing the SQL). Trusts the log — no uniqueness/FK/parse.
909 pub fn apply_redo(&mut self, changes: &[RowChange]) -> Result<(), EngineError> {
910 self.catalog
911 .apply_redo(changes)
912 .map_err(EngineError::Storage)
913 }
914
915 /// Read-only execute path. Succeeds for `SELECT` / `SHOW TABLES`
916 /// / `SHOW COLUMNS`; returns `EngineError::WriteRequired` for
917 /// every other statement, so the caller can fall through to the
918 /// `&mut self` `execute` path under a write lock. Engine state is
919 /// not mutated even on the success path (`rewrite_clock_calls`
920 /// and `resolve_order_by_position` both mutate the locally-owned
921 /// AST, not `self`).
922 ///
923 /// v4.2: cap result-set size. Applied after the executor
924 /// materialises rows but before they leave the engine — wrapping
925 /// every Rows-returning exec_* function would scatter the check.
926 ///
927 /// v7.31 (memory campaign, bucket A) — the same choke point now
928 /// also enforces the BYTE budget on the final result set, so
929 /// single-table and aggregate paths (which don't route through
930 /// the join materialiser's incremental accounting) still cannot
931 /// hand the host an unbounded result. Intermediate single-table
932 /// clones are the 7.31.x follow-up (design doc, bucket A).
933 fn enforce_row_limit(
934 &self,
935 result: Result<QueryResult, EngineError>,
936 ) -> Result<QueryResult, EngineError> {
937 if let Ok(QueryResult::Rows { rows, .. }) = &result {
938 if let Some(cap) = self.max_query_rows
939 && rows.len() > cap
940 {
941 return Err(EngineError::RowLimitExceeded(cap));
942 }
943 if let Some(byte_cap) = self.max_query_bytes
944 && approx_rows_bytes(rows) > byte_cap
945 {
946 return Err(EngineError::QueryBytesExceeded(byte_cap));
947 }
948 }
949 result
950 }
951}
952
/// v7.31 (memory campaign — ceiling-first / never-die, design v1) —
/// per-table slice of the engine's resident-memory accounting.
///
/// `hot_encoded_bytes` is the storage layer's maintained meter (what
/// the rows encode to); `approx_resident_bytes` is what they COST in
/// RAM (per-cell enum slots + heap payloads via `approx_row_bytes`)
/// — the gap between the two is the representation multiplier the
/// round-26 report measured at ~11× end-to-end.
///
/// Compares by value (`PartialEq` / `Eq`) so hosts and tests can
/// detect whether two polls observed the same numbers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableMemoryStats {
    /// Name of the table this slice describes.
    pub name: String,
    /// Hot-row count for the table.
    pub hot_rows: u64,
    /// Cached cold-row count (refreshed by ANALYZE — see
    /// `Table::cold_row_count`'s staleness contract).
    pub cold_rows: u64,
    /// Storage-layer meter: what the hot rows encode to.
    pub hot_encoded_bytes: u64,
    /// Approximate RAM cost of the hot rows (see the struct docs).
    pub approx_resident_bytes: u64,
    /// Number of indices counted for the table.
    pub index_count: u64,
    /// v7.31 C2 — sum of `IndexKind::approx_resident_bytes()` over the
    /// table's indices: every variant (BTree / NSW / BRIN / GIN family)
    /// walks its own structure, so the GIN posting lists and NSW layer
    /// adjacency that dominate text/vector tables are counted honestly
    /// instead of the old flat-token estimate.
    pub approx_index_bytes: u64,
}

/// v7.31 — whole-engine memory snapshot: the polling form of the
/// round-26 ask-4 watermark signal.
///
/// Hosts compare `total_approx_resident_bytes` (+ their own WAL/file
/// accounting) against their deployment ceiling and shed/shrink
/// before the kernel does it for them.
///
/// Compares by value (`PartialEq` / `Eq`), so two polls can be
/// checked for equality directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoryStats {
    /// Per-table slices.
    pub tables: Vec<TableMemoryStats>,
    /// Engine-wide hot encoded bytes — presumably the sum of the
    /// per-table `hot_encoded_bytes`; confirm against the producer.
    pub total_hot_encoded_bytes: u64,
    /// Engine-wide approximate resident bytes — presumably the sum of
    /// the per-table `approx_resident_bytes`; confirm against the
    /// producer.
    pub total_approx_resident_bytes: u64,
    /// Engine-wide approximate index bytes — presumably the sum of
    /// the per-table `approx_index_bytes`; confirm against the
    /// producer.
    pub total_approx_index_bytes: u64,
    /// The active per-query materialisation budget (bucket A), so a
    /// monitoring host sees ceiling and usage through one call.
    pub max_query_bytes: Option<usize>,
    /// v7.31 C2 — bucket D: live WAL bytes (active chunk + buffered,
    /// uncheckpointed). `None` from the engine itself — it has no WAL;
    /// the durable hosts (embed `Database`, server) fill it in from
    /// their own WAL accounting. `Some(0)` means "host has a WAL and
    /// it is empty"; `None` means "no WAL on this path" (in-memory).
    pub wal_bytes: Option<u64>,
}
999
/// v6.2.0 — true for engine-managed catalog tables that the bare
/// `ANALYZE` (no target) should skip.
///
/// v6.2.0 has no internal tables yet (publications / subscriptions /
/// users / statistics all live as engine fields, not catalog tables),
/// so this is a reserved future-proofing hook — every existing user
/// table is analysed.
const fn is_internal_table_name(_name: &str) -> bool {
    // The name is deliberately ignored for now: nothing is internal.
    false
}
1009
1010#[cfg(test)]
1011mod tests;