spg_engine/lib.rs
//! SPG execution engine — wires the SQL front-end to the in-memory
//! storage layer. The crate started out (v0.3) with just `CREATE TABLE`,
//! single-row `INSERT VALUES`, and `SELECT * FROM <table>`; the submodules
//! declared below (joins, subqueries, windows, triggers, …) hold what has
//! been layered on since.
5#![no_std]
6
7extern crate alloc;
8
9pub mod aggregate;
10mod bytebudget;
11mod cancel;
12mod clock;
13mod constraints;
14mod conversions;
15pub mod copy;
16mod ddl;
17pub mod describe;
18mod dml;
19mod envelope;
20pub mod eval;
21mod execute;
22mod explain;
23mod expr_analysis;
24pub mod fts;
25mod index_access;
26mod join;
27pub mod json;
28mod maintenance;
29pub mod memoize;
30mod numeric;
31mod orderby;
32pub mod plan_cache;
33mod plpgsql;
34pub mod publications;
35pub mod query_stats;
36mod readonly;
37pub mod reorder;
38mod select;
39pub mod selectivity;
40mod sequence;
41mod session;
42mod show;
43mod spg_admin;
44pub mod statistics;
45mod subquery;
46pub mod subscriptions;
47mod substitute;
48mod system_catalog;
49mod table_access;
50mod transaction;
51pub mod triggers;
52pub mod users;
53mod window;
54
55pub use crate::users::{Role, ScramSecrets, UserError, UserStore};
56pub use cancel::{CancelToken, MonotonicNowFn};
57
58use bytebudget::*;
59pub(crate) use clock::{rewrite_clock_calls, value_to_literal};
60use constraints::*;
61use conversions::*;
62pub use conversions::{
63 format_bigint_2d_text_pub, format_hstore_text, format_int_2d_text_pub, format_range_text,
64 format_text_2d_text_pub,
65};
66pub(crate) use ddl::{
67 canonicalize_set_value, enforce_enum_label, eval_runtime_default_free,
68 resolve_column_default_free,
69};
70pub(crate) use envelope::{EnvelopeParse, build_envelope, split_envelope};
71use expr_analysis::*;
72use index_access::*;
73pub(crate) use orderby::{
74 apply_offset_and_limit, apply_offset_and_limit_tagged, build_order_keys, canonical_value_repr,
75 expand_group_by_all, order_by_value_cmp, partial_sort_tagged, render_histogram_bounds,
76 resolve_order_by_position, sort_by_keys, sort_values_for_histogram, value_cmp, value_to_f64,
77};
78pub(crate) use select::{build_projection, infer_column_types, value_to_order_key};
79pub(crate) use show::render_create_table;
80pub(crate) use subquery::{
81 build_in_list_set, collect_scalar_subqueries, expr_has_subquery, expr_tree_has_subquery,
82};
83pub use substitute::substitute_placeholders;
84use substitute::*;
85use system_catalog::*;
86use window::*;
87
88use alloc::collections::BTreeMap;
89use alloc::string::String;
90use alloc::vec::Vec;
91use core::fmt;
92
93// v7.16.0 — re-export the parsed-statement AST so downstream
94// crates (spg-embedded → spg-sqlx) don't need a direct dep on
95// spg-sql for the prepare/bind handle.
96pub use spg_sql::ast::Statement as ParsedStatement;
97use spg_sql::parser::ParseError;
98use spg_storage::{Catalog, ColumnSchema, Row, RowChange, StorageError};
99
100use crate::eval::EvalError;
101
/// Result of executing one statement.
///
/// Marked `#[non_exhaustive]`: a `match` outside this crate needs a
/// wildcard arm.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum QueryResult {
    /// DDL or DML succeeded.
    ///
    /// `affected` is the row count for `INSERT` and 0 elsewhere.
    /// NOTE(review): that wording may predate UPDATE / DELETE support —
    /// confirm what those statements report here.
    /// `modified_catalog` tells the server whether this statement
    /// caused the *committed* catalog to change — it's the signal to
    /// snapshot/audit. False for `BEGIN`/`ROLLBACK`, false for writeful
    /// statements executed inside a transaction (those only touch the
    /// shadow), and true for `COMMIT` and for writes outside a TX.
    CommandOk {
        /// Row count reported by the statement (see the variant docs).
        affected: usize,
        /// Whether the committed catalog changed (see the variant docs).
        modified_catalog: bool,
    },
    /// `SELECT` returned a (possibly empty) row set.
    Rows {
        /// Column schemas describing the result set — presumably one per
        /// output column, in cell order; confirm against the executor.
        columns: Vec<ColumnSchema>,
        /// The result rows; may be empty.
        rows: Vec<Row>,
    },
}
124
/// All errors the engine can return.
///
/// Marked `#[non_exhaustive]` from v7.5.0 onward: external `match`
/// must include a `_` arm so new variants in subsequent v7.x releases
/// are not breaking changes.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum EngineError {
    /// A parser error, wrapped unchanged (see `From<ParseError>`).
    /// Displays as `parse: <inner>`.
    Parse(ParseError),
    /// A storage-layer error, wrapped unchanged (see
    /// `From<StorageError>`). Displays as `storage: <inner>`.
    Storage(StorageError),
    /// An expression-evaluation error, wrapped unchanged (see
    /// `From<EvalError>`). Displays as `eval: <inner>`.
    Eval(EvalError),
    /// Front-end accepted a construct that the v0.x executor doesn't support.
    Unsupported(String),
    /// `BEGIN` while another transaction is already open.
    TransactionAlreadyOpen,
    /// `COMMIT` / `ROLLBACK` with no active transaction.
    NoActiveTransaction,
    /// v4.0 sentinel: `execute_readonly` got a statement that
    /// mutates engine state (INSERT / CREATE / BEGIN / COMMIT / …).
    /// The caller should retake the write lock and dispatch through
    /// `execute(&mut self)` instead.
    WriteRequired,
    /// v4.2: a SELECT would have returned more rows than the
    /// configured `max_query_rows` cap. Carries the cap.
    RowLimitExceeded(usize),
    /// v7.30.3 (mailrs round-26): a SELECT's join/filter
    /// materialisation would have held more (approximate) heap
    /// bytes than the configured `max_query_bytes` cap. The row
    /// cap above counts rows; this counts bytes, because one row
    /// can be a multi-MB mail body — 1000 fat rows pressure the
    /// host long before any row ceiling trips. Carries the cap.
    QueryBytesExceeded(usize),
    /// v4.5: cooperative cancellation — the host (server's
    /// per-query watchdog) set the cancel flag while a long-running
    /// SELECT / UPDATE / DELETE was scanning rows. The partial work
    /// is discarded; the caller should surface this as a timeout
    /// to the client.
    Cancelled,
}
164
165impl fmt::Display for EngineError {
166 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167 match self {
168 Self::Parse(e) => write!(f, "parse: {e}"),
169 Self::Storage(e) => write!(f, "storage: {e}"),
170 Self::Eval(e) => write!(f, "eval: {e}"),
171 Self::Unsupported(s) => write!(f, "unsupported: {s}"),
172 Self::TransactionAlreadyOpen => f.write_str("a transaction is already open"),
173 Self::NoActiveTransaction => f.write_str("no active transaction"),
174 Self::WriteRequired => {
175 f.write_str("statement requires a write lock (use execute, not execute_readonly)")
176 }
177 Self::RowLimitExceeded(n) => {
178 write!(f, "query exceeded max_query_rows={n}")
179 }
180 Self::QueryBytesExceeded(n) => {
181 write!(
182 f,
183 "query materialisation exceeded max_query_bytes={n} (set SPG_MAX_QUERY_BYTES to raise, 0 to disable)"
184 )
185 }
186 Self::Cancelled => f.write_str("query cancelled (timeout or client request)"),
187 }
188 }
189}
190
191impl From<ParseError> for EngineError {
192 fn from(e: ParseError) -> Self {
193 Self::Parse(e)
194 }
195}
196impl From<StorageError> for EngineError {
197 fn from(e: StorageError) -> Self {
198 Self::Storage(e)
199 }
200}
201impl From<EvalError> for EngineError {
202 fn from(e: EvalError) -> Self {
203 Self::Eval(e)
204 }
205}
206
// NOTE: the three lines below describe `Engine` (declared further down
// in this file), not `ClockFn`. They are kept as a plain comment so
// rustdoc does not attach them to the type alias.
// The execution engine. Holds the catalog and (later) other server-scope
// state. `Engine::new()` is intentionally cheap so callers can construct one
// per database, per test.

