1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{
34 execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35 execute_sql_ddl_field_default_change, execute_sql_ddl_field_drop,
36 execute_sql_ddl_field_nullability_change, execute_sql_ddl_field_path_index_addition,
37 execute_sql_ddl_field_rename, execute_sql_ddl_secondary_index_drop,
38 },
39 session::query::QueryPlanCacheAttribution,
40 session::sql::projection::{
41 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
42 },
43 sql::{
44 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
45 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
46 },
47 },
48 traits::{CanisterKind, EntityValue, Path},
49};
50
51pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
52pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
53#[cfg(feature = "diagnostics")]
54pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
55#[cfg(feature = "diagnostics")]
56pub use attribution::{
57 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
58 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
59};
60pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
61pub(in crate::db::session::sql) use cache::{
62 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
63};
64pub(in crate::db::session::sql) use compile::{
65 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
66};
67pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
68pub use result::SqlStatementResult;
69
70#[derive(Clone, Copy, Debug, Eq, PartialEq)]
72#[doc(hidden)]
73pub enum SqlStatementSurface {
74 Query,
76 Ddl,
78}
79
80#[cfg(all(test, not(feature = "diagnostics")))]
81pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
82#[cfg(feature = "diagnostics")]
83pub use crate::db::session::sql::projection::{
84 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
85};
86
87#[cfg(test)]
90pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
91 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
92}
93
94#[doc(hidden)]
99pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
100 let (statement, _) =
101 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
102
103 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
104}
105
106#[doc(hidden)]
108pub fn sql_statement_surface(sql: &str) -> Result<SqlStatementSurface, QueryError> {
109 let (statement, _) =
110 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
111
112 Ok(sql_statement_surface_from_statement(&statement))
113}
114
115const fn sql_statement_surface_from_statement(statement: &SqlStatement) -> SqlStatementSurface {
116 match statement {
117 SqlStatement::Ddl(_) => SqlStatementSurface::Ddl,
118 SqlStatement::Select(_)
119 | SqlStatement::Delete(_)
120 | SqlStatement::Insert(_)
121 | SqlStatement::Update(_)
122 | SqlStatement::Explain(_)
123 | SqlStatement::Describe(_)
124 | SqlStatement::ShowIndexes(_)
125 | SqlStatement::ShowColumns(_)
126 | SqlStatement::ShowEntities(_) => SqlStatementSurface::Query,
127 }
128}
129
130const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
131 match statement {
132 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
133 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
134 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
135 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
136 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
137 Some(statement.entity.as_str())
138 }
139 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
140 Some(entity) => Some(entity.as_str()),
141 None => None,
142 },
143 SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
144 Some(statement.entity.as_str())
145 }
146 SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
147 Some(statement.entity.as_str())
148 }
149 SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
150 Some(statement.entity.as_str())
151 }
152 SqlStatement::Ddl(SqlDdlStatement::AlterTableRenameColumn(statement)) => {
153 Some(statement.entity.as_str())
154 }
155 SqlStatement::Explain(statement) => match &statement.statement {
156 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
157 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
158 },
159 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
160 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
161 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
162 SqlStatement::ShowEntities(_) => None,
163 }
164}
165
166fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
170 let (local_instructions, result) = measure_sql_stage(stage);
171 let value = result?;
172
173 Ok((local_instructions, value))
174}
175
176impl<C: CanisterKind> DbSession<C> {
177 fn sql_select_prepared_plan_for_accepted_authority(
181 &self,
182 query: &StructuralQuery,
183 authority: EntityAuthority,
184 accepted_schema: &AcceptedSchemaSnapshot,
185 ) -> Result<
186 (
187 SharedPreparedExecutionPlan,
188 SqlProjectionContract,
189 SqlCacheAttribution,
190 ),
191 QueryError,
192 > {
193 let (prepared_plan, cache_attribution) = self
194 .cached_shared_query_plan_for_accepted_authority(
195 authority.clone(),
196 accepted_schema,
197 query,
198 )?;
199 Ok(Self::sql_select_projection_from_prepared_plan(
200 prepared_plan,
201 authority,
202 cache_attribution,
203 ))
204 }
205
206 fn sql_select_prepared_plan_for_entity<E>(
209 &self,
210 query: &StructuralQuery,
211 ) -> Result<
212 (
213 SharedPreparedExecutionPlan,
214 SqlProjectionContract,
215 SqlCacheAttribution,
216 ),
217 QueryError,
218 >
219 where
220 E: PersistedRow<Canister = C> + EntityValue,
221 {
222 let (accepted_schema, authority) = self
223 .accepted_entity_authority::<E>()
224 .map_err(QueryError::execute)?;
225
226 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
227 }
228
229 fn sql_select_projection_from_prepared_plan(
230 prepared_plan: SharedPreparedExecutionPlan,
231 authority: EntityAuthority,
232 cache_attribution: QueryPlanCacheAttribution,
233 ) -> (
234 SharedPreparedExecutionPlan,
235 SqlProjectionContract,
236 SqlCacheAttribution,
237 ) {
238 let projection_spec = prepared_plan
239 .logical_plan()
240 .projection_spec(authority.model());
241 let projection = SqlProjectionContract::new(
242 projection_labels_from_projection_spec(&projection_spec),
243 projection_fixed_scales_from_projection_spec(&projection_spec),
244 );
245
246 (
247 prepared_plan,
248 projection,
249 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
250 )
251 }
252
253 fn ensure_sql_statement_supported_for_surface(
257 statement: &SqlStatement,
258 surface: SqlCompiledCommandSurface,
259 ) -> Result<(), QueryError> {
260 match (surface, statement) {
261 (
262 SqlCompiledCommandSurface::Query,
263 SqlStatement::Select(_)
264 | SqlStatement::Explain(_)
265 | SqlStatement::Describe(_)
266 | SqlStatement::ShowIndexes(_)
267 | SqlStatement::ShowColumns(_)
268 | SqlStatement::ShowEntities(_),
269 )
270 | (
271 SqlCompiledCommandSurface::Update,
272 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
273 ) => Ok(()),
274 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
275 "SQL DDL execution is not supported in this release",
276 )),
277 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
278 Err(QueryError::unsupported_query(
279 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
280 ))
281 }
282 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
283 Err(QueryError::unsupported_query(
284 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
285 ))
286 }
287 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
288 Err(QueryError::unsupported_query(
289 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
290 ))
291 }
292 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
293 Err(QueryError::unsupported_query(
294 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
295 ))
296 }
297 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
298 Err(QueryError::unsupported_query(
299 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
300 ))
301 }
302 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
303 Err(QueryError::unsupported_query(
304 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
305 ))
306 }
307 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
308 Err(QueryError::unsupported_query(
309 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
310 ))
311 }
312 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
313 Err(QueryError::unsupported_query(
314 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
315 ))
316 }
317 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
318 Err(QueryError::unsupported_query(
319 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
320 ))
321 }
322 }
323 }
324
325 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
330 where
331 E: PersistedRow<Canister = C> + EntityValue,
332 {
333 let compiled = self.compile_sql_query::<E>(sql)?;
334
335 self.execute_compiled_sql_owned::<E>(compiled)
336 }
337
338 #[cfg(feature = "diagnostics")]
341 #[doc(hidden)]
342 pub fn execute_sql_query_with_attribution<E>(
343 &self,
344 sql: &str,
345 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
346 where
347 E: PersistedRow<Canister = C> + EntityValue,
348 {
349 let (compile_local_instructions, compiled) =
352 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
353 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
354
355 let store_get_calls_before = DataStore::current_get_call_count();
358 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
359 let pure_covering_row_assembly_before =
360 current_pure_covering_row_assembly_local_instructions();
361 let (result, execute_cache_attribution, execute_phase_attribution) =
362 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
363 let store_get_calls =
364 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
365 let pure_covering_decode_local_instructions =
366 current_pure_covering_decode_local_instructions()
367 .saturating_sub(pure_covering_decode_before);
368 let pure_covering_row_assembly_local_instructions =
369 current_pure_covering_row_assembly_local_instructions()
370 .saturating_sub(pure_covering_row_assembly_before);
371 let execute_local_instructions = execute_phase_attribution
372 .planner_local_instructions
373 .saturating_add(execute_phase_attribution.store_local_instructions)
374 .saturating_add(execute_phase_attribution.executor_local_instructions)
375 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
376 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
377 let total_local_instructions =
378 compile_local_instructions.saturating_add(execute_local_instructions);
379 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
380 GroupedExecutionAttribution {
381 stream_local_instructions: execute_phase_attribution
382 .grouped_stream_local_instructions,
383 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
384 finalize_local_instructions: execute_phase_attribution
385 .grouped_finalize_local_instructions,
386 count: GroupedCountAttribution::from_executor(
387 execute_phase_attribution.grouped_count,
388 ),
389 },
390 );
391 let pure_covering = (pure_covering_decode_local_instructions > 0
392 || pure_covering_row_assembly_local_instructions > 0)
393 .then_some(SqlPureCoveringAttribution {
394 decode_local_instructions: pure_covering_decode_local_instructions,
395 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
396 });
397
398 Ok((
399 result,
400 SqlQueryExecutionAttribution {
401 compile_local_instructions,
402 compile: SqlCompileAttribution {
403 cache_key_local_instructions: compile_phase_attribution.cache_key,
404 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
405 parse_local_instructions: compile_phase_attribution.parse,
406 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
407 parse_select_local_instructions: compile_phase_attribution.parse_select,
408 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
409 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
410 aggregate_lane_check_local_instructions: compile_phase_attribution
411 .aggregate_lane_check,
412 prepare_local_instructions: compile_phase_attribution.prepare,
413 lower_local_instructions: compile_phase_attribution.lower,
414 bind_local_instructions: compile_phase_attribution.bind,
415 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
416 },
417 plan_lookup_local_instructions: execute_phase_attribution
418 .planner_local_instructions,
419 execution: SqlExecutionAttribution {
420 planner_local_instructions: execute_phase_attribution
421 .planner_local_instructions,
422 store_local_instructions: execute_phase_attribution.store_local_instructions,
423 executor_invocation_local_instructions: execute_phase_attribution
424 .executor_invocation_local_instructions,
425 executor_local_instructions: execute_phase_attribution
426 .executor_local_instructions,
427 response_finalization_local_instructions: execute_phase_attribution
428 .response_finalization_local_instructions,
429 },
430 grouped,
431 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
432 execute_phase_attribution.scalar_aggregate_terminal,
433 ),
434 pure_covering,
435 store_get_calls,
436 response_decode_local_instructions: 0,
437 execute_local_instructions,
438 total_local_instructions,
439 cache: SqlQueryCacheAttribution {
440 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
441 sql_compiled_command_misses: cache_attribution
442 .sql_compiled_command_cache_misses,
443 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
444 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
445 },
446 },
447 ))
448 }
449
450 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
455 where
456 E: PersistedRow<Canister = C> + EntityValue,
457 {
458 let compiled = self.compile_sql_update::<E>(sql)?;
459
460 self.execute_compiled_sql_owned::<E>(compiled)
461 }
462
463 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
470 where
471 E: PersistedRow<Canister = C> + EntityValue,
472 {
473 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
474
475 Ok(prepared.report().clone())
476 }
477
478 fn prepare_sql_ddl_command<E>(
479 &self,
480 sql: &str,
481 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
482 where
483 E: PersistedRow<Canister = C> + EntityValue,
484 {
485 let (statement, _) =
486 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
487 let (accepted_schema, _) = self
488 .accepted_entity_authority::<E>()
489 .map_err(QueryError::execute)?;
490 let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
491 E::MODEL,
492 &accepted_schema,
493 true,
494 );
495 let prepared = match prepare_sql_ddl_statement(
496 &statement,
497 &accepted_schema,
498 &schema_info,
499 E::Store::PATH,
500 ) {
501 Ok(prepared) => prepared,
502 Err(err) => {
503 return Err(QueryError::unsupported_query(format!(
504 "SQL DDL preparation failed before execution: {err}"
505 )));
506 }
507 };
508
509 Ok((accepted_schema, prepared))
510 }
511
512 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
517 where
518 E: PersistedRow<Canister = C> + EntityValue,
519 {
520 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
521 if !prepared.mutates_schema() {
522 return Ok(SqlStatementResult::Ddl(
523 prepared
524 .report()
525 .clone()
526 .with_execution_status(SqlDdlExecutionStatus::NoOp),
527 ));
528 }
529
530 let Some(derivation) = prepared.derivation() else {
531 return Err(QueryError::unsupported_query(
532 "SQL DDL execution could not find a prepared schema derivation".to_string(),
533 ));
534 };
535 let store = self
536 .db
537 .recovered_store(E::Store::PATH)
538 .map_err(QueryError::execute)?;
539
540 let (rows_scanned, index_keys_written) = Self::execute_prepared_sql_ddl_mutation::<E>(
541 store,
542 &accepted_before,
543 derivation,
544 &prepared,
545 )?;
546
547 Ok(SqlStatementResult::Ddl(
548 prepared
549 .report()
550 .clone()
551 .with_execution_status(SqlDdlExecutionStatus::Published)
552 .with_execution_metrics(rows_scanned, index_keys_written),
553 ))
554 }
555
556 fn execute_prepared_sql_ddl_mutation<E>(
557 store: crate::db::registry::StoreHandle,
558 accepted_before: &AcceptedSchemaSnapshot,
559 derivation: &crate::db::schema::SchemaDdlAcceptedSnapshotDerivation,
560 prepared: &PreparedSqlDdlCommand,
561 ) -> Result<(usize, usize), QueryError>
562 where
563 E: PersistedRow<Canister = C> + EntityValue,
564 {
565 let metrics = match prepared.bound().statement() {
566 crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
567 execute_sql_ddl_field_addition(
568 store,
569 E::ENTITY_TAG,
570 E::PATH,
571 accepted_before,
572 derivation,
573 )
574 .map_err(QueryError::execute)?;
575
576 (0, 0)
577 }
578 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
579 execute_sql_ddl_field_default_change(
580 store,
581 E::ENTITY_TAG,
582 E::PATH,
583 accepted_before,
584 derivation,
585 )
586 .map_err(QueryError::execute)?;
587
588 (0, 0)
589 }
590 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
591 let rows_scanned = execute_sql_ddl_field_nullability_change(
592 store,
593 E::ENTITY_TAG,
594 E::PATH,
595 accepted_before,
596 derivation,
597 )
598 .map_err(QueryError::execute)?;
599
600 (rows_scanned, 0)
601 }
602 crate::db::sql::ddl::BoundSqlDdlStatement::DropColumn(_) => {
603 execute_sql_ddl_field_drop(
604 store,
605 E::ENTITY_TAG,
606 E::PATH,
607 accepted_before,
608 derivation,
609 )
610 .map_err(QueryError::execute)?;
611
612 (0, 0)
613 }
614 crate::db::sql::ddl::BoundSqlDdlStatement::RenameColumn(_) => {
615 execute_sql_ddl_field_rename(
616 store,
617 E::ENTITY_TAG,
618 E::PATH,
619 accepted_before,
620 derivation,
621 )
622 .map_err(QueryError::execute)?;
623
624 (0, 0)
625 }
626 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
627 if create.candidate_index().key().is_field_path_only() =>
628 {
629 execute_sql_ddl_field_path_index_addition(
630 store,
631 E::ENTITY_TAG,
632 E::PATH,
633 accepted_before,
634 derivation,
635 )
636 .map_err(QueryError::execute)?
637 }
638 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
639 execute_sql_ddl_expression_index_addition(
640 store,
641 E::ENTITY_TAG,
642 E::PATH,
643 accepted_before,
644 derivation,
645 )
646 .map_err(QueryError::execute)?
647 }
648 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
649 execute_sql_ddl_secondary_index_drop(
650 store,
651 E::ENTITY_TAG,
652 E::PATH,
653 accepted_before,
654 derivation,
655 )
656 .map_err(QueryError::execute)?;
657
658 (0, 0)
659 }
660 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
661 };
662
663 Ok(metrics)
664 }
665}