1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{
34 execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35 execute_sql_ddl_field_default_change, execute_sql_ddl_field_drop,
36 execute_sql_ddl_field_nullability_change, execute_sql_ddl_field_path_index_addition,
37 execute_sql_ddl_field_rename, execute_sql_ddl_secondary_index_drop,
38 },
39 session::query::QueryPlanCacheAttribution,
40 session::sql::projection::{
41 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
42 },
43 sql::{
44 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
45 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
46 },
47 },
48 traits::{CanisterKind, EntityValue, Path},
49};
50
51pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
52pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
53#[cfg(feature = "diagnostics")]
54pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
55#[cfg(feature = "diagnostics")]
56pub use attribution::{
57 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
58 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
59};
60pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
61pub(in crate::db::session::sql) use cache::{
62 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
63};
64pub(in crate::db::session::sql) use compile::{
65 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
66};
67pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
68pub use result::SqlStatementResult;
69
70#[cfg(all(test, not(feature = "diagnostics")))]
71pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
72#[cfg(feature = "diagnostics")]
73pub use crate::db::session::sql::projection::{
74 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
75};
76
/// Parses one SQL string into a [`SqlStatement`] for test-only callers.
///
/// # Errors
///
/// Returns the parser failure converted through
/// [`QueryError::from_sql_parse_error`].
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let parsed = parse_sql(sql);

    parsed.map_err(QueryError::from_sql_parse_error)
}
83
84#[doc(hidden)]
89pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
90 let (statement, _) =
91 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
92
93 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
94}
95
96const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
97 match statement {
98 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
99 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
100 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
101 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
102 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
103 Some(statement.entity.as_str())
104 }
105 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
106 Some(entity) => Some(entity.as_str()),
107 None => None,
108 },
109 SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
110 Some(statement.entity.as_str())
111 }
112 SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
113 Some(statement.entity.as_str())
114 }
115 SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
116 Some(statement.entity.as_str())
117 }
118 SqlStatement::Ddl(SqlDdlStatement::AlterTableRenameColumn(statement)) => {
119 Some(statement.entity.as_str())
120 }
121 SqlStatement::Explain(statement) => match &statement.statement {
122 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
123 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
124 },
125 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
126 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
127 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
128 SqlStatement::ShowEntities(_) => None,
129 }
130}
131
132fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
136 let (local_instructions, result) = measure_sql_stage(stage);
137 let value = result?;
138
139 Ok((local_instructions, value))
140}
141
142impl<C: CanisterKind> DbSession<C> {
143 fn sql_select_prepared_plan_for_accepted_authority(
147 &self,
148 query: &StructuralQuery,
149 authority: EntityAuthority,
150 accepted_schema: &AcceptedSchemaSnapshot,
151 ) -> Result<
152 (
153 SharedPreparedExecutionPlan,
154 SqlProjectionContract,
155 SqlCacheAttribution,
156 ),
157 QueryError,
158 > {
159 let (prepared_plan, cache_attribution) = self
160 .cached_shared_query_plan_for_accepted_authority(
161 authority.clone(),
162 accepted_schema,
163 query,
164 )?;
165 Ok(Self::sql_select_projection_from_prepared_plan(
166 prepared_plan,
167 authority,
168 cache_attribution,
169 ))
170 }
171
172 fn sql_select_prepared_plan_for_entity<E>(
175 &self,
176 query: &StructuralQuery,
177 ) -> Result<
178 (
179 SharedPreparedExecutionPlan,
180 SqlProjectionContract,
181 SqlCacheAttribution,
182 ),
183 QueryError,
184 >
185 where
186 E: PersistedRow<Canister = C> + EntityValue,
187 {
188 let (accepted_schema, authority) = self
189 .accepted_entity_authority::<E>()
190 .map_err(QueryError::execute)?;
191
192 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
193 }
194
195 fn sql_select_projection_from_prepared_plan(
196 prepared_plan: SharedPreparedExecutionPlan,
197 authority: EntityAuthority,
198 cache_attribution: QueryPlanCacheAttribution,
199 ) -> (
200 SharedPreparedExecutionPlan,
201 SqlProjectionContract,
202 SqlCacheAttribution,
203 ) {
204 let projection_spec = prepared_plan
205 .logical_plan()
206 .projection_spec(authority.model());
207 let projection = SqlProjectionContract::new(
208 projection_labels_from_projection_spec(&projection_spec),
209 projection_fixed_scales_from_projection_spec(&projection_spec),
210 );
211
212 (
213 prepared_plan,
214 projection,
215 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
216 )
217 }
218
219 fn ensure_sql_statement_supported_for_surface(
223 statement: &SqlStatement,
224 surface: SqlCompiledCommandSurface,
225 ) -> Result<(), QueryError> {
226 match (surface, statement) {
227 (
228 SqlCompiledCommandSurface::Query,
229 SqlStatement::Select(_)
230 | SqlStatement::Explain(_)
231 | SqlStatement::Describe(_)
232 | SqlStatement::ShowIndexes(_)
233 | SqlStatement::ShowColumns(_)
234 | SqlStatement::ShowEntities(_),
235 )
236 | (
237 SqlCompiledCommandSurface::Update,
238 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
239 ) => Ok(()),
240 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
241 "SQL DDL execution is not supported in this release",
242 )),
243 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
244 Err(QueryError::unsupported_query(
245 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
246 ))
247 }
248 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
249 Err(QueryError::unsupported_query(
250 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
251 ))
252 }
253 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
254 Err(QueryError::unsupported_query(
255 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
256 ))
257 }
258 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
259 Err(QueryError::unsupported_query(
260 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
261 ))
262 }
263 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
264 Err(QueryError::unsupported_query(
265 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
266 ))
267 }
268 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
269 Err(QueryError::unsupported_query(
270 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
271 ))
272 }
273 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
274 Err(QueryError::unsupported_query(
275 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
276 ))
277 }
278 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
279 Err(QueryError::unsupported_query(
280 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
281 ))
282 }
283 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
284 Err(QueryError::unsupported_query(
285 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
286 ))
287 }
288 }
289 }
290
291 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
296 where
297 E: PersistedRow<Canister = C> + EntityValue,
298 {
299 let compiled = self.compile_sql_query::<E>(sql)?;
300
301 self.execute_compiled_sql_owned::<E>(compiled)
302 }
303
304 #[cfg(feature = "diagnostics")]
307 #[doc(hidden)]
308 pub fn execute_sql_query_with_attribution<E>(
309 &self,
310 sql: &str,
311 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
312 where
313 E: PersistedRow<Canister = C> + EntityValue,
314 {
315 let (compile_local_instructions, compiled) =
318 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
319 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
320
321 let store_get_calls_before = DataStore::current_get_call_count();
324 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
325 let pure_covering_row_assembly_before =
326 current_pure_covering_row_assembly_local_instructions();
327 let (result, execute_cache_attribution, execute_phase_attribution) =
328 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
329 let store_get_calls =
330 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
331 let pure_covering_decode_local_instructions =
332 current_pure_covering_decode_local_instructions()
333 .saturating_sub(pure_covering_decode_before);
334 let pure_covering_row_assembly_local_instructions =
335 current_pure_covering_row_assembly_local_instructions()
336 .saturating_sub(pure_covering_row_assembly_before);
337 let execute_local_instructions = execute_phase_attribution
338 .planner_local_instructions
339 .saturating_add(execute_phase_attribution.store_local_instructions)
340 .saturating_add(execute_phase_attribution.executor_local_instructions)
341 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
342 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
343 let total_local_instructions =
344 compile_local_instructions.saturating_add(execute_local_instructions);
345 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
346 GroupedExecutionAttribution {
347 stream_local_instructions: execute_phase_attribution
348 .grouped_stream_local_instructions,
349 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
350 finalize_local_instructions: execute_phase_attribution
351 .grouped_finalize_local_instructions,
352 count: GroupedCountAttribution::from_executor(
353 execute_phase_attribution.grouped_count,
354 ),
355 },
356 );
357 let pure_covering = (pure_covering_decode_local_instructions > 0
358 || pure_covering_row_assembly_local_instructions > 0)
359 .then_some(SqlPureCoveringAttribution {
360 decode_local_instructions: pure_covering_decode_local_instructions,
361 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
362 });
363
364 Ok((
365 result,
366 SqlQueryExecutionAttribution {
367 compile_local_instructions,
368 compile: SqlCompileAttribution {
369 cache_key_local_instructions: compile_phase_attribution.cache_key,
370 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
371 parse_local_instructions: compile_phase_attribution.parse,
372 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
373 parse_select_local_instructions: compile_phase_attribution.parse_select,
374 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
375 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
376 aggregate_lane_check_local_instructions: compile_phase_attribution
377 .aggregate_lane_check,
378 prepare_local_instructions: compile_phase_attribution.prepare,
379 lower_local_instructions: compile_phase_attribution.lower,
380 bind_local_instructions: compile_phase_attribution.bind,
381 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
382 },
383 plan_lookup_local_instructions: execute_phase_attribution
384 .planner_local_instructions,
385 execution: SqlExecutionAttribution {
386 planner_local_instructions: execute_phase_attribution
387 .planner_local_instructions,
388 store_local_instructions: execute_phase_attribution.store_local_instructions,
389 executor_invocation_local_instructions: execute_phase_attribution
390 .executor_invocation_local_instructions,
391 executor_local_instructions: execute_phase_attribution
392 .executor_local_instructions,
393 response_finalization_local_instructions: execute_phase_attribution
394 .response_finalization_local_instructions,
395 },
396 grouped,
397 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
398 execute_phase_attribution.scalar_aggregate_terminal,
399 ),
400 pure_covering,
401 store_get_calls,
402 response_decode_local_instructions: 0,
403 execute_local_instructions,
404 total_local_instructions,
405 cache: SqlQueryCacheAttribution {
406 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
407 sql_compiled_command_misses: cache_attribution
408 .sql_compiled_command_cache_misses,
409 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
410 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
411 },
412 },
413 ))
414 }
415
416 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
421 where
422 E: PersistedRow<Canister = C> + EntityValue,
423 {
424 let compiled = self.compile_sql_update::<E>(sql)?;
425
426 self.execute_compiled_sql_owned::<E>(compiled)
427 }
428
429 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
436 where
437 E: PersistedRow<Canister = C> + EntityValue,
438 {
439 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
440
441 Ok(prepared.report().clone())
442 }
443
444 fn prepare_sql_ddl_command<E>(
445 &self,
446 sql: &str,
447 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
448 where
449 E: PersistedRow<Canister = C> + EntityValue,
450 {
451 let (statement, _) =
452 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
453 let (accepted_schema, _) = self
454 .accepted_entity_authority::<E>()
455 .map_err(QueryError::execute)?;
456 let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
457 E::MODEL,
458 &accepted_schema,
459 true,
460 );
461 let prepared = match prepare_sql_ddl_statement(
462 &statement,
463 &accepted_schema,
464 &schema_info,
465 E::Store::PATH,
466 ) {
467 Ok(prepared) => prepared,
468 Err(err) => {
469 return Err(QueryError::unsupported_query(format!(
470 "SQL DDL preparation failed before execution: {err}"
471 )));
472 }
473 };
474
475 Ok((accepted_schema, prepared))
476 }
477
478 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
483 where
484 E: PersistedRow<Canister = C> + EntityValue,
485 {
486 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
487 if !prepared.mutates_schema() {
488 return Ok(SqlStatementResult::Ddl(
489 prepared
490 .report()
491 .clone()
492 .with_execution_status(SqlDdlExecutionStatus::NoOp),
493 ));
494 }
495
496 let Some(derivation) = prepared.derivation() else {
497 return Err(QueryError::unsupported_query(
498 "SQL DDL execution could not find a prepared schema derivation".to_string(),
499 ));
500 };
501 let store = self
502 .db
503 .recovered_store(E::Store::PATH)
504 .map_err(QueryError::execute)?;
505
506 let (rows_scanned, index_keys_written) = Self::execute_prepared_sql_ddl_mutation::<E>(
507 store,
508 &accepted_before,
509 derivation,
510 &prepared,
511 )?;
512
513 Ok(SqlStatementResult::Ddl(
514 prepared
515 .report()
516 .clone()
517 .with_execution_status(SqlDdlExecutionStatus::Published)
518 .with_execution_metrics(rows_scanned, index_keys_written),
519 ))
520 }
521
522 fn execute_prepared_sql_ddl_mutation<E>(
523 store: crate::db::registry::StoreHandle,
524 accepted_before: &AcceptedSchemaSnapshot,
525 derivation: &crate::db::schema::SchemaDdlAcceptedSnapshotDerivation,
526 prepared: &PreparedSqlDdlCommand,
527 ) -> Result<(usize, usize), QueryError>
528 where
529 E: PersistedRow<Canister = C> + EntityValue,
530 {
531 let metrics = match prepared.bound().statement() {
532 crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
533 execute_sql_ddl_field_addition(
534 store,
535 E::ENTITY_TAG,
536 E::PATH,
537 accepted_before,
538 derivation,
539 )
540 .map_err(QueryError::execute)?;
541
542 (0, 0)
543 }
544 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
545 execute_sql_ddl_field_default_change(
546 store,
547 E::ENTITY_TAG,
548 E::PATH,
549 accepted_before,
550 derivation,
551 )
552 .map_err(QueryError::execute)?;
553
554 (0, 0)
555 }
556 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
557 let rows_scanned = execute_sql_ddl_field_nullability_change(
558 store,
559 E::ENTITY_TAG,
560 E::PATH,
561 accepted_before,
562 derivation,
563 )
564 .map_err(QueryError::execute)?;
565
566 (rows_scanned, 0)
567 }
568 crate::db::sql::ddl::BoundSqlDdlStatement::DropColumn(_) => {
569 execute_sql_ddl_field_drop(
570 store,
571 E::ENTITY_TAG,
572 E::PATH,
573 accepted_before,
574 derivation,
575 )
576 .map_err(QueryError::execute)?;
577
578 (0, 0)
579 }
580 crate::db::sql::ddl::BoundSqlDdlStatement::RenameColumn(_) => {
581 execute_sql_ddl_field_rename(
582 store,
583 E::ENTITY_TAG,
584 E::PATH,
585 accepted_before,
586 derivation,
587 )
588 .map_err(QueryError::execute)?;
589
590 (0, 0)
591 }
592 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
593 if create.candidate_index().key().is_field_path_only() =>
594 {
595 execute_sql_ddl_field_path_index_addition(
596 store,
597 E::ENTITY_TAG,
598 E::PATH,
599 accepted_before,
600 derivation,
601 )
602 .map_err(QueryError::execute)?
603 }
604 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
605 execute_sql_ddl_expression_index_addition(
606 store,
607 E::ENTITY_TAG,
608 E::PATH,
609 accepted_before,
610 derivation,
611 )
612 .map_err(QueryError::execute)?
613 }
614 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
615 execute_sql_ddl_secondary_index_drop(
616 store,
617 E::ENTITY_TAG,
618 E::PATH,
619 accepted_before,
620 derivation,
621 )
622 .map_err(QueryError::execute)?;
623
624 (0, 0)
625 }
626 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
627 };
628
629 Ok(metrics)
630 }
631}