/// Function pointer that returns "now" as microseconds since Unix
/// epoch. The engine is `no_std`, so it can't reach for `std::time`
/// itself — callers (`spg-server`, the sqllogictest runner) inject a
/// concrete implementation. The engine stores it as `Option<ClockFn>`;
/// `None` means `NOW()` / `CURRENT_*` raise `Unsupported`.
pub type ClockFn = fn() -> i64;
216
/// Function pointer that produces 16 cryptographically random bytes.
/// Like `ClockFn`, the engine is `no_std` and can't reach for /dev/urandom
/// itself — host (`spg-server`) injects an OS-backed source. The engine
/// stores it as `Option<SaltFn>`; `None`
/// means SQL-driven `CREATE USER` falls back to a deterministic salt
/// derived from the username (acceptable in tests; the server always
/// installs a real RNG so production paths never see this).
pub type SaltFn = fn() -> [u8; 16];
224
// NOTE: the paragraph below describes `CancelToken`, which this file now
// re-exports from the `cancel` module; it is not about `TxId`. It is kept
// as a plain comment so rustdoc does not attach it to `TxId`.
// v4.5 cooperative cancellation token. A long-running SELECT /
// UPDATE / DELETE checks `is_cancelled` at row-loop checkpoints
// and bails with `EngineError::Cancelled`. The host
// (`spg-server`) creates an `AtomicBool` per query, spawns a
// watchdog thread that sets it after `SPG_QUERY_TIMEOUT_MS`,
// and passes it via `execute_with_cancel` / `execute_readonly_with_cancel`.
//
// `CancelToken::none()` is a no-op — used by the legacy `execute`
// and `execute_readonly` entry points so existing callers don't
// change.

