Skip to main content

spg_engine/
execute.rs

//! Statement execution + prepared-statement dispatch, split out of
//! `lib.rs` (lib.rs split 17). This module holds the public `execute` /
//! `execute_in` / `execute_with_cancel` entry points, the `prepare` /
//! `prepare_cached` / `describe_prepared` / `execute_prepared`
//! prepared-statement path, and the internal pipeline
//! (`execute_inner_with_cancel` → `execute_stmt_with_cancel`) that
//! pre-resolves clock / sequence / placeholder rewrites and routes each
//! parsed Statement to its domain handler (DDL / DML / SELECT /
//! transaction / SHOW / …). Everything here is an `impl Engine` method,
//! so callers still reach it through the `Engine` type and the public
//! surface is unchanged; `execute_stmt_with_cancel` is pub(crate) for
//! the plpgsql + trigger re-entry paths.
12
13use alloc::string::String;
14use alloc::vec::Vec;
15
16use spg_sql::ast::Statement;
17use spg_sql::parser::{self, ParseError};
18use spg_storage::{ColumnSchema, Value};
19
20use crate::describe;
21use crate::{
22    CancelToken, Engine, EngineError, IMPLICIT_TX, QueryResult, TxId, expand_group_by_all,
23    plan_cache, reorder, resolve_order_by_position, rewrite_clock_calls, substitute_placeholders,
24};
25
26impl Engine {
27    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
28        self.execute_in_with_cancel(sql, IMPLICIT_TX, CancelToken::none())
29    }
30
31    /// v4.5 — write path with cooperative cancellation. Same dispatch
32    /// as `execute_in_with_cancel(sql, IMPLICIT_TX, cancel)`. Kept as
33    /// a separate entry point for backward-compat with the v4.5
34    /// public API.
35    pub fn execute_with_cancel(
36        &mut self,
37        sql: &str,
38        cancel: CancelToken<'_>,
39    ) -> Result<QueryResult, EngineError> {
40        self.execute_in_with_cancel(sql, IMPLICIT_TX, cancel)
41    }
42
43    /// v4.41.1 multi-slot write entry. Routes `sql` through the TX
44    /// slot identified by `tx_id` so spg-server dispatch can scope
45    /// each implicit-wrap BEGIN..stmt..COMMIT to its own slot in
46    /// `tx_catalogs`. `IMPLICIT_TX` is the legacy single-slot path
47    /// every other caller (engine self-tests, replay, spg-embedded)
48    /// implicitly takes via `execute()` / `execute_with_cancel()`.
49    pub fn execute_in(&mut self, sql: &str, tx_id: TxId) -> Result<QueryResult, EngineError> {
50        self.execute_in_with_cancel(sql, tx_id, CancelToken::none())
51    }
52
53    /// v4.41.1 write path with cooperative cancellation + explicit TX
54    /// scope. Sets `self.current_tx` for the duration of the call so
55    /// every `exec_*` helper transparently sees its TX's shadow
56    /// catalog and savepoint stack; restores on exit so the field is
57    /// only valid mid-call (no leakage across calls).
58    pub fn execute_in_with_cancel(
59        &mut self,
60        sql: &str,
61        tx_id: TxId,
62        cancel: CancelToken<'_>,
63    ) -> Result<QueryResult, EngineError> {
64        let saved = self.current_tx;
65        self.current_tx = Some(tx_id);
66        let result = self.execute_inner_with_cancel(sql, cancel);
67        self.current_tx = saved;
68        result
69    }
70
71    /// v6.1.1 — parse and pre-process a SQL string ONCE so the
72    /// resulting [`Statement`] can be cached and re-executed via
73    /// [`Engine::execute_prepared`]. Returns the same `Statement`
74    /// the simple-query path would synthesise internally (clock
75    /// rewrites + ORDER BY position-ref resolution applied at
76    /// prepare time, since both are session-independent). The
77    /// `$N` placeholders in the SQL stay as `Expr::Placeholder(n)`
78    /// nodes; they're resolved to concrete values per-call by
79    /// `execute_prepared`'s substitution walk.
80    ///
81    /// Pgwire's `Parse` (P) message lands here.
82    pub fn prepare(&self, sql: &str) -> Result<Statement, ParseError> {
83        let mut stmt = parser::parse_statement_with(sql, self.backslash_escapes)?;
84        let now_micros = self.clock.map(|f| f());
85        rewrite_clock_calls(&mut stmt, now_micros);
86        if let Statement::Select(s) = &mut stmt {
87            // v6.4.1 — expand `GROUP BY ALL` to every non-aggregate
88            // SELECT-list item BEFORE position / alias resolution so
89            // downstream passes see the explicit list.
90            expand_group_by_all(s);
91            resolve_order_by_position(s);
92            // v6.2.3 — cost-based JOIN reorder. No-op for
93            // single-table FROMs or any non-INNER join shape.
94            reorder::reorder_joins(s, &self.catalog, &self.statistics);
95        }
96        Ok(stmt)
97    }
98
99    /// v6.3.0 — cached prepare. Returns a cloned `Statement` from
100    /// the plan cache on hit, runs the full `prepare()` path on miss
101    /// and inserts the resulting plan before returning. Skipping the
102    /// parse + JOIN-reorder pipeline on hit is the dominant win for
103    /// JDBC / sqlx / pgx clients that reuse the same SQL string.
104    ///
105    /// Returns a cloned `Statement` (not a borrow) because the
106    /// pgwire layer owns its `PreparedStmt` map per-session and the
107    /// engine-level cache must stay available for other sessions.
108    /// Clone cost on a 5-table JOIN AST is well under the parse cost
109    /// it replaces.
110    pub fn prepare_cached(&mut self, sql: &str) -> Result<Statement, ParseError> {
111        // v6.3.1 — version-aware lookup. If the cached plan was
112        // prepared before the most recent ANALYZE, evict and replan.
113        let current_version = self.statistics.version();
114        if let Some(plan) = self.plan_cache.get(sql) {
115            if plan.statistics_version == current_version {
116                return Ok(plan.stmt.clone());
117            }
118            // Stale entry — fall through to evict + re-prepare.
119        }
120        self.plan_cache.evict(sql);
121        let stmt = self.prepare(sql)?;
122        let source_tables = plan_cache::collect_source_tables(&stmt);
123        let plan = plan_cache::PreparedPlan {
124            stmt: stmt.clone(),
125            statistics_version: current_version,
126            source_tables,
127            describe_columns: alloc::vec::Vec::new(),
128        };
129        self.plan_cache.insert(String::from(sql), plan);
130        Ok(stmt)
131    }
132
133    /// v6.3.0 — read-only accessor for tests and v6.3.1 invalidation.
134    pub fn plan_cache(&self) -> &plan_cache::PlanCache {
135        &self.plan_cache
136    }
137
138    /// v6.3.0 — mutable accessor for v6.3.1 invalidation hooks.
139    pub fn plan_cache_mut(&mut self) -> &mut plan_cache::PlanCache {
140        &mut self.plan_cache
141    }
142
143    /// v6.3.3 — Describe a prepared `Statement` without executing.
144    /// Returns `(parameter_oids, output_columns)`. Empty
145    /// `output_columns` means the statement has no row-producing
146    /// shape we could resolve here (JOIN, subquery, non-SELECT, …)
147    /// — pgwire layer maps that to a `NoData` reply.
148    pub fn describe_prepared(&self, stmt: &Statement) -> (Vec<u32>, Vec<ColumnSchema>) {
149        describe::describe_prepared(stmt, self.active_catalog())
150    }
151
152    /// v6.1.1 — execute a [`Statement`] previously returned by
153    /// [`Engine::prepare`], substituting `Expr::Placeholder(n)`
154    /// nodes for the corresponding [`Value`] in `params` (1-based
155    /// per PG: `$1` → `params[0]`). Bind-time string parameters
156    /// are decoded into typed `Value`s by the pgwire layer before
157    /// this call so the resulting AST hits the same execution
158    /// path as a simple query — no SQL re-parse.
159    ///
160    /// Pgwire's `Execute` (E) message after a `Bind` (B) lands here.
161    pub fn execute_prepared(
162        &mut self,
163        stmt: Statement,
164        params: &[Value],
165    ) -> Result<QueryResult, EngineError> {
166        self.execute_prepared_with_cancel(stmt, params, CancelToken::none())
167    }
168
169    /// v7.17.0 Phase 2.3 — prepared-statement entry that honors a
170    /// caller-supplied `CancelToken`. Mirrors `execute_prepared`'s
171    /// `current_tx` save/restore so the extended-query path stays
172    /// transactionally consistent with the simple-query path.
173    pub fn execute_prepared_with_cancel(
174        &mut self,
175        mut stmt: Statement,
176        params: &[Value],
177        cancel: CancelToken<'_>,
178    ) -> Result<QueryResult, EngineError> {
179        substitute_placeholders(&mut stmt, params)?;
180        // v7.16.0 — set `current_tx` for the duration of the
181        // dispatch so the `exec_*` helpers see the right TX
182        // slot (matches what `execute_in_with_cancel` does for
183        // simple-query). Pre-v7.16 the simple-query path
184        // worked because every public entry point routed
185        // through `execute_in_with_cancel`; the prepared path
186        // skipped the wrap and so its INSERTs/UPDATEs landed
187        // in the no-tx default slot, silently invisible to a
188        // BEGIN/COMMIT-bracketed flow. Caught by spg-sqlx's
189        // first transaction-visibility test.
190        let saved = self.current_tx;
191        self.current_tx = Some(IMPLICIT_TX);
192        let result = self.execute_stmt_with_cancel(stmt, cancel);
193        self.current_tx = saved;
194        result
195    }
196
197    fn execute_inner_with_cancel(
198        &mut self,
199        sql: &str,
200        cancel: CancelToken<'_>,
201    ) -> Result<QueryResult, EngineError> {
202        cancel.check()?;
203        let stmt = self.prepare(sql)?;
204        // v6.5.1 — wrap the executor with a wall-clock window so we
205        // can record into spg_stat_query. Skip when the engine has
206        // no clock attached (no_std embedded callers).
207        let start_us = self.clock.map(|f| f());
208        let result = self.execute_stmt_with_cancel(stmt, cancel);
209        if let (Some(t0), Ok(_)) = (start_us, &result) {
210            let now = self.clock.map_or(t0, |f| f());
211            let elapsed = now.saturating_sub(t0).max(0) as u64;
212            self.query_stats.record(sql, elapsed, now as u64);
213            // v6.5.6 — slow-query log: fire callback when elapsed
214            // exceeds the configured floor.
215            if let (Some(threshold), Some(logger)) =
216                (self.slow_query_threshold_us, self.slow_query_logger)
217                && elapsed >= threshold
218            {
219                logger(sql, elapsed);
220            }
221        }
222        result
223    }
224
225    pub(crate) fn execute_stmt_with_cancel(
226        &mut self,
227        stmt: Statement,
228        cancel: CancelToken<'_>,
229    ) -> Result<QueryResult, EngineError> {
230        cancel.check()?;
231        // v7.17.0 Phase 1.1 — pre-resolve nextval / currval /
232        // setval calls in the statement tree. Walks SELECT
233        // projection, INSERT VALUES, UPDATE SET, DELETE WHERE,
234        // and DEFAULT exprs; replaces sequence FunctionCall
235        // nodes with concrete Literal values minted against the
236        // catalog. This is the only place that mutates sequence
237        // state from a SELECT-shaped path (exec_select_cancel is
238        // `&self` and can't reach the catalog mutably).
239        //
240        // Fast-path: when no sequences exist anywhere in the
241        // catalog (the typical hot-path INSERT load), skip the
242        // walker entirely. Single map-emptiness check on the
243        // catalog beats walking every expression on every call.
244        let mut stmt = stmt;
245        // v7.17 dump-compat — the fast-path check
246        // `sequences().is_empty()` skips pre-resolve when no
247        // sequence exists in the *currently active* catalog
248        // snapshot. The committed catalog or the implicit-TX
249        // catalog may legitimately disagree on this between
250        // CREATE SEQUENCE and a later setval(): always run the
251        // resolver — the walk is O(expr-count) and dwarfed by
252        // the parse cost we just paid.
253        self.pre_resolve_sequence_calls_in_statement(&mut stmt)?;
254        let result = match stmt {
255            Statement::CreateTable(s) => self.exec_create_table(s),
256            // v7.9.15 — CREATE EXTENSION is a no-op on SPG. Returns
257            // CommandOk with affected=0; modified_catalog=false so
258            // the WAL doesn't grow a useless entry. mailrs F3.
259            Statement::CreateExtension(_) => Ok(QueryResult::CommandOk {
260                affected: 0,
261                modified_catalog: false,
262            }),
263            // v7.16.2 — DO $$ ... $$ block. mailrs round-10 A.2
264            // — the pre-v7.9.27 no-op SILENTLY swallowed every
265            // mailrs migrate-038/-040/-042 idempotent rename
266            // (the IF EXISTS … THEN ALTER … END block never
267            // ran). v7.16.2 dispatches to exec_do_block which
268            // runs the PlPgSqlBlock at top level via the same
269            // execute_stmts machinery the trigger executor
270            // uses (NEW=None, OLD=None — DO blocks have no
271            // row context).
272            Statement::DoBlock(body) => self.exec_do_block(body),
273            // v7.14.0 — empty-statement no-op for pg_dump /
274            // mysqldump preamble lines that collapse to nothing
275            // after comment-stripping.
276            Statement::Empty => Ok(QueryResult::CommandOk {
277                affected: 0,
278                modified_catalog: false,
279            }),
280            Statement::DropTable { names, if_exists } => self.exec_drop_table(names, if_exists),
281            Statement::DropIndex { name, if_exists } => self.exec_drop_index(name, if_exists),
282            Statement::CreateIndex(s) => self.exec_create_index(s),
283            Statement::Insert(s) => self.exec_insert(s),
284            Statement::Update(mut s) => {
285                // Materialise uncorrelated subqueries in SET / WHERE
286                // before the row walk — the SELECT path has done this
287                // since v4.10; UPDATE gained it for mailrs's
288                // `UPDATE … WHERE id IN (SELECT … FOR UPDATE SKIP
289                // LOCKED)` claim pattern (embed round-12).
290                for (_, e) in &mut s.assignments {
291                    self.resolve_expr_subqueries(e, cancel)?;
292                }
293                if let Some(w) = &mut s.where_ {
294                    self.resolve_expr_subqueries(w, cancel)?;
295                }
296                self.exec_update_cancel(&s, cancel)
297            }
298            Statement::Delete(mut s) => {
299                if let Some(w) = &mut s.where_ {
300                    self.resolve_expr_subqueries(w, cancel)?;
301                }
302                self.exec_delete_cancel(&s, cancel)
303            }
304            Statement::Merge(s) => self.exec_merge_cancel(&s, cancel),
305            Statement::Select(s) => self.exec_select_cancel(&s, cancel),
306            Statement::Begin => self.exec_begin(),
307            Statement::Commit => self.exec_commit(),
308            Statement::Rollback => self.exec_rollback(),
309            Statement::Savepoint(name) => self.exec_savepoint(name),
310            Statement::RollbackToSavepoint(name) => self.exec_rollback_to_savepoint(&name),
311            Statement::ReleaseSavepoint(name) => self.exec_release_savepoint(&name),
312            Statement::ShowTables => Ok(self.exec_show_tables()),
313            Statement::ShowDatabases => Ok(self.exec_show_databases()),
314            Statement::ShowCreateTable(name) => self.exec_show_create_table(&name),
315            Statement::ShowIndexes(name) => self.exec_show_indexes(&name),
316            Statement::ShowStatus => Ok(self.exec_show_status()),
317            Statement::ShowVariables => Ok(self.exec_show_variables()),
318            Statement::ShowProcesslist => Ok(self.exec_show_processlist()),
319            Statement::ShowColumns(table) => self.exec_show_columns(&table),
320            Statement::ShowUsers => Ok(self.exec_show_users()),
321            Statement::ShowPublications => Ok(self.exec_show_publications()),
322            Statement::ShowSubscriptions => Ok(self.exec_show_subscriptions()),
323            Statement::CreateUser(s) => self.exec_create_user(&s),
324            Statement::DropUser(name) => self.exec_drop_user(&name),
325            Statement::Explain(e) => self.exec_explain(&e, cancel),
326            Statement::AlterIndex(s) => self.exec_alter_index(s),
327            Statement::AlterTable(s) => self.exec_alter_table(s),
328            Statement::CreatePublication(s) => self.exec_create_publication(s),
329            Statement::DropPublication(name) => self.exec_drop_publication(&name),
330            Statement::CreateSubscription(s) => self.exec_create_subscription(s),
331            Statement::DropSubscription(name) => self.exec_drop_subscription(&name),
332            // v6.1.7 — WAIT FOR WAL POSITION needs `lag_state`,
333            // which lives in spg-server's ServerState. The engine
334            // surfaces a clear error; the server-layer dispatch
335            // intercepts the SQL before it reaches the engine on
336            // a server build, so this arm only fires for
337            // engine-only callers (spg-embedded, lib tests).
338            Statement::WaitForWalPosition { .. } => Err(EngineError::Unsupported(
339                "WAIT FOR WAL POSITION must be handled by the server layer".into(),
340            )),
341            // v6.2.0 — ANALYZE recomputes per-column histograms.
342            Statement::Analyze(target) => self.exec_analyze(target.as_deref()),
343            // v6.7.3 — COMPACT COLD SEGMENTS.
344            Statement::CompactColdSegments => self.exec_compact_cold_segments(),
345            // v7.12.1 — SET / RESET session parameter. Engine
346            // tracks the value in `session_params`; FTS dispatcher
347            // reads `default_text_search_config`. Everything else
348            // is a recorded no-op (PG dump compat).
349            Statement::SetParameter { name, value } => {
350                self.set_session_param(name, value);
351                Ok(QueryResult::CommandOk {
352                    affected: 0,
353                    modified_catalog: false,
354                })
355            }
356            // v7.14.0 — MySQL multi-assignment SET. Each pair runs
357            // through `set_session_param` so engine-known params
358            // (FOREIGN_KEY_CHECKS, session_replication_role, …) take
359            // effect; unknown pairs (including `@VAR` LHS from the
360            // mysqldump preamble) are recorded then ignored.
361            Statement::SetParameterList(pairs) => {
362                for (name, value) in pairs {
363                    self.set_session_param(name, value);
364                }
365                Ok(QueryResult::CommandOk {
366                    affected: 0,
367                    modified_catalog: false,
368                })
369            }
370            // v7.12.4 — CREATE FUNCTION / CREATE TRIGGER / DROP …
371            // for the PL/pgSQL trigger surface. exec_* methods are
372            // defined alongside the existing CREATE handlers below.
373            Statement::CreateFunction(s) => self.exec_create_function(s),
374            Statement::CreateTrigger(s) => self.exec_create_trigger(s),
375            Statement::DropTrigger {
376                name,
377                table,
378                if_exists,
379            } => self.exec_drop_trigger(&name, &table, if_exists),
380            Statement::DropFunction { name, if_exists } => {
381                self.exec_drop_function(&name, if_exists)
382            }
383            Statement::CreateSequence(s) => self.exec_create_sequence(s),
384            Statement::AlterSequence(s) => self.exec_alter_sequence(s),
385            Statement::DropSequence { names, if_exists } => {
386                self.exec_drop_sequence(&names, if_exists)
387            }
388            Statement::CreateView(s) => self.exec_create_view(s),
389            Statement::DropView { names, if_exists } => self.exec_drop_view(&names, if_exists),
390            Statement::CreateMaterializedView(s) => self.exec_create_materialized_view(s),
391            Statement::RefreshMaterializedView { name, with_data } => {
392                self.exec_refresh_materialized_view(&name, with_data)
393            }
394            Statement::DropMaterializedView { names, if_exists } => {
395                self.exec_drop_materialized_view(&names, if_exists)
396            }
397            Statement::CreateType(s) => self.exec_create_type(s),
398            Statement::DropType { names, if_exists } => self.exec_drop_type(&names, if_exists),
399            Statement::CreateDomain(s) => self.exec_create_domain(s),
400            Statement::DropDomain { names, if_exists } => self.exec_drop_domain(&names, if_exists),
401            Statement::CreateSchema {
402                name,
403                if_not_exists,
404            } => self.exec_create_schema(name, if_not_exists),
405            Statement::DropSchema { names, if_exists } => self.exec_drop_schema(&names, if_exists),
406            Statement::ResetParameter(target) => {
407                match target {
408                    None => self.session_params.clear(),
409                    Some(name) => {
410                        self.session_params.remove(&name.to_ascii_lowercase());
411                    }
412                }
413                Ok(QueryResult::CommandOk {
414                    affected: 0,
415                    modified_catalog: false,
416                })
417            }
418        };
419        self.enforce_row_limit(result)
420    }
421}