Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::{AcceptedSchemaSnapshot, SchemaInfo},
33        schema::{
34            execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35            execute_sql_ddl_field_default_change, execute_sql_ddl_field_drop,
36            execute_sql_ddl_field_nullability_change, execute_sql_ddl_field_path_index_addition,
37            execute_sql_ddl_field_rename, execute_sql_ddl_secondary_index_drop,
38        },
39        session::query::QueryPlanCacheAttribution,
40        session::sql::projection::{
41            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
42        },
43        sql::{
44            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
45            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
46        },
47    },
48    traits::{CanisterKind, EntityValue, Path},
49};
50
51pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
52pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
53#[cfg(feature = "diagnostics")]
54pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
55#[cfg(feature = "diagnostics")]
56pub use attribution::{
57    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
58    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
59};
60pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
61pub(in crate::db::session::sql) use cache::{
62    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
63};
64pub(in crate::db::session::sql) use compile::{
65    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
66};
67pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
68pub use result::SqlStatementResult;
69
/// Endpoint family that generated SQL helper dispatch selects for one parsed
/// SQL statement.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[doc(hidden)]
pub enum SqlStatementSurface {
    /// The statement is served by the readonly query endpoint.
    Query,
    /// The statement is served by the DDL/update endpoint.
    Ddl,
}
79
80#[cfg(all(test, not(feature = "diagnostics")))]
81pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
82#[cfg(feature = "diagnostics")]
83pub use crate::db::session::sql::projection::{
84    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
85};
86
// Parsing is a pure operation, so it lives here as a module-level helper
// rather than pretending to be a `DbSession` method. Parser failures are
// translated into the session-facing `QueryError`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
93
94/// Return the entity identifier targeted by one reduced SQL statement.
95///
96/// `SHOW ENTITIES` intentionally has no entity target; callers that dispatch
97/// across canister-owned entities may route it through any accepted entity.
98#[doc(hidden)]
99pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
100    let (statement, _) =
101        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
102
103    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
104}
105
106/// Return the generated endpoint surface required by one reduced SQL statement.
107#[doc(hidden)]
108pub fn sql_statement_surface(sql: &str) -> Result<SqlStatementSurface, QueryError> {
109    let (statement, _) =
110        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
111
112    Ok(sql_statement_surface_from_statement(&statement))
113}
114
115const fn sql_statement_surface_from_statement(statement: &SqlStatement) -> SqlStatementSurface {
116    match statement {
117        SqlStatement::Ddl(_) => SqlStatementSurface::Ddl,
118        SqlStatement::Select(_)
119        | SqlStatement::Delete(_)
120        | SqlStatement::Insert(_)
121        | SqlStatement::Update(_)
122        | SqlStatement::Explain(_)
123        | SqlStatement::Describe(_)
124        | SqlStatement::ShowIndexes(_)
125        | SqlStatement::ShowColumns(_)
126        | SqlStatement::ShowEntities(_) => SqlStatementSurface::Query,
127    }
128}
129
130const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
131    match statement {
132        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
133        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
134        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
135        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
136        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
137            Some(statement.entity.as_str())
138        }
139        SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
140            Some(entity) => Some(entity.as_str()),
141            None => None,
142        },
143        SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
144            Some(statement.entity.as_str())
145        }
146        SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
147            Some(statement.entity.as_str())
148        }
149        SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
150            Some(statement.entity.as_str())
151        }
152        SqlStatement::Ddl(SqlDdlStatement::AlterTableRenameColumn(statement)) => {
153            Some(statement.entity.as_str())
154        }
155        SqlStatement::Explain(statement) => match &statement.statement {
156            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
157            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
158        },
159        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
160        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
161        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
162        SqlStatement::ShowEntities(_) => None,
163    }
164}
165
166// Measure one SQL compile stage and immediately surface the stage result. The
167// helper keeps attribution capture uniform while avoiding repeated
168// `(cost, result); result?` boilerplate across the compile pipeline.
169fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
170    let (local_instructions, result) = measure_sql_stage(stage);
171    let value = result?;
172
173    Ok((local_instructions, value))
174}
175
176impl<C: CanisterKind> DbSession<C> {
177    // Resolve one SQL SELECT through a caller-selected accepted authority and
178    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
179    // generated authority through the runtime cache boundary.
180    fn sql_select_prepared_plan_for_accepted_authority(
181        &self,
182        query: &StructuralQuery,
183        authority: EntityAuthority,
184        accepted_schema: &AcceptedSchemaSnapshot,
185    ) -> Result<
186        (
187            SharedPreparedExecutionPlan,
188            SqlProjectionContract,
189            SqlCacheAttribution,
190        ),
191        QueryError,
192    > {
193        let (prepared_plan, cache_attribution) = self
194            .cached_shared_query_plan_for_accepted_authority(
195                authority.clone(),
196                accepted_schema,
197                query,
198            )?;
199        Ok(Self::sql_select_projection_from_prepared_plan(
200            prepared_plan,
201            authority,
202            cache_attribution,
203        ))
204    }
205
206    // Resolve one typed SQL SELECT through accepted schema authority selected
207    // at the session boundary.
208    fn sql_select_prepared_plan_for_entity<E>(
209        &self,
210        query: &StructuralQuery,
211    ) -> Result<
212        (
213            SharedPreparedExecutionPlan,
214            SqlProjectionContract,
215            SqlCacheAttribution,
216        ),
217        QueryError,
218    >
219    where
220        E: PersistedRow<Canister = C> + EntityValue,
221    {
222        let (accepted_schema, authority) = self
223            .accepted_entity_authority::<E>()
224            .map_err(QueryError::execute)?;
225
226        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
227    }
228
229    fn sql_select_projection_from_prepared_plan(
230        prepared_plan: SharedPreparedExecutionPlan,
231        authority: EntityAuthority,
232        cache_attribution: QueryPlanCacheAttribution,
233    ) -> (
234        SharedPreparedExecutionPlan,
235        SqlProjectionContract,
236        SqlCacheAttribution,
237    ) {
238        let projection_spec = prepared_plan
239            .logical_plan()
240            .projection_spec(authority.model());
241        let projection = SqlProjectionContract::new(
242            projection_labels_from_projection_spec(&projection_spec),
243            projection_fixed_scales_from_projection_spec(&projection_spec),
244        );
245
246        (
247            prepared_plan,
248            projection,
249            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
250        )
251    }
252
253    // Keep query/update surface gating owned by one helper so the SQL
254    // compiled-command lane does not duplicate the same statement-family split
255    // just to change the outward error wording.
256    fn ensure_sql_statement_supported_for_surface(
257        statement: &SqlStatement,
258        surface: SqlCompiledCommandSurface,
259    ) -> Result<(), QueryError> {
260        match (surface, statement) {
261            (
262                SqlCompiledCommandSurface::Query,
263                SqlStatement::Select(_)
264                | SqlStatement::Explain(_)
265                | SqlStatement::Describe(_)
266                | SqlStatement::ShowIndexes(_)
267                | SqlStatement::ShowColumns(_)
268                | SqlStatement::ShowEntities(_),
269            )
270            | (
271                SqlCompiledCommandSurface::Update,
272                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
273            ) => Ok(()),
274            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
275                "SQL DDL execution is not supported in this release",
276            )),
277            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
278                Err(QueryError::unsupported_query(
279                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
280                ))
281            }
282            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
283                Err(QueryError::unsupported_query(
284                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
285                ))
286            }
287            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
288                Err(QueryError::unsupported_query(
289                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
290                ))
291            }
292            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
293                Err(QueryError::unsupported_query(
294                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
295                ))
296            }
297            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
298                Err(QueryError::unsupported_query(
299                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
300                ))
301            }
302            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
303                Err(QueryError::unsupported_query(
304                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
305                ))
306            }
307            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
308                Err(QueryError::unsupported_query(
309                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
310                ))
311            }
312            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
313                Err(QueryError::unsupported_query(
314                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
315                ))
316            }
317            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
318                Err(QueryError::unsupported_query(
319                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
320                ))
321            }
322        }
323    }
324
325    /// Execute one single-entity reduced SQL query or introspection statement.
326    ///
327    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
328    /// returns SQL-shaped statement output instead of typed entities.
329    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
330    where
331        E: PersistedRow<Canister = C> + EntityValue,
332    {
333        let compiled = self.compile_sql_query::<E>(sql)?;
334
335        self.execute_compiled_sql_owned::<E>(compiled)
336    }
337
338    /// Execute one reduced SQL query while reporting the compile/execute split
339    /// at the top-level SQL seam.
340    #[cfg(feature = "diagnostics")]
341    #[doc(hidden)]
342    pub fn execute_sql_query_with_attribution<E>(
343        &self,
344        sql: &str,
345    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
346    where
347        E: PersistedRow<Canister = C> + EntityValue,
348    {
349        // Phase 1: measure the compile side of the new seam, including parse,
350        // surface validation, and semantic command construction.
351        let (compile_local_instructions, compiled) =
352            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
353        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
354
355        // Phase 2: measure the execute side separately so repeat-run cache
356        // experiments can prove which side actually moved.
357        let store_get_calls_before = DataStore::current_get_call_count();
358        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
359        let pure_covering_row_assembly_before =
360            current_pure_covering_row_assembly_local_instructions();
361        let (result, execute_cache_attribution, execute_phase_attribution) =
362            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
363        let store_get_calls =
364            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
365        let pure_covering_decode_local_instructions =
366            current_pure_covering_decode_local_instructions()
367                .saturating_sub(pure_covering_decode_before);
368        let pure_covering_row_assembly_local_instructions =
369            current_pure_covering_row_assembly_local_instructions()
370                .saturating_sub(pure_covering_row_assembly_before);
371        let execute_local_instructions = execute_phase_attribution
372            .planner_local_instructions
373            .saturating_add(execute_phase_attribution.store_local_instructions)
374            .saturating_add(execute_phase_attribution.executor_local_instructions)
375            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
376        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
377        let total_local_instructions =
378            compile_local_instructions.saturating_add(execute_local_instructions);
379        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
380            GroupedExecutionAttribution {
381                stream_local_instructions: execute_phase_attribution
382                    .grouped_stream_local_instructions,
383                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
384                finalize_local_instructions: execute_phase_attribution
385                    .grouped_finalize_local_instructions,
386                count: GroupedCountAttribution::from_executor(
387                    execute_phase_attribution.grouped_count,
388                ),
389            },
390        );
391        let pure_covering = (pure_covering_decode_local_instructions > 0
392            || pure_covering_row_assembly_local_instructions > 0)
393            .then_some(SqlPureCoveringAttribution {
394                decode_local_instructions: pure_covering_decode_local_instructions,
395                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
396            });
397
398        Ok((
399            result,
400            SqlQueryExecutionAttribution {
401                compile_local_instructions,
402                compile: SqlCompileAttribution {
403                    cache_key_local_instructions: compile_phase_attribution.cache_key,
404                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
405                    parse_local_instructions: compile_phase_attribution.parse,
406                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
407                    parse_select_local_instructions: compile_phase_attribution.parse_select,
408                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
409                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
410                    aggregate_lane_check_local_instructions: compile_phase_attribution
411                        .aggregate_lane_check,
412                    prepare_local_instructions: compile_phase_attribution.prepare,
413                    lower_local_instructions: compile_phase_attribution.lower,
414                    bind_local_instructions: compile_phase_attribution.bind,
415                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
416                },
417                plan_lookup_local_instructions: execute_phase_attribution
418                    .planner_local_instructions,
419                execution: SqlExecutionAttribution {
420                    planner_local_instructions: execute_phase_attribution
421                        .planner_local_instructions,
422                    store_local_instructions: execute_phase_attribution.store_local_instructions,
423                    executor_invocation_local_instructions: execute_phase_attribution
424                        .executor_invocation_local_instructions,
425                    executor_local_instructions: execute_phase_attribution
426                        .executor_local_instructions,
427                    response_finalization_local_instructions: execute_phase_attribution
428                        .response_finalization_local_instructions,
429                },
430                grouped,
431                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
432                    execute_phase_attribution.scalar_aggregate_terminal,
433                ),
434                pure_covering,
435                store_get_calls,
436                response_decode_local_instructions: 0,
437                execute_local_instructions,
438                total_local_instructions,
439                cache: SqlQueryCacheAttribution {
440                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
441                    sql_compiled_command_misses: cache_attribution
442                        .sql_compiled_command_cache_misses,
443                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
444                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
445                },
446            },
447        ))
448    }
449
450    /// Execute one single-entity reduced SQL mutation statement.
451    ///
452    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
453    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
454    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
455    where
456        E: PersistedRow<Canister = C> + EntityValue,
457    {
458        let compiled = self.compile_sql_update::<E>(sql)?;
459
460        self.execute_compiled_sql_owned::<E>(compiled)
461    }
462
463    /// Prepare one SQL DDL statement against the accepted schema catalog.
464    ///
465    /// This is a non-executing surface: it proves the statement can bind,
466    /// derive an accepted-after snapshot, and pass schema mutation admission,
467    /// then returns a prepared-only report without mutating schema or index
468    /// storage.
469    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
470    where
471        E: PersistedRow<Canister = C> + EntityValue,
472    {
473        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
474
475        Ok(prepared.report().clone())
476    }
477
478    fn prepare_sql_ddl_command<E>(
479        &self,
480        sql: &str,
481    ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
482    where
483        E: PersistedRow<Canister = C> + EntityValue,
484    {
485        let (statement, _) =
486            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
487        let (accepted_schema, _) = self
488            .accepted_entity_authority::<E>()
489            .map_err(QueryError::execute)?;
490        let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
491            E::MODEL,
492            &accepted_schema,
493            true,
494        );
495        let prepared = match prepare_sql_ddl_statement(
496            &statement,
497            &accepted_schema,
498            &schema_info,
499            E::Store::PATH,
500        ) {
501            Ok(prepared) => prepared,
502            Err(err) => {
503                return Err(QueryError::unsupported_query(format!(
504                    "SQL DDL preparation failed before execution: {err}"
505                )));
506            }
507        };
508
509        Ok((accepted_schema, prepared))
510    }
511
512    /// Execute one SQL DDL statement.
513    ///
514    /// Supported DDL routes through schema-owned physical work and
515    /// accepted-snapshot publication.
516    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
517    where
518        E: PersistedRow<Canister = C> + EntityValue,
519    {
520        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
521        if !prepared.mutates_schema() {
522            return Ok(SqlStatementResult::Ddl(
523                prepared
524                    .report()
525                    .clone()
526                    .with_execution_status(SqlDdlExecutionStatus::NoOp),
527            ));
528        }
529
530        let Some(derivation) = prepared.derivation() else {
531            return Err(QueryError::unsupported_query(
532                "SQL DDL execution could not find a prepared schema derivation".to_string(),
533            ));
534        };
535        let store = self
536            .db
537            .recovered_store(E::Store::PATH)
538            .map_err(QueryError::execute)?;
539
540        let (rows_scanned, index_keys_written) = Self::execute_prepared_sql_ddl_mutation::<E>(
541            store,
542            &accepted_before,
543            derivation,
544            &prepared,
545        )?;
546
547        Ok(SqlStatementResult::Ddl(
548            prepared
549                .report()
550                .clone()
551                .with_execution_status(SqlDdlExecutionStatus::Published)
552                .with_execution_metrics(rows_scanned, index_keys_written),
553        ))
554    }
555
556    fn execute_prepared_sql_ddl_mutation<E>(
557        store: crate::db::registry::StoreHandle,
558        accepted_before: &AcceptedSchemaSnapshot,
559        derivation: &crate::db::schema::SchemaDdlAcceptedSnapshotDerivation,
560        prepared: &PreparedSqlDdlCommand,
561    ) -> Result<(usize, usize), QueryError>
562    where
563        E: PersistedRow<Canister = C> + EntityValue,
564    {
565        let metrics = match prepared.bound().statement() {
566            crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
567                execute_sql_ddl_field_addition(
568                    store,
569                    E::ENTITY_TAG,
570                    E::PATH,
571                    accepted_before,
572                    derivation,
573                )
574                .map_err(QueryError::execute)?;
575
576                (0, 0)
577            }
578            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
579                execute_sql_ddl_field_default_change(
580                    store,
581                    E::ENTITY_TAG,
582                    E::PATH,
583                    accepted_before,
584                    derivation,
585                )
586                .map_err(QueryError::execute)?;
587
588                (0, 0)
589            }
590            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
591                let rows_scanned = execute_sql_ddl_field_nullability_change(
592                    store,
593                    E::ENTITY_TAG,
594                    E::PATH,
595                    accepted_before,
596                    derivation,
597                )
598                .map_err(QueryError::execute)?;
599
600                (rows_scanned, 0)
601            }
602            crate::db::sql::ddl::BoundSqlDdlStatement::DropColumn(_) => {
603                execute_sql_ddl_field_drop(
604                    store,
605                    E::ENTITY_TAG,
606                    E::PATH,
607                    accepted_before,
608                    derivation,
609                )
610                .map_err(QueryError::execute)?;
611
612                (0, 0)
613            }
614            crate::db::sql::ddl::BoundSqlDdlStatement::RenameColumn(_) => {
615                execute_sql_ddl_field_rename(
616                    store,
617                    E::ENTITY_TAG,
618                    E::PATH,
619                    accepted_before,
620                    derivation,
621                )
622                .map_err(QueryError::execute)?;
623
624                (0, 0)
625            }
626            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
627                if create.candidate_index().key().is_field_path_only() =>
628            {
629                execute_sql_ddl_field_path_index_addition(
630                    store,
631                    E::ENTITY_TAG,
632                    E::PATH,
633                    accepted_before,
634                    derivation,
635                )
636                .map_err(QueryError::execute)?
637            }
638            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
639                execute_sql_ddl_expression_index_addition(
640                    store,
641                    E::ENTITY_TAG,
642                    E::PATH,
643                    accepted_before,
644                    derivation,
645                )
646                .map_err(QueryError::execute)?
647            }
648            crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
649                execute_sql_ddl_secondary_index_drop(
650                    store,
651                    E::ENTITY_TAG,
652                    E::PATH,
653                    accepted_before,
654                    derivation,
655                )
656                .map_err(QueryError::execute)?;
657
658                (0, 0)
659            }
660            crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
661        };
662
663        Ok(metrics)
664    }
665}