/// v4.41.1 opaque transaction handle. Returned by `Engine::alloc_tx_id`,
/// threaded through `Engine::execute_in` so dispatch can identify which
/// in-flight TX a statement belongs to. `IMPLICIT_TX` is the reserved
/// slot every legacy caller — engine self-tests, spg-cli, spg-embedded,
/// startup replay — implicitly uses through the unchanged
/// `Engine::execute(sql)` API. v4.41.1 keeps at most one active slot at
/// runtime (dispatch holds `engine.write()` across the wrap, same as
/// v4.34); the map shape is here to let v4.42 turn on N in-flight
/// implicit TXs without reshuffling the engine internals.
///
/// Derives `Ord` because the engine keys its `BTreeMap` of TX slots
/// (`tx_catalogs`) by `TxId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxId(pub u64);
246
/// Reserved slot used by `Engine::execute(sql)` — the legacy single-
/// global-shadow path. New `alloc_tx_id` handles start at 1
/// (`Engine::new` seeds the counter with 1), so they never collide
/// with this value.
pub const IMPLICIT_TX: TxId = TxId(0);
250
/// v6.7.3 — default segment-size threshold used by `COMPACT COLD
/// SEGMENTS` when no explicit target is supplied. Segments whose
/// `OwnedSegment::bytes().len()` is **strictly** less than this
/// value are eligible to merge. spg-server reads
/// `SPG_COMPACTION_TARGET_SEGMENT_BYTES` to override.
///
/// The value is 4 MiB (`4 * 1024 * 1024` bytes).
pub const COMPACTION_TARGET_DEFAULT_BYTES: u64 = 4 * 1024 * 1024;
257
/// Per-slot transaction state. Held inside `tx_catalogs[tx_id]` for the
/// lifetime of a BEGIN..COMMIT (or BEGIN..ROLLBACK) window. Drops when
/// the TX commits (its `catalog` is moved over `Engine.catalog`) or
/// rolls back (slot removed, catalog discarded).
///
/// Private to this crate; `Engine` stores one value per open `TxId`.
#[derive(Debug, Default, Clone)]
struct TxState {
    /// The TX's shadow copy of the catalog. Started as a clone of
    /// `Engine.catalog` at BEGIN time; writes flow into it; COMMIT
    /// installs it over `Engine.catalog`. `Catalog::clone()` is O(1)
    /// since v4.40 (`PersistentVec` rows + `PersistentBTreeMap` indices).
    catalog: Catalog,
    /// Per-TX savepoint stack. Each entry pairs the savepoint name with
    /// a clone of `catalog` at the moment `SAVEPOINT <name>` fired.
    /// `ROLLBACK TO <name>` restores from the entry and pops everything
    /// after it; `RELEASE <name>` discards the entry and everything
    /// after; COMMIT/ROLLBACK clears the whole stack.
    /// Ordered oldest-first, presumably (new savepoints pushed at the
    /// end) — confirm against the SAVEPOINT handler.
    savepoints: Vec<(String, Catalog)>,
}
276
/// v7.11.0 — frozen read-only view of the engine's committed state.
/// Constructed via [`Engine::clone_snapshot`]. Holds clones of the
/// catalog, statistics, clock function, and row-cap config — the
/// four fields the `execute_readonly` path actually reads. Cheap to
/// `Clone` (each clone shares the underlying `PersistentVec` row
/// storage; only the trie root pointers copy). Send + Sync so a
/// snapshot can be moved across `tokio::task::spawn_blocking`
/// boundaries without coordination.
///
/// The contract: a snapshot reflects the engine's state at the
/// moment `clone_snapshot()` returned. Subsequent writes to the
/// engine are NOT visible. Callers who need fresher data take a
/// new snapshot.
///
/// NOTE(review): `Engine::max_query_bytes` is not captured here —
/// confirm whether reads served from a snapshot are meant to honour
/// the byte cap.
#[derive(Debug, Clone)]
pub struct CatalogSnapshot {
    /// Catalog clone taken by `Engine::clone_snapshot`.
    catalog: Catalog,
    /// Clone of the engine's per-column statistics at capture time.
    statistics: statistics::Statistics,
    /// The engine's clock function at capture time, if one was installed.
    clock: Option<ClockFn>,
    /// The engine's per-query row cap at capture time; `None` = unlimited.
    max_query_rows: Option<usize>,
}
297
/// CoW-1 (v7.34) — frozen view of the *persisted* committed engine
/// state. Carries every field the `snapshot()` envelope serializes;
/// `Clone` is O(1) on the catalog (Arc bump) and cheap typed-clones
/// on the trailers. Decouples "capture state" from "serialize bytes"
/// so the background-checkpoint worker can hold the snapshot and
/// produce bytes off the engine write lock.
///
/// Built by `Engine::snapshot_data`; turned into bytes by
/// `EngineSnapshot::serialize`.
#[derive(Debug, Clone)]
pub struct EngineSnapshot {
    /// Clone of the engine's committed catalog.
    catalog: Catalog,
    /// Clone of the RBAC user table (first envelope trailer).
    users: UserStore,
    /// Clone of the logical-replication publication catalog.
    publications: publications::Publications,
    /// Clone of the logical-replication subscription catalog.
    subscriptions: subscriptions::Subscriptions,
    /// Clone of the per-column optimizer statistics.
    statistics: statistics::Statistics,
}
312
313impl EngineSnapshot {
314 /// Same envelope rules as `Engine::snapshot()`: bare catalog when
315 /// every trailer is empty, full envelope otherwise.
316 pub fn serialize(&self) -> Vec<u8> {
317 if self.users.is_empty()
318 && self.publications.is_empty()
319 && self.subscriptions.is_empty()
320 && self.statistics.is_empty()
321 {
322 self.catalog.serialize()
323 } else {
324 build_envelope(
325 &self.catalog.serialize(),
326 &users::serialize_users(&self.users),
327 &self.publications.serialize(),
328 &self.subscriptions.serialize(),
329 &self.statistics.serialize(),
330 )
331 }
332 }
333}
334
335// The engine carries several independent session/capture flags (dialect,
336// FK-checks, meta-view materialisation, redo capture); they're orthogonal
337// switches, not a state enum begging to be modelled.
338#[allow(clippy::struct_excessive_bools)]
339#[derive(Debug, Default)]
340pub struct Engine {
341 /// Committed catalog — what survives `Engine::snapshot()` and what
342 /// outside-TX `SELECT`s read.
343 catalog: Catalog,
344 /// Active TX slots, keyed by `TxId`. Empty when no TX is in flight.
345 /// v4.41.1 runtime invariant: at most one entry (single-writer
346 /// model unchanged). v4.42 will let dispatch hold multiple entries
347 /// concurrently for group commit + engine MVCC.
348 tx_catalogs: BTreeMap<TxId, TxState>,
349 /// Which slot the next exec_* call should mutate. Set by
350 /// `execute_in(sql, tx_id)` at the entry point; legacy `execute(sql)`
351 /// sets it to `IMPLICIT_TX`. None when no TX is in flight (read /
352 /// write goes straight against `catalog`).
353 current_tx: Option<TxId>,
354 /// Monotonic counter for `alloc_tx_id`. Starts at 1 — slot 0 is
355 /// reserved for `IMPLICIT_TX`.
356 next_tx_id: u64,
357 /// v7.22 (round-13 T3) — session string-literal dialect. `false`
358 /// (default) = PG semantics (backslash literal, `''` escape);
359 /// `true` = MySQL semantics (`\'` etc.). Flipped by the
360 /// deterministic session signals each dump emits: `SET sql_mode`
361 /// (only MySQL clients/dumps send it) turns it on,
362 /// `SET standard_conforming_strings = on` (every pg_dump
363 /// preamble) turns it off. The plan cache is cleared on every
364 /// flip — the same SQL text lexes differently per dialect.
365 backslash_escapes: bool,
366 /// Optional wall clock used to satisfy `NOW()` / `CURRENT_TIMESTAMP`
367 /// / `CURRENT_DATE`. Set by the host environment.
368 clock: Option<ClockFn>,
369 /// v4.1 cryptographic RNG for per-user password salt. Set by the
370 /// host. `None` means SQL-driven `CREATE USER` uses a
371 /// deterministic fallback — see `SaltFn`.
372 salt_fn: Option<SaltFn>,
373 /// v4.2 per-query row cap. `None` = unlimited. When set, a
374 /// SELECT that materialises more than `n` rows returns
375 /// `EngineError::RowLimitExceeded`. Enforced before the result
376 /// is shaped into wire frames so a runaway scan can't blow the
377 /// server's heap.
378 max_query_rows: Option<usize>,
379 /// v7.30.3 (mailrs round-26) per-query byte cap on join/filter
380 /// materialisation. `None` = unlimited. Approximate net
381 /// accounting (Value heap payloads + per-cell enum overhead)
382 /// charged at every point the join pipeline clones rows;
383 /// crossing the cap raises `EngineError::QueryBytesExceeded`
384 /// instead of pressuring the host into reclaim livelock. The
385 /// host wires this to `SPG_MAX_QUERY_BYTES` (embed defaults it
386 /// ON; the server keeps its allocator-precise budget as the
387 /// outer layer).
388 pub(crate) max_query_bytes: Option<usize>,
389 /// v4.1 RBAC user table. Empty means "no RBAC configured yet" —
390 /// the server decides what that means at the auth boundary
391 /// (open mode vs legacy single-password mode). User CRUD goes
392 /// through `create_user`/`drop_user`/`verify_user`; persistence
393 /// rides the snapshot envelope alongside the catalog.
394 pub(crate) users: UserStore,
395 /// v6.1.2 logical-replication publication catalog. Empty until
396 /// `CREATE PUBLICATION` runs. Persistence rides the v3 envelope
397 /// trailer (see `build_envelope`).
398 publications: publications::Publications,
399 /// v6.1.4 logical-replication subscription catalog. Empty until
400 /// `CREATE SUBSCRIPTION` runs. Persistence rides the v4 envelope
401 /// trailer.
402 subscriptions: subscriptions::Subscriptions,
403 /// v6.2.0 — per-column statistics for the cost-based optimizer.
404 /// Populated by `ANALYZE`; queried via `spg_statistic` virtual
405 /// table. Persistence rides the v5 envelope trailer.
406 statistics: statistics::Statistics,
407 /// v6.3.0 — engine-level plan cache. Caches the post-`prepare()`
408 /// `Statement` keyed on SQL text. In-memory only — does NOT ride
409 /// the snapshot envelope (rebuilt on demand after restart).
410 plan_cache: plan_cache::PlanCache,
411 /// v6.5.1 — per-distinct-SQL execution stats. In-memory only,
412 /// surfaced via `spg_stat_query` virtual table. Updated by the
413 /// `execute_*` paths after a successful execute.
414 query_stats: query_stats::QueryStats,
415 /// v6.5.2 — connection-state provider callback. spg-server
416 /// registers a function at startup that snapshots its
417 /// per-pgwire-connection registry into `ActivityRow`s; engine
418 /// reads through it on every `SELECT * FROM spg_stat_activity`.
419 /// `None` ⇒ no-data (returns empty rows; matches the no_std
420 /// embedded callers that don't run pgwire).
421 activity_provider: Option<ActivityProvider>,
422 /// v6.5.3 — audit-chain provider + verifier. Same pattern as
423 /// activity_provider: spg-server registers both at startup;
424 /// engine reads through on `SELECT * FROM spg_audit_chain` and
425 /// `SELECT * FROM spg_audit_verify`. `None` ⇒ no-data.
426 audit_chain_provider: Option<AuditChainProvider>,
427 audit_verifier: Option<AuditVerifier>,
428 /// v6.5.6 — slow-query log threshold in microseconds. When set,
429 /// every successful execute whose elapsed exceeds the threshold
430 /// gets fed to the registered slow-query log callback (so
431 /// spg-server can emit a structured log line). Default `None`
432 /// = no slow-query logging.
433 slow_query_threshold_us: Option<u64>,
434 slow_query_logger: Option<SlowQueryLogger>,
435 /// v7.12.1 — session parameters set via `SET <name> = <value>`.
436 /// Only `default_text_search_config` is consumed by the engine
437 /// today (the FTS function dispatcher reads it when
438 /// `to_tsvector(text)` is called without an explicit config).
439 /// All other names are accepted + recorded so PG-dump output
440 /// loads, but have no behavioural effect.
441 pub(crate) session_params: BTreeMap<String, String>,
442 /// v7.12.7 — depth counter for trigger-emitted embedded SQL.
443 /// Each time the engine executes a `DeferredEmbeddedStmt` it
444 /// increments this; the recursive `execute_stmt_with_cancel`
445 /// inside that path checks against [`MAX_TRIGGER_RECURSION`]
446 /// to bound runaway cascades (trigger A's UPDATE on table B
447 /// fires trigger B which UPDATEs table A which fires trigger
448 /// A again…). Reset to 0 once the original DML returns.
449 trigger_recursion_depth: u32,
450 /// v7.14.0 — when `SET FOREIGN_KEY_CHECKS=0` is in effect
451 /// (mysqldump preamble), the FK existence + arity check at
452 /// CREATE TABLE time is deferred. FKs referencing a
453 /// not-yet-existing parent land in `pending_foreign_keys`
454 /// keyed by child table; `SET FOREIGN_KEY_CHECKS=1` drains
455 /// the queue and resolves each FK against the now-complete
456 /// catalog. Empty by default; the queue is drained on every
457 /// `RESET ALL` too.
458 foreign_key_checks: bool,
459 /// v7.16.2 — true on the temp Engine an outer
460 /// `exec_select_with_meta_views` builds, telling that
461 /// temp engine "stop short-circuiting into the meta-view
462 /// path — your catalog already has the materialised
463 /// tables; just run the regular SELECT." Without this we'd
464 /// infinite-loop since the meta-view name (e.g.
465 /// `__spg_info_columns`) still triggers
466 /// `select_references_meta_view`.
467 meta_views_materialised: bool,
468 pending_foreign_keys: Vec<(alloc::string::String, spg_sql::ast::ForeignKeyConstraint)>,
469 /// v7.34 (crash-recovery P0 #2) — row-level redo capture. When the
470 /// embedding layer turns this on (persistence enabled), each mutating
471 /// `execute` records the physical [`RowChange`]s it applied; the
472 /// engine drains them into `last_redo` on success, and the embedded
473 /// layer reads them via [`Engine::take_redo`] to write the WAL in
474 /// place of the SQL text. Off (default) = zero capture overhead.
475 redo_capture: bool,
476 /// Redo captured by the most recent successful mutating `execute`,
477 /// awaiting drain by the embedding layer. Cleared on each capture.
478 last_redo: Vec<RowChange>,
479}
480
/// v7.12.7 — hard cap on nested trigger-emitted embedded SQL
/// fires. 16 deep is well past anything a normal trigger graph
/// uses while still preventing infinite-loop wedging.
/// Compared against `Engine::trigger_recursion_depth`.
const MAX_TRIGGER_RECURSION: u32 = 16;
485
/// v6.5.6 — callback signature for slow-query log emission. Called
/// with `(sql, elapsed_us)` once per successful execute that crosses
/// the threshold (`Engine::slow_query_threshold_us`, in microseconds).
pub type SlowQueryLogger = fn(&str, u64);
490
/// v6.5.2 — one row of `spg_stat_activity`. Engine-public so
/// spg-server can construct rows without re-exporting internal
/// dispatch types.
///
/// Field meanings below marked "presumably" are read off the names
/// only; the producer lives in spg-server — confirm there.
#[derive(Debug, Clone)]
pub struct ActivityRow {
    /// Connection identifier (presumably the pgwire backend pid).
    pub pid: u32,
    /// Name of the connected user.
    pub user: String,
    /// Start timestamp; the `_us` suffix suggests microseconds —
    /// epoch and exact meaning to be confirmed with the producer.
    pub started_at_us: i64,
    /// SQL text the connection is currently running (presumably empty
    /// when idle).
    pub current_sql: String,
    /// What the connection is waiting on, as free text.
    pub wait_event: String,
    /// Elapsed time; the `_us` suffix suggests microseconds.
    pub elapsed_us: i64,
    /// Whether the connection has a transaction open.
    pub in_transaction: bool,
    /// v7.17 Phase 2.4 — startup-param `application_name` (or the
    /// last value the client sent via `SET application_name = '...'`).
    /// Empty when the client never declared one.
    pub application_name: String,
}
508
/// v6.5.2 — provider callback type. Fresh snapshot returned each
/// call; engine doesn't cache the slice. Stored on the engine as
/// `Option<ActivityProvider>`.
pub type ActivityProvider = fn() -> Vec<ActivityRow>;
512
/// v6.5.3 — one row of `spg_audit_chain`. Engine-public so
/// spg-server can construct rows directly from `AuditEntry`.
#[derive(Debug, Clone)]
pub struct AuditRow {
    /// Sequence number of the entry within the chain.
    pub seq: i64,
    /// Entry timestamp; the `_ms` suffix suggests milliseconds —
    /// confirm the epoch with the producer.
    pub ts_ms: i64,
    /// Hash of the previous entry, hex-encoded.
    pub prev_hash_hex: String,
    /// Hash of this entry, hex-encoded.
    pub entry_hash_hex: String,
    /// SQL text recorded for this entry.
    pub sql: String,
}
523
/// v6.5.3 — chain-table provider. spg-server registers a fn pointer
/// that snapshots the audit log into `AuditRow`s.
pub type AuditChainProvider = fn() -> Vec<AuditRow>;
/// v6.5.3 — chain verifier. spg-server registers a fn pointer that
/// verifies the audit log and returns
/// `(verified_count, broken_at_seq)` — `broken_at_seq` is
/// `-1` on a clean chain.
pub type AuditVerifier = fn() -> (i64, i64);
530
531impl Engine {
532 pub fn new() -> Self {
533 Self {
534 catalog: Catalog::new(),
535 tx_catalogs: BTreeMap::new(),
536 current_tx: None,
537 backslash_escapes: false,
538 next_tx_id: 1,
539 clock: None,
540 salt_fn: None,
541 max_query_rows: None,
542 max_query_bytes: None,
543 users: UserStore::new(),
544 publications: publications::Publications::new(),
545 subscriptions: subscriptions::Subscriptions::new(),
546 statistics: statistics::Statistics::new(),
547 plan_cache: plan_cache::PlanCache::new(),
548 query_stats: query_stats::QueryStats::new(),
549 activity_provider: None,
550 audit_chain_provider: None,
551 audit_verifier: None,
552 slow_query_threshold_us: None,
553 slow_query_logger: None,
554 session_params: BTreeMap::new(),
555 trigger_recursion_depth: 0,
556 foreign_key_checks: true,
557 meta_views_materialised: false,
558 pending_foreign_keys: Vec::new(),
559 redo_capture: false,
560 last_redo: Vec::new(),
561 }
562 }
563
564 /// v7.11.0 — clone the engine's committed catalog + read-time
565 /// state into a frozen `CatalogSnapshot`. Cheap (`Catalog` is
566 /// backed by `PersistentVec`; cloning is O(log n) per table).
567 /// Subsequent writes to this engine are invisible to the
568 /// snapshot; the snapshot is self-contained and can be moved
569 /// to another thread for concurrent `execute_readonly_on_snapshot`
570 /// calls. The basis for [`AsyncReadHandle`] in spg-embedded-tokio
571 /// and any other read-fanout pattern.
572 #[must_use]
573 pub fn clone_snapshot(&self) -> CatalogSnapshot {
574 CatalogSnapshot {
575 catalog: self.active_catalog().clone(),
576 statistics: self.statistics.clone(),
577 clock: self.clock,
578 max_query_rows: self.max_query_rows,
579 }
580 }
581
582 /// Construct an engine restored from a previously-snapshotted catalog
583 /// (see `snapshot()`).
584 pub fn restore(catalog: Catalog) -> Self {
585 Self {
586 catalog,
587 tx_catalogs: BTreeMap::new(),
588 current_tx: None,
589 backslash_escapes: false,
590 next_tx_id: 1,
591 clock: None,
592 salt_fn: None,
593 max_query_rows: None,
594 max_query_bytes: None,
595 users: UserStore::new(),
596 publications: publications::Publications::new(),
597 subscriptions: subscriptions::Subscriptions::new(),
598 statistics: statistics::Statistics::new(),
599 plan_cache: plan_cache::PlanCache::new(),
600 query_stats: query_stats::QueryStats::new(),
601 activity_provider: None,
602 audit_chain_provider: None,
603 audit_verifier: None,
604 slow_query_threshold_us: None,
605 slow_query_logger: None,
606 session_params: BTreeMap::new(),
607 trigger_recursion_depth: 0,
608 foreign_key_checks: true,
609 meta_views_materialised: false,
610 pending_foreign_keys: Vec::new(),
611 redo_capture: false,
612 last_redo: Vec::new(),
613 }
614 }
615
616 /// Restore an engine + user table from a v4.1 envelope produced
617 /// by `snapshot_with_users()`. Falls back to plain catalog-only
618 /// restore if the envelope magic isn't present (so v3.x snapshot
619 /// files still load). v6.1.2 adds the optional publications
620 /// trailer (envelope v3); a v1/v2 envelope deserialises to an
621 /// empty publication table.
622 pub fn restore_envelope(buf: &[u8]) -> Result<Self, EngineError> {
623 match split_envelope(buf) {
624 EnvelopeParse::Pair {
625 catalog: catalog_bytes,
626 users: user_bytes,
627 publications: pub_bytes,
628 subscriptions: sub_bytes,
629 statistics: stats_bytes,
630 } => {
631 let catalog = Catalog::deserialize(catalog_bytes).map_err(EngineError::Storage)?;
632 let users = users::deserialize_users(user_bytes)
633 .map_err(|e| EngineError::Unsupported(alloc::format!("users restore: {e}")))?;
634 let publications = match pub_bytes {
635 Some(b) => publications::Publications::deserialize(b).map_err(|e| {
636 EngineError::Unsupported(alloc::format!("publications restore: {e:?}"))
637 })?,
638 None => publications::Publications::new(),
639 };
640 let subscriptions = match sub_bytes {
641 Some(b) => subscriptions::Subscriptions::deserialize(b).map_err(|e| {
642 EngineError::Unsupported(alloc::format!("subscriptions restore: {e:?}"))
643 })?,
644 None => subscriptions::Subscriptions::new(),
645 };
646 let statistics = match stats_bytes {
647 Some(b) => statistics::Statistics::deserialize(b).map_err(|e| {
648 EngineError::Unsupported(alloc::format!("statistics restore: {e:?}"))
649 })?,
650 None => statistics::Statistics::new(),
651 };
652 Ok(Self {
653 catalog,
654 tx_catalogs: BTreeMap::new(),
655 current_tx: None,
656 backslash_escapes: false,
657 next_tx_id: 1,
658 clock: None,
659 salt_fn: None,
660 max_query_rows: None,
661 max_query_bytes: None,
662 users,
663 publications,
664 subscriptions,
665 statistics,
666 plan_cache: plan_cache::PlanCache::new(),
667 query_stats: query_stats::QueryStats::new(),
668 activity_provider: None,
669 audit_chain_provider: None,
670 audit_verifier: None,
671 slow_query_threshold_us: None,
672 slow_query_logger: None,
673 session_params: BTreeMap::new(),
674 trigger_recursion_depth: 0,
675 foreign_key_checks: true,
676 meta_views_materialised: false,
677 pending_foreign_keys: Vec::new(),
678 redo_capture: false,
679 last_redo: Vec::new(),
680 })
681 }
682 EnvelopeParse::CrcMismatch { expected, computed } => {
683 Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
684 "snapshot envelope CRC32 mismatch (expected={expected:#010x}, computed={computed:#010x})"
685 ))))
686 }
687 EnvelopeParse::Bare => {
688 let catalog = Catalog::deserialize(buf).map_err(EngineError::Storage)?;
689 Ok(Self::restore(catalog))
690 }
691 }
692 }
693
694 pub const fn users(&self) -> &UserStore {
695 &self.users
696 }
697
698 /// Builder: attach a wall clock so `NOW()` / `CURRENT_TIMESTAMP` /
699 /// `CURRENT_DATE` evaluate to a real value instead of erroring out.
700 #[must_use]
701 pub const fn with_clock(mut self, clock: ClockFn) -> Self {
702 self.clock = Some(clock);
703 self
704 }
705
706 /// Builder: attach an OS-backed RNG for per-user password salts.
707 /// The host (`spg-server`) typically wires this to `/dev/urandom`.
708 #[must_use]
709 pub const fn with_salt_fn(mut self, f: SaltFn) -> Self {
710 self.salt_fn = Some(f);
711 self
712 }
713
714 /// Builder: cap the number of rows a single SELECT may return.
715 /// Exceeding the cap raises `EngineError::RowLimitExceeded` —
716 /// the bound is checked inside the executor so a runaway
717 /// catalog scan can't allocate millions of rows before the
718 /// server gets a chance to reject the result.
719 #[must_use]
720 pub const fn with_max_query_rows(mut self, n: usize) -> Self {
721 self.max_query_rows = Some(n);
722 self
723 }
724
725 /// Builder: cap the approximate heap bytes a single SELECT's
726 /// join/filter materialisation may hold. Exceeding the cap
727 /// raises `EngineError::QueryBytesExceeded`. Rows are the wrong
728 /// unit when one row carries a multi-MB body (mailrs round-26:
729 /// 1000-row batches of full mail text walked a 15 GiB host into
730 /// reclaim livelock without ever tripping a row ceiling).
731 #[must_use]
732 pub const fn with_max_query_bytes(mut self, n: usize) -> Self {
733 self.max_query_bytes = Some(n);
734 self
735 }
736
737 /// The *committed* catalog. Note: during a transaction this returns the
738 /// pre-TX state — `SELECT` inside a TX goes through `execute()` and reads
739 /// the shadow. Tests that inspect outside-TX state should use this.
740 pub const fn catalog(&self) -> &Catalog {
741 &self.catalog
742 }
743
744 /// Capture a frozen view of the committed engine state. Catalog
745 /// is O(1) Arc bump; trailers are cheap clones. Decouples "capture"
746 /// (needs &Engine) from "serialize" (CPU, no engine access) — the
747 /// seam the background-checkpoint worker rides in CoW-2.
748 pub fn snapshot_data(&self) -> EngineSnapshot {
749 EngineSnapshot {
750 catalog: self.catalog.clone(),
751 users: self.users.clone(),
752 publications: self.publications.clone(),
753 subscriptions: self.subscriptions.clone(),
754 statistics: self.statistics.clone(),
755 }
756 }
757
758 /// Serialize the *committed* catalog to bytes. v0.6 was full-snapshot; v0.9
759 /// adds the rule that an open TX's shadow is never snapshotted — only the
760 /// post-COMMIT state is persisted. v4.1 wraps the catalog in an envelope
761 /// when there are users to persist; an empty user table snapshots as the
762 /// bare catalog format (backwards-compat with v3.x readers). v6.1.2
763 /// adds publications to the envelope condition: either non-empty
764 /// users OR non-empty publications now triggers the envelope path.
765 pub fn snapshot(&self) -> Vec<u8> {
766 self.snapshot_data().serialize()
767 }
768
769 /// True when at least one TX slot is in flight. v4.41.1 runtime
770 /// invariant: at most one slot active at a time (dispatch holds
771 /// `engine.write()` across the entire wrap). v4.42 will let this
772 /// return true with multiple slots concurrently.
773 pub fn in_transaction(&self) -> bool {
774 !self.tx_catalogs.is_empty()
775 }
776
777 /// v4.41.1 allocate a fresh TX handle. Used by spg-server dispatch
778 /// to scope each implicit-wrap BEGIN..stmt..COMMIT to its own slot
779 /// in `tx_catalogs`. v4.42 — the commit-barrier leader allocates
780 /// one of these per task in its group, runs `BEGIN`+sql+`COMMIT`
781 /// sequentially under a single `engine.write()` so each task's
782 /// mutations accumulate into shared state, then either keeps the
783 /// accumulated state (fsync OK) or restores the pre-image via
784 /// `replace_catalog` (fsync err).
785 pub fn alloc_tx_id(&mut self) -> TxId {
786 let id = TxId(self.next_tx_id);
787 self.next_tx_id = self.next_tx_id.saturating_add(1);
788 id
789 }
790
791 /// v4.42 — atomically replace the live catalog. Used by the
792 /// commit-barrier leader to roll back a group whose batched
793 /// fsync failed: the leader snapshots `engine.catalog().clone()`
794 /// (O(1) Arc bump after the v4.39/v4.40 persistent migration)
795 /// at group start, sequentially applies each task's BEGIN+sql+
796 /// COMMIT under the same write lock to accumulate mutations
797 /// into shared state, batches the WAL bytes, fsyncs once, and
798 /// on failure calls this with the pre-image to undo every
799 /// task in the group at once.
800 ///
801 /// **Does NOT touch `tx_catalogs` / `current_tx`.** Any
802 /// explicit-TX slot from a concurrent client (created via the
803 /// legacy `IMPLICIT_TX`-less dispatch path or via the future
804 /// MVCC-readers v5+ work) has its own snapshot baked into the
805 /// slot — restoring `self.catalog` to the pre-image leaves
806 /// those slots untouched, exactly as they were when the leader
807 /// took the lock. The leader's own implicit-TX slots are all
808 /// already discarded (`exec_commit` removed them as each
809 /// task's COMMIT ran) by the time this is reached.
810 pub fn replace_catalog(&mut self, catalog: Catalog) {
811 self.catalog = catalog;
812 }
813
814 /// v6.7.0 — public shim around `Catalog::freeze_oldest_to_cold`
815 /// so tests + the spg-server freezer can drive a freeze without
816 /// reaching into the private `active_catalog_mut`. v6.7.4
817 /// parallel freezer will build on this surface.
818 ///
819 /// Marks the table's cached `cold_row_count` stale because the
820 /// freeze added cold locators that ANALYZE hasn't yet refreshed.
821 pub fn freeze_oldest_to_cold(
822 &mut self,
823 table_name: &str,
824 index_name: &str,
825 max_rows: usize,
826 ) -> Result<spg_storage::FreezeReport, EngineError> {
827 let report = self
828 .active_catalog_mut()
829 .freeze_oldest_to_cold(table_name, index_name, max_rows)
830 .map_err(EngineError::Storage)?;
831 if let Some(t) = self.active_catalog_mut().get_mut(table_name) {
832 t.mark_cold_row_count_stale();
833 }
834 Ok(report)
835 }
836
837 /// v6.7.5 — public shim used by the spg-server follower's
838 /// segment-forwarding receiver. Registers a cold-tier segment
839 /// at a specific id (the master's id, as transmitted on the
840 /// wire) so the follower's BTree-Cold locators stay byte-
841 /// identical with the master's. Wraps
842 /// `Catalog::load_segment_bytes_at` under the standard
843 /// clone-mutate-replace pattern.
844 ///
845 /// Returns `Ok(())` on success **and** on the "slot already
846 /// occupied" case — a follower mid-reconnect may receive a
847 /// segment chunk for a segment_id it already has on disk
848 /// (forwarded last session); the caller should treat that
849 /// path as a no-op rather than a fatal error.
850 pub fn receive_cold_segment(
851 &mut self,
852 segment_id: u32,
853 bytes: Vec<u8>,
854 ) -> Result<(), EngineError> {
855 let mut new_cat = self.catalog.clone();
856 match new_cat.load_segment_bytes_at(segment_id, bytes) {
857 Ok(()) => {
858 self.replace_catalog(new_cat);
859 Ok(())
860 }
861 Err(StorageError::Corrupt(msg)) if msg.contains("already occupied") => Ok(()),
862 Err(e) => Err(EngineError::Storage(e)),
863 }
864 }
865
866 pub(crate) fn active_catalog(&self) -> &Catalog {
867 match self.current_tx {
868 Some(t) => self
869 .tx_catalogs
870 .get(&t)
871 .map_or(&self.catalog, |s| &s.catalog),
872 None => &self.catalog,
873 }
874 }
875
876 fn active_catalog_mut(&mut self) -> &mut Catalog {
877 let tx = self.current_tx;
878 match tx {
879 Some(t) => match self.tx_catalogs.get_mut(&t) {
880 Some(s) => &mut s.catalog,
881 None => &mut self.catalog,
882 },
883 None => &mut self.catalog,
884 }
885 }
886
887 /// v7.34 (crash-recovery P0 #2) — turn row-level redo capture on/off.
888 /// The embedding layer enables it when persistence is on so each
889 /// mutating `execute` records the physical [`RowChange`]s it applied
890 /// (drained via [`Engine::take_redo`]). Off = zero capture overhead.
891 pub fn set_redo_capture(&mut self, on: bool) {
892 self.redo_capture = on;
893 }
894
895 /// v7.34 — take the redo captured by the most recent successful
896 /// mutating `execute` (empty when capture is off, the statement was a
897 /// read, or it changed nothing). The embedding layer writes these to
898 /// the WAL in place of the SQL text.
899 pub fn take_redo(&mut self) -> Vec<RowChange> {
900 core::mem::take(&mut self.last_redo)
901 }
902
903 /// v7.34 (crash-recovery P0 #2) — replay a row-level redo log onto the
904 /// committed catalog (the row-level WAL recovery primitive: apply the
905 /// captured physical changes from a checkpoint baseline, in place of
906 /// re-executing the SQL). Trusts the log — no uniqueness/FK/parse.
907 pub fn apply_redo(&mut self, changes: &[RowChange]) -> Result<(), EngineError> {
908 self.catalog
909 .apply_redo(changes)
910 .map_err(EngineError::Storage)
911 }
912
913 /// Read-only execute path. Succeeds for `SELECT` / `SHOW TABLES`
914 /// / `SHOW COLUMNS`; returns `EngineError::WriteRequired` for
915 /// every other statement, so the caller can fall through to the
916 /// `&mut self` `execute` path under a write lock. Engine state is
917 /// not mutated even on the success path (`rewrite_clock_calls`
918 /// and `resolve_order_by_position` both mutate the locally-owned
919 /// AST, not `self`).
920 ///
921 /// v4.2: cap result-set size. Applied after the executor
922 /// materialises rows but before they leave the engine — wrapping
923 /// every Rows-returning exec_* function would scatter the check.
924 ///
925 /// v7.31 (memory campaign, bucket A) — the same choke point now
926 /// also enforces the BYTE budget on the final result set, so
927 /// single-table and aggregate paths (which don't route through
928 /// the join materialiser's incremental accounting) still cannot
929 /// hand the host an unbounded result. Intermediate single-table
930 /// clones are the 7.31.x follow-up (design doc, bucket A).
931 fn enforce_row_limit(
932 &self,
933 result: Result<QueryResult, EngineError>,
934 ) -> Result<QueryResult, EngineError> {
935 if let Ok(QueryResult::Rows { rows, .. }) = &result {
936 if let Some(cap) = self.max_query_rows
937 && rows.len() > cap
938 {
939 return Err(EngineError::RowLimitExceeded(cap));
940 }
941 if let Some(byte_cap) = self.max_query_bytes
942 && approx_rows_bytes(rows) > byte_cap
943 {
944 return Err(EngineError::QueryBytesExceeded(byte_cap));
945 }
946 }
947 result
948 }
949}
950
/// v7.31 (memory campaign — ceiling-first / never-die, design v1) —
/// one table's share of the engine's resident-memory accounting.
///
/// Two byte figures are reported on purpose: `hot_encoded_bytes` is
/// the meter the storage layer maintains (what the rows encode to),
/// while `approx_resident_bytes` is what those rows COST in RAM
/// (per-cell enum slots + heap payloads via `approx_row_bytes`). The
/// gap between the two is the representation multiplier the round-26
/// report measured at ~11× end-to-end.
#[derive(Clone, Debug)]
pub struct TableMemoryStats {
    pub name: String,
    pub hot_rows: u64,
    /// Cold-row count as cached on the table; ANALYZE refreshes it
    /// (see the staleness contract on `Table::cold_row_count`).
    pub cold_rows: u64,
    /// Encoded size of the hot rows, per the storage layer's meter.
    pub hot_encoded_bytes: u64,
    /// Approximate RAM cost of the same rows.
    pub approx_resident_bytes: u64,
    pub index_count: u64,
    /// v7.31 C2 — `IndexKind::approx_resident_bytes()` summed over the
    /// table's indices. Every variant (BTree / NSW / BRIN / GIN
    /// family) walks its own structure, so the GIN posting lists and
    /// NSW layer adjacency that dominate text/vector tables are
    /// counted honestly instead of the old flat-token estimate.
    pub approx_index_bytes: u64,
}
975
976/// v7.31 — whole-engine memory snapshot: the polling form of the
977/// round-26 ask-4 watermark signal. Hosts compare
978/// `total_approx_resident_bytes` (+ their own WAL/file accounting)
979/// against their deployment ceiling and shed/shrink before the
980/// kernel does it for them.
981#[derive(Debug, Clone)]
982pub struct MemoryStats {
983 pub tables: Vec<TableMemoryStats>,
984 pub total_hot_encoded_bytes: u64,
985 pub total_approx_resident_bytes: u64,
986 pub total_approx_index_bytes: u64,
987 /// The active per-query materialisation budget (bucket A), so a
988 /// monitoring host sees ceiling and usage through one call.
989 pub max_query_bytes: Option<usize>,
990 /// v7.31 C2 — bucket D: live WAL bytes (active chunk + buffered,
991 /// uncheckpointed). `None` from the engine itself — it has no WAL;
992 /// the durable hosts (embed `Database`, server) fill it in from
993 /// their own WAL accounting. `Some(0)` means "host has a WAL and
994 /// it is empty"; `None` means "no WAL on this path" (in-memory).
995 pub wal_bytes: Option<u64>,
996}
997
/// v6.2.0 — predicate for engine-managed catalog tables that a bare
/// `ANALYZE` (no target) should skip.
///
/// Reserved future-proofing hook: v6.2.0 has no internal tables yet
/// (publications / subscriptions / users / statistics all live as
/// engine fields, not catalog tables), so the answer is `false` for
/// every name and every existing user table is analysed.
const fn is_internal_table_name(_name: &str) -> bool {
    // The name is deliberately ignored until internal tables exist.
    false
}
1007
1008#[cfg(test)]
1009mod tests;