spg_engine/execute.rs
1//! Statement execution + prepared-statement dispatch, split out of
2//! `lib.rs` (lib.rs split 17). The public `execute` / `execute_in` /
3//! `execute_with_cancel` entry points, the `prepare` / `prepare_cached`
4//! / `describe_prepared` / `execute_prepared` prepared-statement path,
5//! and the internal pipeline (`execute_inner_with_cancel` →
6//! `execute_stmt_with_cancel`) that pre-resolves clock / sequence /
7//! placeholder rewrites and routes each parsed Statement to its domain
8//! handler (DDL / DML / SELECT / transaction / SHOW / …). Whole
9//! `impl Engine` methods reached via the Engine type, so the public
10//! surface is unchanged; `execute_stmt_with_cancel` is pub(crate) for
11//! the plpgsql + trigger re-entry paths.
12
13use alloc::string::String;
14use alloc::vec::Vec;
15
16use spg_sql::ast::Statement;
17use spg_sql::parser::{self, ParseError};
18use spg_storage::{ColumnSchema, Value};
19
20use crate::describe;
21use crate::{
22 CancelToken, Engine, EngineError, IMPLICIT_TX, QueryResult, TxId, expand_group_by_all,
23 plan_cache, reorder, resolve_order_by_position, rewrite_clock_calls, substitute_placeholders,
24};
25
26impl Engine {
27 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, EngineError> {
28 self.execute_in_with_cancel(sql, IMPLICIT_TX, CancelToken::none())
29 }
30
31 /// v4.5 — write path with cooperative cancellation. Same dispatch
32 /// as `execute_in_with_cancel(sql, IMPLICIT_TX, cancel)`. Kept as
33 /// a separate entry point for backward-compat with the v4.5
34 /// public API.
35 pub fn execute_with_cancel(
36 &mut self,
37 sql: &str,
38 cancel: CancelToken<'_>,
39 ) -> Result<QueryResult, EngineError> {
40 self.execute_in_with_cancel(sql, IMPLICIT_TX, cancel)
41 }
42
43 /// v4.41.1 multi-slot write entry. Routes `sql` through the TX
44 /// slot identified by `tx_id` so spg-server dispatch can scope
45 /// each implicit-wrap BEGIN..stmt..COMMIT to its own slot in
46 /// `tx_catalogs`. `IMPLICIT_TX` is the legacy single-slot path
47 /// every other caller (engine self-tests, replay, spg-embedded)
48 /// implicitly takes via `execute()` / `execute_with_cancel()`.
49 pub fn execute_in(&mut self, sql: &str, tx_id: TxId) -> Result<QueryResult, EngineError> {
50 self.execute_in_with_cancel(sql, tx_id, CancelToken::none())
51 }
52
53 /// v4.41.1 write path with cooperative cancellation + explicit TX
54 /// scope. Sets `self.current_tx` for the duration of the call so
55 /// every `exec_*` helper transparently sees its TX's shadow
56 /// catalog and savepoint stack; restores on exit so the field is
57 /// only valid mid-call (no leakage across calls).
58 pub fn execute_in_with_cancel(
59 &mut self,
60 sql: &str,
61 tx_id: TxId,
62 cancel: CancelToken<'_>,
63 ) -> Result<QueryResult, EngineError> {
64 let saved = self.current_tx;
65 self.current_tx = Some(tx_id);
66 // v7.34 (crash-recovery P0 #2) — row-level redo capture. Arm the
67 // active catalog before dispatch; on success drain the physical
68 // changes into `last_redo` for the embedding layer's WAL, on
69 // failure discard them (a failed statement leaves no redo; the
70 // drain clears the tables' capture buffers either way).
71 if self.redo_capture {
72 self.active_catalog_mut().enable_redo_all();
73 }
74 let result = self.execute_inner_with_cancel(sql, cancel);
75 if self.redo_capture {
76 let drained = self.active_catalog_mut().drain_redo();
77 if result.is_ok() {
78 self.last_redo = drained;
79 }
80 }
81 self.current_tx = saved;
82 result
83 }
84
85 /// v6.1.1 — parse and pre-process a SQL string ONCE so the
86 /// resulting [`Statement`] can be cached and re-executed via
87 /// [`Engine::execute_prepared`]. Returns the same `Statement`
88 /// the simple-query path would synthesise internally (clock
89 /// rewrites + ORDER BY position-ref resolution applied at
90 /// prepare time, since both are session-independent). The
91 /// `$N` placeholders in the SQL stay as `Expr::Placeholder(n)`
92 /// nodes; they're resolved to concrete values per-call by
93 /// `execute_prepared`'s substitution walk.
94 ///
95 /// Pgwire's `Parse` (P) message lands here.
96 pub fn prepare(&self, sql: &str) -> Result<Statement, ParseError> {
97 let mut stmt = parser::parse_statement_with(sql, self.backslash_escapes)?;
98 let now_micros = self.clock.map(|f| f());
99 rewrite_clock_calls(&mut stmt, now_micros);
100 if let Statement::Select(s) = &mut stmt {
101 // v6.4.1 — expand `GROUP BY ALL` to every non-aggregate
102 // SELECT-list item BEFORE position / alias resolution so
103 // downstream passes see the explicit list.
104 expand_group_by_all(s);
105 resolve_order_by_position(s);
106 // v6.2.3 — cost-based JOIN reorder. No-op for
107 // single-table FROMs or any non-INNER join shape.
108 reorder::reorder_joins(s, &self.catalog, &self.statistics);
109 }
110 Ok(stmt)
111 }
112
113 /// v6.3.0 — cached prepare. Returns a cloned `Statement` from
114 /// the plan cache on hit, runs the full `prepare()` path on miss
115 /// and inserts the resulting plan before returning. Skipping the
116 /// parse + JOIN-reorder pipeline on hit is the dominant win for
117 /// JDBC / sqlx / pgx clients that reuse the same SQL string.
118 ///
119 /// Returns a cloned `Statement` (not a borrow) because the
120 /// pgwire layer owns its `PreparedStmt` map per-session and the
121 /// engine-level cache must stay available for other sessions.
122 /// Clone cost on a 5-table JOIN AST is well under the parse cost
123 /// it replaces.
124 pub fn prepare_cached(&mut self, sql: &str) -> Result<Statement, ParseError> {
125 // v6.3.1 — version-aware lookup. If the cached plan was
126 // prepared before the most recent ANALYZE, evict and replan.
127 let current_version = self.statistics.version();
128 if let Some(plan) = self.plan_cache.get(sql) {
129 if plan.statistics_version == current_version {
130 return Ok(plan.stmt.clone());
131 }
132 // Stale entry — fall through to evict + re-prepare.
133 }
134 self.plan_cache.evict(sql);
135 let stmt = self.prepare(sql)?;
136 let source_tables = plan_cache::collect_source_tables(&stmt);
137 let plan = plan_cache::PreparedPlan {
138 stmt: stmt.clone(),
139 statistics_version: current_version,
140 source_tables,
141 describe_columns: alloc::vec::Vec::new(),
142 };
143 self.plan_cache.insert(String::from(sql), plan);
144 Ok(stmt)
145 }
146
147 /// v6.3.0 — read-only accessor for tests and v6.3.1 invalidation.
148 pub fn plan_cache(&self) -> &plan_cache::PlanCache {
149 &self.plan_cache
150 }
151
152 /// v6.3.0 — mutable accessor for v6.3.1 invalidation hooks.
153 pub fn plan_cache_mut(&mut self) -> &mut plan_cache::PlanCache {
154 &mut self.plan_cache
155 }
156
157 /// v6.3.3 — Describe a prepared `Statement` without executing.
158 /// Returns `(parameter_oids, output_columns)`. Empty
159 /// `output_columns` means the statement has no row-producing
160 /// shape we could resolve here (JOIN, subquery, non-SELECT, …)
161 /// — pgwire layer maps that to a `NoData` reply.
162 pub fn describe_prepared(&self, stmt: &Statement) -> (Vec<u32>, Vec<ColumnSchema>) {
163 describe::describe_prepared(stmt, self.active_catalog())
164 }
165
166 /// v6.1.1 — execute a [`Statement`] previously returned by
167 /// [`Engine::prepare`], substituting `Expr::Placeholder(n)`
168 /// nodes for the corresponding [`Value`] in `params` (1-based
169 /// per PG: `$1` → `params[0]`). Bind-time string parameters
170 /// are decoded into typed `Value`s by the pgwire layer before
171 /// this call so the resulting AST hits the same execution
172 /// path as a simple query — no SQL re-parse.
173 ///
174 /// Pgwire's `Execute` (E) message after a `Bind` (B) lands here.
175 pub fn execute_prepared(
176 &mut self,
177 stmt: Statement,
178 params: &[Value],
179 ) -> Result<QueryResult, EngineError> {
180 self.execute_prepared_with_cancel(stmt, params, CancelToken::none())
181 }
182
183 /// v7.37 (SPGS small-query bar) — borrow-based SELECT entry for
184 /// the pgwire `Execute` hot path when the portal has no bound
185 /// parameters. Skips both the AST clone the prepared path used
186 /// to do at the pgwire call site AND the `substitute_
187 /// placeholders` walk (a no-op when params are empty). Caller
188 /// must already hold the engine write lock — read would be
189 /// cleaner, but `current_tx` mutation keeps it `&mut`.
190 pub fn execute_prepared_select_no_params(
191 &mut self,
192 stmt: &spg_sql::ast::SelectStatement,
193 cancel: CancelToken<'_>,
194 ) -> Result<QueryResult, EngineError> {
195 let saved = self.current_tx;
196 self.current_tx = Some(IMPLICIT_TX);
197 let result = self.exec_select_cancel(stmt, cancel);
198 self.current_tx = saved;
199 result
200 }
201
202 /// v7.37 — streaming SELECT for the pgwire `Execute` hot path.
203 /// Emits one `StreamItem::Header(cols)` then one
204 /// `StreamItem::Row(&[&Value])` per surviving row. Returns the
205 /// total row count for the `CommandComplete` tag.
206 ///
207 /// For shapes where the engine can stream directly (non-aggregate
208 /// join projection of bound columns, no ORDER BY / DISTINCT / etc.)
209 /// no `Vec<Row>` is materialised — cell references come straight
210 /// out of the source tables. For non-streamable shapes the engine
211 /// runs the full `exec_select_cancel`, then walks the materialised
212 /// `Vec<Row>` driving the same emit callback (no engine-side win,
213 /// but pgwire dispatches every Execute through one path).
214 pub fn execute_prepared_select_streaming<F>(
215 &mut self,
216 stmt: &spg_sql::ast::SelectStatement,
217 cancel: CancelToken<'_>,
218 mut emit: F,
219 ) -> Result<usize, EngineError>
220 where
221 F: FnMut(StreamItem<'_>) -> Result<(), EngineError>,
222 {
223 let saved = self.current_tx;
224 self.current_tx = Some(IMPLICIT_TX);
225 let inner = self.exec_select_streaming(stmt, cancel, &mut emit);
226 self.current_tx = saved;
227 inner
228 }
229
230 /// v7.37 — internal streaming dispatcher. Phase 1: fall-back path
231 /// only — runs the materialising `exec_select_cancel`, then drives
232 /// the emit callback from the resulting `Vec<Row>`. Phase 2 will
233 /// add a true streaming path for the joined-projection shape.
234 fn exec_select_streaming<F>(
235 &mut self,
236 stmt: &spg_sql::ast::SelectStatement,
237 cancel: CancelToken<'_>,
238 emit: &mut F,
239 ) -> Result<usize, EngineError>
240 where
241 F: FnMut(StreamItem<'_>) -> Result<(), EngineError>,
242 {
243 // v7.37 — true-streaming fast path for joined-non-aggregate
244 // projection of bound columns. Skips `Vec<Row>` + per-cell
245 // `.cloned()` (about 4 ms saved on the 25 k-row PROJ shape).
246 // Unresolved subqueries / pull-up shapes / non-streamable
247 // structure (ORDER BY, DISTINCT, …) fall through to the
248 // materialising path.
249 if !crate::subquery::expr_tree_has_subquery(stmt) {
250 if let Some(n) = self.try_exec_joined_streaming(stmt, cancel, emit)? {
251 return Ok(n);
252 }
253 }
254 // Fall-back: materialise then iterate.
255 let QueryResult::Rows { columns, rows } = self.exec_select_cancel(stmt, cancel)? else {
256 return Err(EngineError::Unsupported(alloc::string::String::from(
257 "streaming SELECT got a non-Rows result",
258 )));
259 };
260 emit(StreamItem::Header(&columns))?;
261 let mut cell_refs: Vec<&Value> = Vec::with_capacity(columns.len());
262 for row in &rows {
263 cell_refs.clear();
264 for v in &row.values {
265 cell_refs.push(v);
266 }
267 emit(StreamItem::Row(&cell_refs))?;
268 }
269 Ok(rows.len())
270 }
271}
272
273/// v7.37 — one item in the streaming SELECT emit channel. The
274/// engine yields exactly one `Header` (before any row) then zero
275/// or more `Row`s. Pgwire (or any other consumer) decides how to
276/// turn those into wire bytes.
277#[derive(Debug)]
278pub enum StreamItem<'a> {
279 Header(&'a [ColumnSchema]),
280 Row(&'a [&'a Value]),
281}
282
283impl Engine {
284
285 /// v7.17.0 Phase 2.3 — prepared-statement entry that honors a
286 /// caller-supplied `CancelToken`. Mirrors `execute_prepared`'s
287 /// `current_tx` save/restore so the extended-query path stays
288 /// transactionally consistent with the simple-query path.
289 pub fn execute_prepared_with_cancel(
290 &mut self,
291 mut stmt: Statement,
292 params: &[Value],
293 cancel: CancelToken<'_>,
294 ) -> Result<QueryResult, EngineError> {
295 substitute_placeholders(&mut stmt, params)?;
296 // v7.16.0 — set `current_tx` for the duration of the
297 // dispatch so the `exec_*` helpers see the right TX
298 // slot (matches what `execute_in_with_cancel` does for
299 // simple-query). Pre-v7.16 the simple-query path
300 // worked because every public entry point routed
301 // through `execute_in_with_cancel`; the prepared path
302 // skipped the wrap and so its INSERTs/UPDATEs landed
303 // in the no-tx default slot, silently invisible to a
304 // BEGIN/COMMIT-bracketed flow. Caught by spg-sqlx's
305 // first transaction-visibility test.
306 let saved = self.current_tx;
307 self.current_tx = Some(IMPLICIT_TX);
308 let result = self.execute_stmt_with_cancel(stmt, cancel);
309 self.current_tx = saved;
310 result
311 }
312
313 fn execute_inner_with_cancel(
314 &mut self,
315 sql: &str,
316 cancel: CancelToken<'_>,
317 ) -> Result<QueryResult, EngineError> {
318 cancel.check()?;
319 let stmt = self.prepare(sql)?;
320 // v6.5.1 — wrap the executor with a wall-clock window so we
321 // can record into spg_stat_query. Skip when the engine has
322 // no clock attached (no_std embedded callers).
323 let start_us = self.clock.map(|f| f());
324 let result = self.execute_stmt_with_cancel(stmt, cancel);
325 if let (Some(t0), Ok(_)) = (start_us, &result) {
326 let now = self.clock.map_or(t0, |f| f());
327 let elapsed = now.saturating_sub(t0).max(0) as u64;
328 self.query_stats.record(sql, elapsed, now as u64);
329 // v6.5.6 — slow-query log: fire callback when elapsed
330 // exceeds the configured floor.
331 if let (Some(threshold), Some(logger)) =
332 (self.slow_query_threshold_us, self.slow_query_logger)
333 && elapsed >= threshold
334 {
335 logger(sql, elapsed);
336 }
337 }
338 result
339 }
340
341 pub(crate) fn execute_stmt_with_cancel(
342 &mut self,
343 stmt: Statement,
344 cancel: CancelToken<'_>,
345 ) -> Result<QueryResult, EngineError> {
346 cancel.check()?;
347 // v7.17.0 Phase 1.1 — pre-resolve nextval / currval /
348 // setval calls in the statement tree. Walks SELECT
349 // projection, INSERT VALUES, UPDATE SET, DELETE WHERE,
350 // and DEFAULT exprs; replaces sequence FunctionCall
351 // nodes with concrete Literal values minted against the
352 // catalog. This is the only place that mutates sequence
353 // state from a SELECT-shaped path (exec_select_cancel is
354 // `&self` and can't reach the catalog mutably).
355 //
356 // Fast-path: when no sequences exist anywhere in the
357 // catalog (the typical hot-path INSERT load), skip the
358 // walker entirely. Single map-emptiness check on the
359 // catalog beats walking every expression on every call.
360 let mut stmt = stmt;
361 // v7.17 dump-compat — the fast-path check
362 // `sequences().is_empty()` skips pre-resolve when no
363 // sequence exists in the *currently active* catalog
364 // snapshot. The committed catalog or the implicit-TX
365 // catalog may legitimately disagree on this between
366 // CREATE SEQUENCE and a later setval(): always run the
367 // resolver — the walk is O(expr-count) and dwarfed by
368 // the parse cost we just paid.
369 self.pre_resolve_sequence_calls_in_statement(&mut stmt)?;
370 let result = match stmt {
371 Statement::CreateTable(s) => self.exec_create_table(s),
372 // v7.9.15 — CREATE EXTENSION is a no-op on SPG. Returns
373 // CommandOk with affected=0; modified_catalog=false so
374 // the WAL doesn't grow a useless entry. mailrs F3.
375 Statement::CreateExtension(_) => Ok(QueryResult::CommandOk {
376 affected: 0,
377 modified_catalog: false,
378 }),
379 // v7.16.2 — DO $$ ... $$ block. mailrs round-10 A.2
380 // — the pre-v7.9.27 no-op SILENTLY swallowed every
381 // mailrs migrate-038/-040/-042 idempotent rename
382 // (the IF EXISTS … THEN ALTER … END block never
383 // ran). v7.16.2 dispatches to exec_do_block which
384 // runs the PlPgSqlBlock at top level via the same
385 // execute_stmts machinery the trigger executor
386 // uses (NEW=None, OLD=None — DO blocks have no
387 // row context).
388 Statement::DoBlock(body) => self.exec_do_block(body),
389 // v7.14.0 — empty-statement no-op for pg_dump /
390 // mysqldump preamble lines that collapse to nothing
391 // after comment-stripping.
392 Statement::Empty => Ok(QueryResult::CommandOk {
393 affected: 0,
394 modified_catalog: false,
395 }),
396 Statement::DropTable { names, if_exists } => self.exec_drop_table(names, if_exists),
397 Statement::DropIndex { name, if_exists } => self.exec_drop_index(name, if_exists),
398 Statement::CreateIndex(s) => self.exec_create_index(s),
399 Statement::Insert(s) => self.exec_insert(s),
400 Statement::Update(mut s) => {
401 // Materialise uncorrelated subqueries in SET / WHERE
402 // before the row walk — the SELECT path has done this
403 // since v4.10; UPDATE gained it for mailrs's
404 // `UPDATE … WHERE id IN (SELECT … FOR UPDATE SKIP
405 // LOCKED)` claim pattern (embed round-12).
406 for (_, e) in &mut s.assignments {
407 self.resolve_expr_subqueries(e, cancel)?;
408 }
409 if let Some(w) = &mut s.where_ {
410 self.resolve_expr_subqueries(w, cancel)?;
411 }
412 self.exec_update_cancel(&s, cancel)
413 }
414 Statement::Delete(mut s) => {
415 if let Some(w) = &mut s.where_ {
416 self.resolve_expr_subqueries(w, cancel)?;
417 }
418 self.exec_delete_cancel(&s, cancel)
419 }
420 Statement::Merge(s) => self.exec_merge_cancel(&s, cancel),
421 Statement::Select(s) => self.exec_select_cancel(&s, cancel),
422 Statement::Begin => self.exec_begin(),
423 Statement::Commit => self.exec_commit(),
424 Statement::Rollback => self.exec_rollback(),
425 Statement::Savepoint(name) => self.exec_savepoint(name),
426 Statement::RollbackToSavepoint(name) => self.exec_rollback_to_savepoint(&name),
427 Statement::ReleaseSavepoint(name) => self.exec_release_savepoint(&name),
428 Statement::ShowTables => Ok(self.exec_show_tables()),
429 Statement::ShowDatabases => Ok(self.exec_show_databases()),
430 Statement::ShowCreateTable(name) => self.exec_show_create_table(&name),
431 Statement::ShowIndexes(name) => self.exec_show_indexes(&name),
432 Statement::ShowStatus => Ok(self.exec_show_status()),
433 Statement::ShowVariables => Ok(self.exec_show_variables()),
434 Statement::ShowProcesslist => Ok(self.exec_show_processlist()),
435 Statement::ShowColumns(table) => self.exec_show_columns(&table),
436 Statement::ShowUsers => Ok(self.exec_show_users()),
437 Statement::ShowPublications => Ok(self.exec_show_publications()),
438 Statement::ShowSubscriptions => Ok(self.exec_show_subscriptions()),
439 Statement::CreateUser(s) => self.exec_create_user(&s),
440 Statement::DropUser(name) => self.exec_drop_user(&name),
441 Statement::Explain(e) => self.exec_explain(&e, cancel),
442 Statement::AlterIndex(s) => self.exec_alter_index(s),
443 Statement::AlterTable(s) => self.exec_alter_table(s),
444 Statement::CreatePublication(s) => self.exec_create_publication(s),
445 Statement::DropPublication(name) => self.exec_drop_publication(&name),
446 Statement::CreateSubscription(s) => self.exec_create_subscription(s),
447 Statement::DropSubscription(name) => self.exec_drop_subscription(&name),
448 // v6.1.7 — WAIT FOR WAL POSITION needs `lag_state`,
449 // which lives in spg-server's ServerState. The engine
450 // surfaces a clear error; the server-layer dispatch
451 // intercepts the SQL before it reaches the engine on
452 // a server build, so this arm only fires for
453 // engine-only callers (spg-embedded, lib tests).
454 Statement::WaitForWalPosition { .. } => Err(EngineError::Unsupported(
455 "WAIT FOR WAL POSITION must be handled by the server layer".into(),
456 )),
457 // v6.2.0 — ANALYZE recomputes per-column histograms.
458 Statement::Analyze(target) => self.exec_analyze(target.as_deref()),
459 // v6.7.3 — COMPACT COLD SEGMENTS.
460 Statement::CompactColdSegments => self.exec_compact_cold_segments(),
461 // v7.12.1 — SET / RESET session parameter. Engine
462 // tracks the value in `session_params`; FTS dispatcher
463 // reads `default_text_search_config`. Everything else
464 // is a recorded no-op (PG dump compat).
465 Statement::SetParameter { name, value } => {
466 self.set_session_param(name, value);
467 Ok(QueryResult::CommandOk {
468 affected: 0,
469 modified_catalog: false,
470 })
471 }
472 // v7.14.0 — MySQL multi-assignment SET. Each pair runs
473 // through `set_session_param` so engine-known params
474 // (FOREIGN_KEY_CHECKS, session_replication_role, …) take
475 // effect; unknown pairs (including `@VAR` LHS from the
476 // mysqldump preamble) are recorded then ignored.
477 Statement::SetParameterList(pairs) => {
478 for (name, value) in pairs {
479 self.set_session_param(name, value);
480 }
481 Ok(QueryResult::CommandOk {
482 affected: 0,
483 modified_catalog: false,
484 })
485 }
486 // v7.12.4 — CREATE FUNCTION / CREATE TRIGGER / DROP …
487 // for the PL/pgSQL trigger surface. exec_* methods are
488 // defined alongside the existing CREATE handlers below.
489 Statement::CreateFunction(s) => self.exec_create_function(s),
490 Statement::CreateTrigger(s) => self.exec_create_trigger(s),
491 Statement::DropTrigger {
492 name,
493 table,
494 if_exists,
495 } => self.exec_drop_trigger(&name, &table, if_exists),
496 Statement::DropFunction { name, if_exists } => {
497 self.exec_drop_function(&name, if_exists)
498 }
499 Statement::CreateSequence(s) => self.exec_create_sequence(s),
500 Statement::AlterSequence(s) => self.exec_alter_sequence(s),
501 Statement::DropSequence { names, if_exists } => {
502 self.exec_drop_sequence(&names, if_exists)
503 }
504 Statement::CreateView(s) => self.exec_create_view(s),
505 Statement::DropView { names, if_exists } => self.exec_drop_view(&names, if_exists),
506 Statement::CreateMaterializedView(s) => self.exec_create_materialized_view(s),
507 Statement::RefreshMaterializedView { name, with_data } => {
508 self.exec_refresh_materialized_view(&name, with_data)
509 }
510 Statement::DropMaterializedView { names, if_exists } => {
511 self.exec_drop_materialized_view(&names, if_exists)
512 }
513 Statement::CreateType(s) => self.exec_create_type(s),
514 Statement::DropType { names, if_exists } => self.exec_drop_type(&names, if_exists),
515 Statement::CreateDomain(s) => self.exec_create_domain(s),
516 Statement::DropDomain { names, if_exists } => self.exec_drop_domain(&names, if_exists),
517 Statement::CreateSchema {
518 name,
519 if_not_exists,
520 } => self.exec_create_schema(name, if_not_exists),
521 Statement::DropSchema { names, if_exists } => self.exec_drop_schema(&names, if_exists),
522 Statement::ResetParameter(target) => {
523 match target {
524 None => self.session_params.clear(),
525 Some(name) => {
526 self.session_params.remove(&name.to_ascii_lowercase());
527 }
528 }
529 Ok(QueryResult::CommandOk {
530 affected: 0,
531 modified_catalog: false,
532 })
533 }
534 };
535 self.enforce_row_limit(result)
536 }
537}