Skip to main content

spg_engine/
execute.rs

1//! Statement execution + prepared-statement dispatch, split out of
2//! `lib.rs` (lib.rs split 17). The public `execute` / `execute_in` /
3//! `execute_with_cancel` entry points, the `prepare` / `prepare_cached`
4//! / `describe_prepared` / `execute_prepared` prepared-statement path,
5//! and the internal pipeline (`execute_inner_with_cancel` →
6//! `execute_stmt_with_cancel`) that pre-resolves clock / sequence /
7//! placeholder rewrites and routes each parsed Statement to its domain
8//! handler (DDL / DML / SELECT / transaction / SHOW / …). Whole
9//! `impl Engine` methods reached via the Engine type, so the public
10//! surface is unchanged; `execute_stmt_with_cancel` is pub(crate) for
11//! the plpgsql + trigger re-entry paths.
12
13use alloc::string::String;
14use alloc::vec::Vec;
15
16use spg_sql::ast::Statement;
17use spg_sql::parser::{self, ParseError};
18use spg_storage::{ColumnSchema, Value};
19
20use crate::describe;
21use crate::{
22    CancelToken, Engine, EngineError, IMPLICIT_TX, QueryResult, TxId, expand_group_by_all,
23    plan_cache, reorder, resolve_order_by_position, rewrite_clock_calls, substitute_placeholders,
24};
25
26impl Engine {
27    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
28        self.execute_in_with_cancel(sql, IMPLICIT_TX, CancelToken::none())
29    }
30
31    /// v4.5 — write path with cooperative cancellation. Same dispatch
32    /// as `execute_in_with_cancel(sql, IMPLICIT_TX, cancel)`. Kept as
33    /// a separate entry point for backward-compat with the v4.5
34    /// public API.
35    pub fn execute_with_cancel(
36        &mut self,
37        sql: &str,
38        cancel: CancelToken<'_>,
39    ) -> Result<QueryResult, EngineError> {
40        self.execute_in_with_cancel(sql, IMPLICIT_TX, cancel)
41    }
42
43    /// v4.41.1 multi-slot write entry. Routes `sql` through the TX
44    /// slot identified by `tx_id` so spg-server dispatch can scope
45    /// each implicit-wrap BEGIN..stmt..COMMIT to its own slot in
46    /// `tx_catalogs`. `IMPLICIT_TX` is the legacy single-slot path
47    /// every other caller (engine self-tests, replay, spg-embedded)
48    /// implicitly takes via `execute()` / `execute_with_cancel()`.
49    pub fn execute_in(&mut self, sql: &str, tx_id: TxId) -> Result<QueryResult, EngineError> {
50        self.execute_in_with_cancel(sql, tx_id, CancelToken::none())
51    }
52
53    /// v4.41.1 write path with cooperative cancellation + explicit TX
54    /// scope. Sets `self.current_tx` for the duration of the call so
55    /// every `exec_*` helper transparently sees its TX's shadow
56    /// catalog and savepoint stack; restores on exit so the field is
57    /// only valid mid-call (no leakage across calls).
58    pub fn execute_in_with_cancel(
59        &mut self,
60        sql: &str,
61        tx_id: TxId,
62        cancel: CancelToken<'_>,
63    ) -> Result<QueryResult, EngineError> {
64        let saved = self.current_tx;
65        self.current_tx = Some(tx_id);
66        // v7.34 (crash-recovery P0 #2) — row-level redo capture. Arm the
67        // active catalog before dispatch; on success drain the physical
68        // changes into `last_redo` for the embedding layer's WAL, on
69        // failure discard them (a failed statement leaves no redo; the
70        // drain clears the tables' capture buffers either way).
71        if self.redo_capture {
72            self.active_catalog_mut().enable_redo_all();
73        }
74        let result = self.execute_inner_with_cancel(sql, cancel);
75        if self.redo_capture {
76            let drained = self.active_catalog_mut().drain_redo();
77            if result.is_ok() {
78                self.last_redo = drained;
79            }
80        }
81        self.current_tx = saved;
82        result
83    }
84
85    /// v6.1.1 — parse and pre-process a SQL string ONCE so the
86    /// resulting [`Statement`] can be cached and re-executed via
87    /// [`Engine::execute_prepared`]. Returns the same `Statement`
88    /// the simple-query path would synthesise internally (clock
89    /// rewrites + ORDER BY position-ref resolution applied at
90    /// prepare time, since both are session-independent). The
91    /// `$N` placeholders in the SQL stay as `Expr::Placeholder(n)`
92    /// nodes; they're resolved to concrete values per-call by
93    /// `execute_prepared`'s substitution walk.
94    ///
95    /// Pgwire's `Parse` (P) message lands here.
96    pub fn prepare(&self, sql: &str) -> Result<Statement, ParseError> {
97        let mut stmt = parser::parse_statement_with(sql, self.backslash_escapes)?;
98        let now_micros = self.clock.map(|f| f());
99        rewrite_clock_calls(&mut stmt, now_micros);
100        if let Statement::Select(s) = &mut stmt {
101            // v6.4.1 — expand `GROUP BY ALL` to every non-aggregate
102            // SELECT-list item BEFORE position / alias resolution so
103            // downstream passes see the explicit list.
104            expand_group_by_all(s);
105            resolve_order_by_position(s);
106            // v6.2.3 — cost-based JOIN reorder. No-op for
107            // single-table FROMs or any non-INNER join shape.
108            reorder::reorder_joins(s, &self.catalog, &self.statistics);
109        }
110        Ok(stmt)
111    }
112
113    /// v6.3.0 — cached prepare. Returns a cloned `Statement` from
114    /// the plan cache on hit, runs the full `prepare()` path on miss
115    /// and inserts the resulting plan before returning. Skipping the
116    /// parse + JOIN-reorder pipeline on hit is the dominant win for
117    /// JDBC / sqlx / pgx clients that reuse the same SQL string.
118    ///
119    /// Returns a cloned `Statement` (not a borrow) because the
120    /// pgwire layer owns its `PreparedStmt` map per-session and the
121    /// engine-level cache must stay available for other sessions.
122    /// Clone cost on a 5-table JOIN AST is well under the parse cost
123    /// it replaces.
124    pub fn prepare_cached(&mut self, sql: &str) -> Result<Statement, ParseError> {
125        // v6.3.1 — version-aware lookup. If the cached plan was
126        // prepared before the most recent ANALYZE, evict and replan.
127        let current_version = self.statistics.version();
128        if let Some(plan) = self.plan_cache.get(sql) {
129            if plan.statistics_version == current_version {
130                return Ok(plan.stmt.clone());
131            }
132            // Stale entry — fall through to evict + re-prepare.
133        }
134        self.plan_cache.evict(sql);
135        let stmt = self.prepare(sql)?;
136        let source_tables = plan_cache::collect_source_tables(&stmt);
137        let plan = plan_cache::PreparedPlan {
138            stmt: stmt.clone(),
139            statistics_version: current_version,
140            source_tables,
141            describe_columns: alloc::vec::Vec::new(),
142        };
143        self.plan_cache.insert(String::from(sql), plan);
144        Ok(stmt)
145    }
146
147    /// v6.3.0 — read-only accessor for tests and v6.3.1 invalidation.
148    pub fn plan_cache(&self) -> &plan_cache::PlanCache {
149        &self.plan_cache
150    }
151
152    /// v7.38 (mailrs prod 7.35 pool-exhaustion incident) — boot-time
153    /// plan-IR cache warm-up. Walks `sqls`, calls `prepare_cached`
154    /// on each one. Each successful prepare leaves the parsed +
155    /// reordered + clock-rewritten `Statement` in the engine-wide
156    /// plan cache; subsequent `Engine::execute` / `execute_prepared`
157    /// for the same SQL skips parse + JOIN reorder. Returns the
158    /// count of successfully cached statements.
159    ///
160    /// The mailrs `Database::new` boot path is the expected caller:
161    /// pre-warm the top-N query shapes (inbox listing, contacts
162    /// search, stats) so the first user-facing request doesn't
163    /// pay the 2-3 s first-fire cost on the readonly-blocking
164    /// sqlx pool — which (under prod concurrency) exhausts the
165    /// pool and stalls the whole UI.
166    pub fn warm_up_plan_cache(&mut self, sqls: &[&str]) -> usize {
167        let mut warmed = 0;
168        for sql in sqls {
169            if self.prepare_cached(sql).is_ok() {
170                warmed += 1;
171            }
172        }
173        warmed
174    }
175
176    /// v7.38 (mailrs prod 7.35 pool-exhaustion incident) — boot-time
177    /// cold-tier OS page-cache warm-up. Walks every table in the
178    /// active catalog, iterates the cold rows via the existing
179    /// BTree-driven `iter_cold_rows_of_table`, drops the rows on
180    /// the floor. The walk's side effect is that every cold
181    /// segment file gets mmap-read once — the OS page cache then
182    /// serves subsequent queries without disk I/O.
183    ///
184    /// Returns the total cold rows touched across all tables.
185    /// On a hot-only catalog (no `cold_segments` populated) the
186    /// call is a near-no-op.
187    pub fn warm_up_cold_tier(&self) -> usize {
188        let catalog = self.active_catalog();
189        let mut total = 0;
190        for name in catalog.table_names() {
191            if let Some(table) = catalog.get(&name) {
192                let rows = self.iter_cold_rows_of_table(table);
193                total += rows.len();
194            }
195        }
196        total
197    }
198
199    /// v6.3.0 — mutable accessor for v6.3.1 invalidation hooks.
200    pub fn plan_cache_mut(&mut self) -> &mut plan_cache::PlanCache {
201        &mut self.plan_cache
202    }
203
204    /// v6.3.3 — Describe a prepared `Statement` without executing.
205    /// Returns `(parameter_oids, output_columns)`. Empty
206    /// `output_columns` means the statement has no row-producing
207    /// shape we could resolve here (JOIN, subquery, non-SELECT, …)
208    /// — pgwire layer maps that to a `NoData` reply.
209    pub fn describe_prepared(&self, stmt: &Statement) -> (Vec<u32>, Vec<ColumnSchema>) {
210        describe::describe_prepared(stmt, self.active_catalog())
211    }
212
213    /// v6.1.1 — execute a [`Statement`] previously returned by
214    /// [`Engine::prepare`], substituting `Expr::Placeholder(n)`
215    /// nodes for the corresponding [`Value`] in `params` (1-based
216    /// per PG: `$1` → `params[0]`). Bind-time string parameters
217    /// are decoded into typed `Value`s by the pgwire layer before
218    /// this call so the resulting AST hits the same execution
219    /// path as a simple query — no SQL re-parse.
220    ///
221    /// Pgwire's `Execute` (E) message after a `Bind` (B) lands here.
222    pub fn execute_prepared(
223        &mut self,
224        stmt: Statement,
225        params: &[Value],
226    ) -> Result<QueryResult, EngineError> {
227        self.execute_prepared_with_cancel(stmt, params, CancelToken::none())
228    }
229
230    /// v7.37 (SPGS small-query bar) — borrow-based SELECT entry for
231    /// the pgwire `Execute` hot path when the portal has no bound
232    /// parameters. Skips both the AST clone the prepared path used
233    /// to do at the pgwire call site AND the `substitute_
234    /// placeholders` walk (a no-op when params are empty). Caller
235    /// must already hold the engine write lock — read would be
236    /// cleaner, but `current_tx` mutation keeps it `&mut`.
237    pub fn execute_prepared_select_no_params(
238        &mut self,
239        stmt: &spg_sql::ast::SelectStatement,
240        cancel: CancelToken<'_>,
241    ) -> Result<QueryResult, EngineError> {
242        let saved = self.current_tx;
243        self.current_tx = Some(IMPLICIT_TX);
244        let result = self.exec_select_cancel(stmt, cancel);
245        self.current_tx = saved;
246        result
247    }
248
249    /// v7.37 — streaming SELECT for the pgwire `Execute` hot path.
250    /// Emits one `StreamItem::Header(cols)` then one
251    /// `StreamItem::Row(&[&Value])` per surviving row. Returns the
252    /// total row count for the `CommandComplete` tag.
253    ///
254    /// For shapes where the engine can stream directly (non-aggregate
255    /// join projection of bound columns, no ORDER BY / DISTINCT / etc.)
256    /// no `Vec<Row>` is materialised — cell references come straight
257    /// out of the source tables. For non-streamable shapes the engine
258    /// runs the full `exec_select_cancel`, then walks the materialised
259    /// `Vec<Row>` driving the same emit callback (no engine-side win,
260    /// but pgwire dispatches every Execute through one path).
261    pub fn execute_prepared_select_streaming<F>(
262        &mut self,
263        stmt: &spg_sql::ast::SelectStatement,
264        cancel: CancelToken<'_>,
265        mut emit: F,
266    ) -> Result<usize, EngineError>
267    where
268        F: FnMut(StreamItem<'_>) -> Result<(), EngineError>,
269    {
270        let saved = self.current_tx;
271        self.current_tx = Some(IMPLICIT_TX);
272        let inner = self.exec_select_streaming(stmt, cancel, &mut emit);
273        self.current_tx = saved;
274        inner
275    }
276
277    /// v7.37 — internal streaming dispatcher. Phase 1: fall-back path
278    /// only — runs the materialising `exec_select_cancel`, then drives
279    /// the emit callback from the resulting `Vec<Row>`. Phase 2 will
280    /// add a true streaming path for the joined-projection shape.
281    fn exec_select_streaming<F>(
282        &mut self,
283        stmt: &spg_sql::ast::SelectStatement,
284        cancel: CancelToken<'_>,
285        emit: &mut F,
286    ) -> Result<usize, EngineError>
287    where
288        F: FnMut(StreamItem<'_>) -> Result<(), EngineError>,
289    {
290        // v7.37 — true-streaming fast path for joined-non-aggregate
291        // projection of bound columns. Skips `Vec<Row>` + per-cell
292        // `.cloned()` (about 4 ms saved on the 25 k-row PROJ shape).
293        // Unresolved subqueries / pull-up shapes / non-streamable
294        // structure (ORDER BY, DISTINCT, …) fall through to the
295        // materialising path.
296        if !crate::subquery::expr_tree_has_subquery(stmt) {
297            if let Some(n) = self.try_exec_joined_streaming(stmt, cancel, emit)? {
298                return Ok(n);
299            }
300        }
301        // Fall-back: materialise then iterate.
302        let QueryResult::Rows { columns, rows } = self.exec_select_cancel(stmt, cancel)? else {
303            return Err(EngineError::Unsupported(alloc::string::String::from(
304                "streaming SELECT got a non-Rows result",
305            )));
306        };
307        emit(StreamItem::Header(&columns))?;
308        let mut cell_refs: Vec<&Value> = Vec::with_capacity(columns.len());
309        for row in &rows {
310            cell_refs.clear();
311            for v in &row.values {
312                cell_refs.push(v);
313            }
314            emit(StreamItem::Row(&cell_refs))?;
315        }
316        Ok(rows.len())
317    }
318}
319
320/// v7.37 — one item in the streaming SELECT emit channel. The
321/// engine yields exactly one `Header` (before any row) then zero
322/// or more `Row`s. Pgwire (or any other consumer) decides how to
323/// turn those into wire bytes.
324#[derive(Debug)]
325pub enum StreamItem<'a> {
326    Header(&'a [ColumnSchema]),
327    Row(&'a [&'a Value]),
328}
329
330impl Engine {
331
332    /// v7.17.0 Phase 2.3 — prepared-statement entry that honors a
333    /// caller-supplied `CancelToken`. Mirrors `execute_prepared`'s
334    /// `current_tx` save/restore so the extended-query path stays
335    /// transactionally consistent with the simple-query path.
336    pub fn execute_prepared_with_cancel(
337        &mut self,
338        mut stmt: Statement,
339        params: &[Value],
340        cancel: CancelToken<'_>,
341    ) -> Result<QueryResult, EngineError> {
342        substitute_placeholders(&mut stmt, params)?;
343        // v7.16.0 — set `current_tx` for the duration of the
344        // dispatch so the `exec_*` helpers see the right TX
345        // slot (matches what `execute_in_with_cancel` does for
346        // simple-query). Pre-v7.16 the simple-query path
347        // worked because every public entry point routed
348        // through `execute_in_with_cancel`; the prepared path
349        // skipped the wrap and so its INSERTs/UPDATEs landed
350        // in the no-tx default slot, silently invisible to a
351        // BEGIN/COMMIT-bracketed flow. Caught by spg-sqlx's
352        // first transaction-visibility test.
353        let saved = self.current_tx;
354        self.current_tx = Some(IMPLICIT_TX);
355        let result = self.execute_stmt_with_cancel(stmt, cancel);
356        self.current_tx = saved;
357        result
358    }
359
360    fn execute_inner_with_cancel(
361        &mut self,
362        sql: &str,
363        cancel: CancelToken<'_>,
364    ) -> Result<QueryResult, EngineError> {
365        cancel.check()?;
366        let stmt = self.prepare(sql)?;
367        // v6.5.1 — wrap the executor with a wall-clock window so we
368        // can record into spg_stat_query. Skip when the engine has
369        // no clock attached (no_std embedded callers).
370        let start_us = self.clock.map(|f| f());
371        let result = self.execute_stmt_with_cancel(stmt, cancel);
372        if let (Some(t0), Ok(_)) = (start_us, &result) {
373            let now = self.clock.map_or(t0, |f| f());
374            let elapsed = now.saturating_sub(t0).max(0) as u64;
375            self.query_stats.record(sql, elapsed, now as u64);
376            // v6.5.6 — slow-query log: fire callback when elapsed
377            // exceeds the configured floor.
378            if let (Some(threshold), Some(logger)) =
379                (self.slow_query_threshold_us, self.slow_query_logger)
380                && elapsed >= threshold
381            {
382                logger(sql, elapsed);
383            }
384        }
385        result
386    }
387
388    pub(crate) fn execute_stmt_with_cancel(
389        &mut self,
390        stmt: Statement,
391        cancel: CancelToken<'_>,
392    ) -> Result<QueryResult, EngineError> {
393        cancel.check()?;
394        // v7.17.0 Phase 1.1 — pre-resolve nextval / currval /
395        // setval calls in the statement tree. Walks SELECT
396        // projection, INSERT VALUES, UPDATE SET, DELETE WHERE,
397        // and DEFAULT exprs; replaces sequence FunctionCall
398        // nodes with concrete Literal values minted against the
399        // catalog. This is the only place that mutates sequence
400        // state from a SELECT-shaped path (exec_select_cancel is
401        // `&self` and can't reach the catalog mutably).
402        //
403        // Fast-path: when no sequences exist anywhere in the
404        // catalog (the typical hot-path INSERT load), skip the
405        // walker entirely. Single map-emptiness check on the
406        // catalog beats walking every expression on every call.
407        let mut stmt = stmt;
408        // v7.17 dump-compat — the fast-path check
409        // `sequences().is_empty()` skips pre-resolve when no
410        // sequence exists in the *currently active* catalog
411        // snapshot. The committed catalog or the implicit-TX
412        // catalog may legitimately disagree on this between
413        // CREATE SEQUENCE and a later setval(): always run the
414        // resolver — the walk is O(expr-count) and dwarfed by
415        // the parse cost we just paid.
416        self.pre_resolve_sequence_calls_in_statement(&mut stmt)?;
417        let result = match stmt {
418            Statement::CreateTable(s) => self.exec_create_table(s),
419            // v7.9.15 — CREATE EXTENSION is a no-op on SPG. Returns
420            // CommandOk with affected=0; modified_catalog=false so
421            // the WAL doesn't grow a useless entry. mailrs F3.
422            Statement::CreateExtension(_) => Ok(QueryResult::CommandOk {
423                affected: 0,
424                modified_catalog: false,
425            }),
426            // v7.16.2 — DO $$ ... $$ block. mailrs round-10 A.2
427            // — the pre-v7.9.27 no-op SILENTLY swallowed every
428            // mailrs migrate-038/-040/-042 idempotent rename
429            // (the IF EXISTS … THEN ALTER … END block never
430            // ran). v7.16.2 dispatches to exec_do_block which
431            // runs the PlPgSqlBlock at top level via the same
432            // execute_stmts machinery the trigger executor
433            // uses (NEW=None, OLD=None — DO blocks have no
434            // row context).
435            Statement::DoBlock(body) => self.exec_do_block(body),
436            // v7.14.0 — empty-statement no-op for pg_dump /
437            // mysqldump preamble lines that collapse to nothing
438            // after comment-stripping.
439            Statement::Empty => Ok(QueryResult::CommandOk {
440                affected: 0,
441                modified_catalog: false,
442            }),
443            Statement::DropTable { names, if_exists } => self.exec_drop_table(names, if_exists),
444            Statement::DropIndex { name, if_exists } => self.exec_drop_index(name, if_exists),
445            Statement::CreateIndex(s) => self.exec_create_index(s),
446            Statement::Insert(s) => self.exec_insert(s),
447            Statement::Update(mut s) => {
448                // Materialise uncorrelated subqueries in SET / WHERE
449                // before the row walk — the SELECT path has done this
450                // since v4.10; UPDATE gained it for mailrs's
451                // `UPDATE … WHERE id IN (SELECT … FOR UPDATE SKIP
452                // LOCKED)` claim pattern (embed round-12).
453                for (_, e) in &mut s.assignments {
454                    self.resolve_expr_subqueries(e, cancel)?;
455                }
456                if let Some(w) = &mut s.where_ {
457                    self.resolve_expr_subqueries(w, cancel)?;
458                }
459                self.exec_update_cancel(&s, cancel)
460            }
461            Statement::Delete(mut s) => {
462                if let Some(w) = &mut s.where_ {
463                    self.resolve_expr_subqueries(w, cancel)?;
464                }
465                self.exec_delete_cancel(&s, cancel)
466            }
467            Statement::Merge(s) => self.exec_merge_cancel(&s, cancel),
468            Statement::Select(s) => self.exec_select_cancel(&s, cancel),
469            Statement::Begin => self.exec_begin(),
470            Statement::Commit => self.exec_commit(),
471            Statement::Rollback => self.exec_rollback(),
472            Statement::Savepoint(name) => self.exec_savepoint(name),
473            Statement::RollbackToSavepoint(name) => self.exec_rollback_to_savepoint(&name),
474            Statement::ReleaseSavepoint(name) => self.exec_release_savepoint(&name),
475            Statement::ShowTables => Ok(self.exec_show_tables()),
476            Statement::ShowDatabases => Ok(self.exec_show_databases()),
477            Statement::ShowCreateTable(name) => self.exec_show_create_table(&name),
478            Statement::ShowIndexes(name) => self.exec_show_indexes(&name),
479            Statement::ShowStatus => Ok(self.exec_show_status()),
480            Statement::ShowVariables => Ok(self.exec_show_variables()),
481            Statement::ShowProcesslist => Ok(self.exec_show_processlist()),
482            Statement::ShowColumns(table) => self.exec_show_columns(&table),
483            Statement::ShowUsers => Ok(self.exec_show_users()),
484            Statement::ShowPublications => Ok(self.exec_show_publications()),
485            Statement::ShowSubscriptions => Ok(self.exec_show_subscriptions()),
486            Statement::CreateUser(s) => self.exec_create_user(&s),
487            Statement::DropUser(name) => self.exec_drop_user(&name),
488            Statement::Explain(e) => self.exec_explain(&e, cancel),
489            Statement::AlterIndex(s) => self.exec_alter_index(s),
490            Statement::AlterTable(s) => self.exec_alter_table(s),
491            Statement::CreatePublication(s) => self.exec_create_publication(s),
492            Statement::DropPublication(name) => self.exec_drop_publication(&name),
493            Statement::CreateSubscription(s) => self.exec_create_subscription(s),
494            Statement::DropSubscription(name) => self.exec_drop_subscription(&name),
495            // v6.1.7 — WAIT FOR WAL POSITION needs `lag_state`,
496            // which lives in spg-server's ServerState. The engine
497            // surfaces a clear error; the server-layer dispatch
498            // intercepts the SQL before it reaches the engine on
499            // a server build, so this arm only fires for
500            // engine-only callers (spg-embedded, lib tests).
501            Statement::WaitForWalPosition { .. } => Err(EngineError::Unsupported(
502                "WAIT FOR WAL POSITION must be handled by the server layer".into(),
503            )),
504            // v6.2.0 — ANALYZE recomputes per-column histograms.
505            Statement::Analyze(target) => self.exec_analyze(target.as_deref()),
506            // v6.7.3 — COMPACT COLD SEGMENTS.
507            Statement::CompactColdSegments => self.exec_compact_cold_segments(),
508            // v7.12.1 — SET / RESET session parameter. Engine
509            // tracks the value in `session_params`; FTS dispatcher
510            // reads `default_text_search_config`. Everything else
511            // is a recorded no-op (PG dump compat).
512            Statement::SetParameter { name, value } => {
513                self.set_session_param(name, value);
514                Ok(QueryResult::CommandOk {
515                    affected: 0,
516                    modified_catalog: false,
517                })
518            }
519            // v7.14.0 — MySQL multi-assignment SET. Each pair runs
520            // through `set_session_param` so engine-known params
521            // (FOREIGN_KEY_CHECKS, session_replication_role, …) take
522            // effect; unknown pairs (including `@VAR` LHS from the
523            // mysqldump preamble) are recorded then ignored.
524            Statement::SetParameterList(pairs) => {
525                for (name, value) in pairs {
526                    self.set_session_param(name, value);
527                }
528                Ok(QueryResult::CommandOk {
529                    affected: 0,
530                    modified_catalog: false,
531                })
532            }
533            // v7.12.4 — CREATE FUNCTION / CREATE TRIGGER / DROP …
534            // for the PL/pgSQL trigger surface. exec_* methods are
535            // defined alongside the existing CREATE handlers below.
536            Statement::CreateFunction(s) => self.exec_create_function(s),
537            Statement::CreateTrigger(s) => self.exec_create_trigger(s),
538            Statement::DropTrigger {
539                name,
540                table,
541                if_exists,
542            } => self.exec_drop_trigger(&name, &table, if_exists),
543            Statement::DropFunction { name, if_exists } => {
544                self.exec_drop_function(&name, if_exists)
545            }
546            Statement::CreateSequence(s) => self.exec_create_sequence(s),
547            Statement::AlterSequence(s) => self.exec_alter_sequence(s),
548            Statement::DropSequence { names, if_exists } => {
549                self.exec_drop_sequence(&names, if_exists)
550            }
551            Statement::CreateView(s) => self.exec_create_view(s),
552            Statement::DropView { names, if_exists } => self.exec_drop_view(&names, if_exists),
553            Statement::CreateMaterializedView(s) => self.exec_create_materialized_view(s),
554            Statement::RefreshMaterializedView { name, with_data } => {
555                self.exec_refresh_materialized_view(&name, with_data)
556            }
557            Statement::DropMaterializedView { names, if_exists } => {
558                self.exec_drop_materialized_view(&names, if_exists)
559            }
560            Statement::CreateType(s) => self.exec_create_type(s),
561            Statement::DropType { names, if_exists } => self.exec_drop_type(&names, if_exists),
562            Statement::CreateDomain(s) => self.exec_create_domain(s),
563            Statement::DropDomain { names, if_exists } => self.exec_drop_domain(&names, if_exists),
564            Statement::CreateSchema {
565                name,
566                if_not_exists,
567            } => self.exec_create_schema(name, if_not_exists),
568            Statement::DropSchema { names, if_exists } => self.exec_drop_schema(&names, if_exists),
569            Statement::ResetParameter(target) => {
570                match target {
571                    None => self.session_params.clear(),
572                    Some(name) => {
573                        self.session_params.remove(&name.to_ascii_lowercase());
574                    }
575                }
576                Ok(QueryResult::CommandOk {
577                    affected: 0,
578                    modified_catalog: false,
579                })
580            }
581        };
582        self.enforce_row_limit(result)
583    }
584}