Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::AcceptedSchemaSnapshot,
33        schema::{
34            execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35            execute_sql_ddl_field_default_change, execute_sql_ddl_field_drop,
36            execute_sql_ddl_field_nullability_change, execute_sql_ddl_field_path_index_addition,
37            execute_sql_ddl_field_rename, execute_sql_ddl_secondary_index_drop,
38        },
39        session::sql::projection::{
40            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
41        },
42        session::{AcceptedSchemaCatalogContext, query::QueryPlanCacheAttribution},
43        sql::{
44            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
45            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
46        },
47    },
48    traits::{CanisterKind, EntityValue, Path},
49};
50
51pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
52pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
53#[cfg(feature = "diagnostics")]
54pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
55#[cfg(feature = "diagnostics")]
56pub use attribution::{
57    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
58    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
59};
60pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
61pub(in crate::db::session::sql) use cache::{
62    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
63};
64pub(in crate::db::session::sql) use compile::{
65    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
66};
67pub(in crate::db) use compiled::{
68    CompiledSqlCommand, SqlCompiledCommandExecutionContext, SqlProjectionContract,
69};
70pub use result::SqlStatementResult;
71
72/// Parsed SQL endpoint surface used by generated SQL helper dispatch.
73#[derive(Clone, Copy, Debug, Eq, PartialEq)]
74#[doc(hidden)]
75pub enum SqlStatementSurface {
76    /// SQL handled by the readonly query endpoint.
77    Query,
78    /// SQL handled by the DDL/update endpoint.
79    Ddl,
80}
81
82#[cfg(all(test, not(feature = "diagnostics")))]
83pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
84#[cfg(feature = "diagnostics")]
85pub use crate::db::session::sql::projection::{
86    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
87};
88
89// Keep parsing as a module-owned helper instead of hanging a pure parser off
90// `DbSession` as a fake session method.
91#[cfg(test)]
92pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
93    parse_sql(sql).map_err(QueryError::from_sql_parse_error)
94}
95
96/// Return the entity identifier targeted by one reduced SQL statement.
97///
98/// `SHOW ENTITIES`, `SHOW STORES`, and `SHOW MEMORY` intentionally have no
99/// entity target; callers that dispatch across canister-owned entities may
100/// route them through any accepted entity.
101#[doc(hidden)]
102pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
103    let (statement, _) =
104        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
105
106    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
107}
108
109/// Return the generated endpoint surface required by one reduced SQL statement.
110#[doc(hidden)]
111pub fn sql_statement_surface(sql: &str) -> Result<SqlStatementSurface, QueryError> {
112    let (statement, _) =
113        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
114
115    Ok(sql_statement_surface_from_statement(&statement))
116}
117
118const fn sql_statement_surface_from_statement(statement: &SqlStatement) -> SqlStatementSurface {
119    match statement {
120        SqlStatement::Ddl(_) => SqlStatementSurface::Ddl,
121        SqlStatement::Select(_)
122        | SqlStatement::Delete(_)
123        | SqlStatement::Insert(_)
124        | SqlStatement::Update(_)
125        | SqlStatement::Explain(_)
126        | SqlStatement::Describe(_)
127        | SqlStatement::ShowIndexes(_)
128        | SqlStatement::ShowColumns(_)
129        | SqlStatement::ShowEntities(_)
130        | SqlStatement::ShowStores(_)
131        | SqlStatement::ShowMemory(_) => SqlStatementSurface::Query,
132    }
133}
134
135const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
136    match statement {
137        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
138        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
139        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
140        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
141        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
142            Some(statement.entity.as_str())
143        }
144        SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
145            Some(entity) => Some(entity.as_str()),
146            None => None,
147        },
148        SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
149            Some(statement.entity.as_str())
150        }
151        SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
152            Some(statement.entity.as_str())
153        }
154        SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
155            Some(statement.entity.as_str())
156        }
157        SqlStatement::Ddl(SqlDdlStatement::AlterTableRenameColumn(statement)) => {
158            Some(statement.entity.as_str())
159        }
160        SqlStatement::Explain(statement) => match &statement.statement {
161            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
162            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
163        },
164        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
165        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
166        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
167        SqlStatement::ShowEntities(_)
168        | SqlStatement::ShowStores(_)
169        | SqlStatement::ShowMemory(_) => None,
170    }
171}
172
173// Measure one SQL compile stage and immediately surface the stage result. The
174// helper keeps attribution capture uniform while avoiding repeated
175// `(cost, result); result?` boilerplate across the compile pipeline.
176fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
177    let (local_instructions, result) = measure_sql_stage(stage);
178    let value = result?;
179
180    Ok((local_instructions, value))
181}
182
183impl<C: CanisterKind> DbSession<C> {
184    // Resolve one SQL SELECT through a caller-selected accepted authority and
185    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
186    // generated authority through the runtime cache boundary.
187    fn sql_select_prepared_plan_for_accepted_authority(
188        &self,
189        query: &StructuralQuery,
190        authority: EntityAuthority,
191        accepted_schema: &AcceptedSchemaSnapshot,
192    ) -> Result<
193        (
194            SharedPreparedExecutionPlan,
195            SqlProjectionContract,
196            SqlCacheAttribution,
197        ),
198        QueryError,
199    > {
200        let schema_fingerprint =
201            crate::db::schema::accepted_schema_cache_fingerprint(accepted_schema)
202                .map_err(QueryError::execute)?;
203
204        self.sql_select_prepared_plan_for_accepted_authority_with_schema_fingerprint(
205            query,
206            authority,
207            accepted_schema,
208            schema_fingerprint,
209        )
210    }
211
212    fn sql_select_prepared_plan_for_accepted_authority_with_schema_fingerprint(
213        &self,
214        query: &StructuralQuery,
215        authority: EntityAuthority,
216        accepted_schema: &AcceptedSchemaSnapshot,
217        schema_fingerprint: crate::db::commit::CommitSchemaFingerprint,
218    ) -> Result<
219        (
220            SharedPreparedExecutionPlan,
221            SqlProjectionContract,
222            SqlCacheAttribution,
223        ),
224        QueryError,
225    > {
226        let (prepared_plan, cache_attribution) = self
227            .cached_shared_query_plan_for_accepted_authority_with_schema_fingerprint(
228                authority.clone(),
229                accepted_schema,
230                schema_fingerprint,
231                query,
232            )?;
233        Ok(Self::sql_select_projection_from_prepared_plan(
234            prepared_plan,
235            authority,
236            cache_attribution,
237        ))
238    }
239
240    // Resolve one typed SQL SELECT through accepted schema authority selected
241    // at the session boundary.
242    #[allow(
243        dead_code,
244        reason = "explicit compiled SQL execution can still plan without an attached compile context; immediate SQL query entrypoints use the context-aware sibling"
245    )]
246    fn sql_select_prepared_plan_for_entity<E>(
247        &self,
248        query: &StructuralQuery,
249    ) -> Result<
250        (
251            SharedPreparedExecutionPlan,
252            SqlProjectionContract,
253            SqlCacheAttribution,
254        ),
255        QueryError,
256    >
257    where
258        E: PersistedRow<Canister = C> + EntityValue,
259    {
260        let catalog = self
261            .accepted_schema_catalog_context_for_query::<E>()
262            .map_err(QueryError::execute)?;
263        let authority = catalog
264            .accepted_entity_authority_for::<E>()
265            .map_err(QueryError::execute)?;
266
267        self.sql_select_prepared_plan_for_accepted_authority_with_schema_fingerprint(
268            query,
269            authority,
270            catalog.snapshot(),
271            catalog.fingerprint(),
272        )
273    }
274
275    fn sql_select_projection_from_prepared_plan(
276        prepared_plan: SharedPreparedExecutionPlan,
277        authority: EntityAuthority,
278        cache_attribution: QueryPlanCacheAttribution,
279    ) -> (
280        SharedPreparedExecutionPlan,
281        SqlProjectionContract,
282        SqlCacheAttribution,
283    ) {
284        let projection_spec = prepared_plan
285            .logical_plan()
286            .projection_spec(authority.model());
287        let projection = SqlProjectionContract::new(
288            projection_labels_from_projection_spec(&projection_spec),
289            projection_fixed_scales_from_projection_spec(&projection_spec),
290        );
291
292        (
293            prepared_plan,
294            projection,
295            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
296        )
297    }
298
299    // Keep query/update surface gating owned by one helper so the SQL
300    // compiled-command lane does not duplicate the same statement-family split
301    // just to change the outward error wording.
302    fn ensure_sql_statement_supported_for_surface(
303        statement: &SqlStatement,
304        surface: SqlCompiledCommandSurface,
305    ) -> Result<(), QueryError> {
306        match (surface, statement) {
307            (
308                SqlCompiledCommandSurface::Query,
309                SqlStatement::Select(_)
310                | SqlStatement::Explain(_)
311                | SqlStatement::Describe(_)
312                | SqlStatement::ShowIndexes(_)
313                | SqlStatement::ShowColumns(_)
314                | SqlStatement::ShowEntities(_)
315                | SqlStatement::ShowStores(_)
316                | SqlStatement::ShowMemory(_),
317            )
318            | (
319                SqlCompiledCommandSurface::Update,
320                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
321            ) => Ok(()),
322            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
323                "SQL DDL execution is not supported in this release",
324            )),
325            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
326                Err(QueryError::unsupported_query(
327                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
328                ))
329            }
330            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
331                Err(QueryError::unsupported_query(
332                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
333                ))
334            }
335            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
336                Err(QueryError::unsupported_query(
337                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
338                ))
339            }
340            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
341                Err(QueryError::unsupported_query(
342                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
343                ))
344            }
345            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
346                Err(QueryError::unsupported_query(
347                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
348                ))
349            }
350            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
351                Err(QueryError::unsupported_query(
352                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
353                ))
354            }
355            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
356                Err(QueryError::unsupported_query(
357                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
358                ))
359            }
360            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
361                Err(QueryError::unsupported_query(
362                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
363                ))
364            }
365            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
366                Err(QueryError::unsupported_query(
367                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
368                ))
369            }
370            (SqlCompiledCommandSurface::Update, SqlStatement::ShowStores(_)) => {
371                Err(QueryError::unsupported_query(
372                    "execute_sql_update rejects SHOW STORES; use execute_sql_query::<E>()",
373                ))
374            }
375            (SqlCompiledCommandSurface::Update, SqlStatement::ShowMemory(_)) => {
376                Err(QueryError::unsupported_query(
377                    "execute_sql_update rejects SHOW MEMORY; use execute_sql_query::<E>()",
378                ))
379            }
380        }
381    }
382
383    /// Execute one single-entity reduced SQL query or introspection statement.
384    ///
385    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
386    /// returns SQL-shaped statement output instead of typed entities.
387    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
388    where
389        E: PersistedRow<Canister = C> + EntityValue,
390    {
391        let (compiled, _, _) = self.compile_sql_query_with_execution_context::<E>(sql)?;
392
393        self.execute_compiled_sql_context_owned::<E>(compiled)
394    }
395
396    /// Execute one reduced SQL query while reporting the compile/execute split
397    /// at the top-level SQL seam.
398    #[cfg(feature = "diagnostics")]
399    #[doc(hidden)]
400    pub fn execute_sql_query_with_attribution<E>(
401        &self,
402        sql: &str,
403    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
404    where
405        E: PersistedRow<Canister = C> + EntityValue,
406    {
407        // Phase 1: measure the compile side of the new seam, including parse,
408        // surface validation, and semantic command construction.
409        let (compile_local_instructions, compiled) =
410            measure_sql_stage(|| self.compile_sql_query_with_execution_context::<E>(sql));
411        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
412
413        // Phase 2: measure the execute side separately so repeat-run cache
414        // experiments can prove which side actually moved.
415        let store_get_calls_before = DataStore::current_get_call_count();
416        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
417        let pure_covering_row_assembly_before =
418            current_pure_covering_row_assembly_local_instructions();
419        let (result, execute_cache_attribution, execute_phase_attribution) =
420            self.execute_compiled_sql_context_with_phase_attribution::<E>(&compiled)?;
421        let store_get_calls =
422            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
423        let pure_covering_decode_local_instructions =
424            current_pure_covering_decode_local_instructions()
425                .saturating_sub(pure_covering_decode_before);
426        let pure_covering_row_assembly_local_instructions =
427            current_pure_covering_row_assembly_local_instructions()
428                .saturating_sub(pure_covering_row_assembly_before);
429        let execute_local_instructions = execute_phase_attribution
430            .planner_local_instructions
431            .saturating_add(execute_phase_attribution.store_local_instructions)
432            .saturating_add(execute_phase_attribution.executor_local_instructions)
433            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
434        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
435        let total_local_instructions =
436            compile_local_instructions.saturating_add(execute_local_instructions);
437        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
438            GroupedExecutionAttribution {
439                stream_local_instructions: execute_phase_attribution
440                    .grouped_stream_local_instructions,
441                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
442                finalize_local_instructions: execute_phase_attribution
443                    .grouped_finalize_local_instructions,
444                count: GroupedCountAttribution::from_executor(
445                    execute_phase_attribution.grouped_count,
446                ),
447            },
448        );
449        let pure_covering = (pure_covering_decode_local_instructions > 0
450            || pure_covering_row_assembly_local_instructions > 0)
451            .then_some(SqlPureCoveringAttribution {
452                decode_local_instructions: pure_covering_decode_local_instructions,
453                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
454            });
455
456        Ok((
457            result,
458            SqlQueryExecutionAttribution {
459                compile_local_instructions,
460                compile: SqlCompileAttribution {
461                    cache_key_local_instructions: compile_phase_attribution.cache_key,
462                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
463                    parse_local_instructions: compile_phase_attribution.parse,
464                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
465                    parse_select_local_instructions: compile_phase_attribution.parse_select,
466                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
467                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
468                    aggregate_lane_check_local_instructions: compile_phase_attribution
469                        .aggregate_lane_check,
470                    prepare_local_instructions: compile_phase_attribution.prepare,
471                    lower_local_instructions: compile_phase_attribution.lower,
472                    bind_local_instructions: compile_phase_attribution.bind,
473                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
474                },
475                plan_lookup_local_instructions: execute_phase_attribution
476                    .planner_local_instructions,
477                execution: SqlExecutionAttribution {
478                    planner_local_instructions: execute_phase_attribution
479                        .planner_local_instructions,
480                    store_local_instructions: execute_phase_attribution.store_local_instructions,
481                    executor_invocation_local_instructions: execute_phase_attribution
482                        .executor_invocation_local_instructions,
483                    executor_local_instructions: execute_phase_attribution
484                        .executor_local_instructions,
485                    response_finalization_local_instructions: execute_phase_attribution
486                        .response_finalization_local_instructions,
487                },
488                grouped,
489                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
490                    execute_phase_attribution.scalar_aggregate_terminal,
491                ),
492                pure_covering,
493                store_get_calls,
494                response_decode_local_instructions: 0,
495                execute_local_instructions,
496                total_local_instructions,
497                cache: SqlQueryCacheAttribution {
498                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
499                    sql_compiled_command_misses: cache_attribution
500                        .sql_compiled_command_cache_misses,
501                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
502                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
503                },
504            },
505        ))
506    }
507
508    /// Execute one single-entity reduced SQL mutation statement.
509    ///
510    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
511    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
512    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
513    where
514        E: PersistedRow<Canister = C> + EntityValue,
515    {
516        let compiled = self.compile_sql_update::<E>(sql)?;
517
518        self.execute_compiled_sql_owned::<E>(compiled)
519    }
520
521    /// Prepare one SQL DDL statement against the accepted schema catalog.
522    ///
523    /// This is a non-executing surface: it proves the statement can bind,
524    /// derive an accepted-after snapshot, and pass schema mutation admission,
525    /// then returns a prepared-only report without mutating schema or index
526    /// storage.
527    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
528    where
529        E: PersistedRow<Canister = C> + EntityValue,
530    {
531        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
532
533        Ok(prepared.report().clone())
534    }
535
536    fn prepare_sql_ddl_command<E>(
537        &self,
538        sql: &str,
539    ) -> Result<(AcceptedSchemaCatalogContext, PreparedSqlDdlCommand), QueryError>
540    where
541        E: PersistedRow<Canister = C> + EntityValue,
542    {
543        let (statement, _) =
544            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
545        let catalog = self
546            .accepted_schema_catalog_context_for_query::<E>()
547            .map_err(QueryError::execute)?;
548        let schema_info = catalog.accepted_schema_info_for::<E>();
549        let prepared = match prepare_sql_ddl_statement(
550            &statement,
551            catalog.snapshot(),
552            &schema_info,
553            E::Store::PATH,
554        ) {
555            Ok(prepared) => prepared,
556            Err(err) => return Err(QueryError::from_sql_ddl_prepare_error(err)),
557        };
558
559        Ok((catalog, prepared))
560    }
561
562    /// Execute one SQL DDL statement.
563    ///
564    /// Supported DDL routes through schema-owned physical work and
565    /// accepted-snapshot publication.
566    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
567    where
568        E: PersistedRow<Canister = C> + EntityValue,
569    {
570        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
571        if !prepared.mutates_schema() {
572            return Ok(SqlStatementResult::Ddl(
573                prepared
574                    .report()
575                    .clone()
576                    .with_execution_status(SqlDdlExecutionStatus::NoOp),
577            ));
578        }
579
580        let Some(derivation) = prepared.derivation() else {
581            return Err(QueryError::unsupported_query(
582                "SQL DDL execution could not find a prepared schema derivation".to_string(),
583            ));
584        };
585        let store = self
586            .db
587            .recovered_store(E::Store::PATH)
588            .map_err(QueryError::execute)?;
589
590        let (rows_scanned, index_keys_written) = Self::execute_prepared_sql_ddl_mutation::<E>(
591            store,
592            accepted_before.snapshot(),
593            accepted_before.identity(),
594            derivation,
595            &prepared,
596        )?;
597        self.invalidate_accepted_schema_query_cache_for_entity::<E>();
598
599        Ok(SqlStatementResult::Ddl(
600            prepared
601                .report()
602                .clone()
603                .with_execution_status(SqlDdlExecutionStatus::Published)
604                .with_execution_metrics(rows_scanned, index_keys_written),
605        ))
606    }
607
608    fn execute_prepared_sql_ddl_mutation<E>(
609        store: crate::db::registry::StoreHandle,
610        accepted_before: &AcceptedSchemaSnapshot,
611        accepted_before_identity: crate::db::schema::AcceptedCatalogIdentity,
612        derivation: &crate::db::schema::SchemaDdlAcceptedSnapshotDerivation,
613        prepared: &PreparedSqlDdlCommand,
614    ) -> Result<(usize, usize), QueryError>
615    where
616        E: PersistedRow<Canister = C> + EntityValue,
617    {
618        let metrics = match prepared.bound().statement() {
619            crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
620                execute_sql_ddl_field_addition(
621                    store,
622                    E::ENTITY_TAG,
623                    E::PATH,
624                    accepted_before,
625                    accepted_before_identity,
626                    derivation,
627                )
628                .map_err(QueryError::from_sql_ddl_execution_error)?;
629
630                (0, 0)
631            }
632            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
633                execute_sql_ddl_field_default_change(
634                    store,
635                    E::ENTITY_TAG,
636                    E::PATH,
637                    accepted_before,
638                    accepted_before_identity,
639                    derivation,
640                )
641                .map_err(QueryError::from_sql_ddl_execution_error)?;
642
643                (0, 0)
644            }
645            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
646                let rows_scanned = execute_sql_ddl_field_nullability_change(
647                    store,
648                    E::ENTITY_TAG,
649                    E::PATH,
650                    accepted_before,
651                    accepted_before_identity,
652                    derivation,
653                )
654                .map_err(QueryError::from_sql_ddl_execution_error)?;
655
656                (rows_scanned, 0)
657            }
658            crate::db::sql::ddl::BoundSqlDdlStatement::DropColumn(_) => {
659                execute_sql_ddl_field_drop(
660                    store,
661                    E::ENTITY_TAG,
662                    E::PATH,
663                    accepted_before,
664                    accepted_before_identity,
665                    derivation,
666                )
667                .map_err(QueryError::from_sql_ddl_execution_error)?;
668
669                (0, 0)
670            }
671            crate::db::sql::ddl::BoundSqlDdlStatement::RenameColumn(_) => {
672                execute_sql_ddl_field_rename(
673                    store,
674                    E::ENTITY_TAG,
675                    E::PATH,
676                    accepted_before,
677                    accepted_before_identity,
678                    derivation,
679                )
680                .map_err(QueryError::from_sql_ddl_execution_error)?;
681
682                (0, 0)
683            }
684            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
685                if create.candidate_index().key().is_field_path_only() =>
686            {
687                execute_sql_ddl_field_path_index_addition(
688                    store,
689                    E::ENTITY_TAG,
690                    E::PATH,
691                    accepted_before,
692                    accepted_before_identity,
693                    derivation,
694                )
695                .map_err(QueryError::from_sql_ddl_execution_error)?
696            }
697            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
698                execute_sql_ddl_expression_index_addition(
699                    store,
700                    E::ENTITY_TAG,
701                    E::PATH,
702                    accepted_before,
703                    accepted_before_identity,
704                    derivation,
705                )
706                .map_err(QueryError::from_sql_ddl_execution_error)?
707            }
708            crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
709                execute_sql_ddl_secondary_index_drop(
710                    store,
711                    E::ENTITY_TAG,
712                    E::PATH,
713                    accepted_before,
714                    accepted_before_identity,
715                    derivation,
716                )
717                .map_err(QueryError::from_sql_ddl_execution_error)?;
718
719                (0, 0)
720            }
721            crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
722        };
723
724        Ok(metrics)
725    }
726}