1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{
34 execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35 execute_sql_ddl_field_default_change, execute_sql_ddl_field_nullability_change,
36 execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
37 },
38 session::query::QueryPlanCacheAttribution,
39 session::sql::projection::{
40 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
41 },
42 sql::{
43 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
44 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
45 },
46 },
47 traits::{CanisterKind, EntityValue, Path},
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
52#[cfg(feature = "diagnostics")]
53pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
54#[cfg(feature = "diagnostics")]
55pub use attribution::{
56 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
57 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
58};
59pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
60pub(in crate::db::session::sql) use cache::{
61 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
62};
63pub(in crate::db::session::sql) use compile::{
64 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
65};
66pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
67pub use result::SqlStatementResult;
68
69#[cfg(all(test, not(feature = "diagnostics")))]
70pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
71#[cfg(feature = "diagnostics")]
72pub use crate::db::session::sql::projection::{
73 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
74};
75
76#[cfg(test)]
79pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
80 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
81}
82
83#[doc(hidden)]
88pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
89 let (statement, _) =
90 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
91
92 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
93}
94
95const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
96 match statement {
97 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
98 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
99 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
100 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
101 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
102 Some(statement.entity.as_str())
103 }
104 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
105 Some(entity) => Some(entity.as_str()),
106 None => None,
107 },
108 SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
109 Some(statement.entity.as_str())
110 }
111 SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
112 Some(statement.entity.as_str())
113 }
114 SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
115 Some(statement.entity.as_str())
116 }
117 SqlStatement::Ddl(SqlDdlStatement::AlterTableRenameColumn(statement)) => {
118 Some(statement.entity.as_str())
119 }
120 SqlStatement::Explain(statement) => match &statement.statement {
121 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
122 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
123 },
124 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
125 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
126 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
127 SqlStatement::ShowEntities(_) => None,
128 }
129}
130
131fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
135 let (local_instructions, result) = measure_sql_stage(stage);
136 let value = result?;
137
138 Ok((local_instructions, value))
139}
140
141impl<C: CanisterKind> DbSession<C> {
142 fn sql_select_prepared_plan_for_accepted_authority(
146 &self,
147 query: &StructuralQuery,
148 authority: EntityAuthority,
149 accepted_schema: &AcceptedSchemaSnapshot,
150 ) -> Result<
151 (
152 SharedPreparedExecutionPlan,
153 SqlProjectionContract,
154 SqlCacheAttribution,
155 ),
156 QueryError,
157 > {
158 let (prepared_plan, cache_attribution) = self
159 .cached_shared_query_plan_for_accepted_authority(
160 authority.clone(),
161 accepted_schema,
162 query,
163 )?;
164 Ok(Self::sql_select_projection_from_prepared_plan(
165 prepared_plan,
166 authority,
167 cache_attribution,
168 ))
169 }
170
171 fn sql_select_prepared_plan_for_entity<E>(
174 &self,
175 query: &StructuralQuery,
176 ) -> Result<
177 (
178 SharedPreparedExecutionPlan,
179 SqlProjectionContract,
180 SqlCacheAttribution,
181 ),
182 QueryError,
183 >
184 where
185 E: PersistedRow<Canister = C> + EntityValue,
186 {
187 let (accepted_schema, authority) = self
188 .accepted_entity_authority::<E>()
189 .map_err(QueryError::execute)?;
190
191 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
192 }
193
194 fn sql_select_projection_from_prepared_plan(
195 prepared_plan: SharedPreparedExecutionPlan,
196 authority: EntityAuthority,
197 cache_attribution: QueryPlanCacheAttribution,
198 ) -> (
199 SharedPreparedExecutionPlan,
200 SqlProjectionContract,
201 SqlCacheAttribution,
202 ) {
203 let projection_spec = prepared_plan
204 .logical_plan()
205 .projection_spec(authority.model());
206 let projection = SqlProjectionContract::new(
207 projection_labels_from_projection_spec(&projection_spec),
208 projection_fixed_scales_from_projection_spec(&projection_spec),
209 );
210
211 (
212 prepared_plan,
213 projection,
214 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
215 )
216 }
217
218 fn ensure_sql_statement_supported_for_surface(
222 statement: &SqlStatement,
223 surface: SqlCompiledCommandSurface,
224 ) -> Result<(), QueryError> {
225 match (surface, statement) {
226 (
227 SqlCompiledCommandSurface::Query,
228 SqlStatement::Select(_)
229 | SqlStatement::Explain(_)
230 | SqlStatement::Describe(_)
231 | SqlStatement::ShowIndexes(_)
232 | SqlStatement::ShowColumns(_)
233 | SqlStatement::ShowEntities(_),
234 )
235 | (
236 SqlCompiledCommandSurface::Update,
237 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
238 ) => Ok(()),
239 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
240 "SQL DDL execution is not supported in this release",
241 )),
242 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
243 Err(QueryError::unsupported_query(
244 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
245 ))
246 }
247 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
248 Err(QueryError::unsupported_query(
249 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
250 ))
251 }
252 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
253 Err(QueryError::unsupported_query(
254 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
255 ))
256 }
257 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
258 Err(QueryError::unsupported_query(
259 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
260 ))
261 }
262 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
263 Err(QueryError::unsupported_query(
264 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
265 ))
266 }
267 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
268 Err(QueryError::unsupported_query(
269 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
270 ))
271 }
272 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
273 Err(QueryError::unsupported_query(
274 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
275 ))
276 }
277 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
278 Err(QueryError::unsupported_query(
279 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
280 ))
281 }
282 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
283 Err(QueryError::unsupported_query(
284 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
285 ))
286 }
287 }
288 }
289
290 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
295 where
296 E: PersistedRow<Canister = C> + EntityValue,
297 {
298 let compiled = self.compile_sql_query::<E>(sql)?;
299
300 self.execute_compiled_sql_owned::<E>(compiled)
301 }
302
303 #[cfg(feature = "diagnostics")]
306 #[doc(hidden)]
307 pub fn execute_sql_query_with_attribution<E>(
308 &self,
309 sql: &str,
310 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
311 where
312 E: PersistedRow<Canister = C> + EntityValue,
313 {
314 let (compile_local_instructions, compiled) =
317 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
318 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
319
320 let store_get_calls_before = DataStore::current_get_call_count();
323 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
324 let pure_covering_row_assembly_before =
325 current_pure_covering_row_assembly_local_instructions();
326 let (result, execute_cache_attribution, execute_phase_attribution) =
327 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
328 let store_get_calls =
329 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
330 let pure_covering_decode_local_instructions =
331 current_pure_covering_decode_local_instructions()
332 .saturating_sub(pure_covering_decode_before);
333 let pure_covering_row_assembly_local_instructions =
334 current_pure_covering_row_assembly_local_instructions()
335 .saturating_sub(pure_covering_row_assembly_before);
336 let execute_local_instructions = execute_phase_attribution
337 .planner_local_instructions
338 .saturating_add(execute_phase_attribution.store_local_instructions)
339 .saturating_add(execute_phase_attribution.executor_local_instructions)
340 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
341 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
342 let total_local_instructions =
343 compile_local_instructions.saturating_add(execute_local_instructions);
344 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
345 GroupedExecutionAttribution {
346 stream_local_instructions: execute_phase_attribution
347 .grouped_stream_local_instructions,
348 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
349 finalize_local_instructions: execute_phase_attribution
350 .grouped_finalize_local_instructions,
351 count: GroupedCountAttribution::from_executor(
352 execute_phase_attribution.grouped_count,
353 ),
354 },
355 );
356 let pure_covering = (pure_covering_decode_local_instructions > 0
357 || pure_covering_row_assembly_local_instructions > 0)
358 .then_some(SqlPureCoveringAttribution {
359 decode_local_instructions: pure_covering_decode_local_instructions,
360 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
361 });
362
363 Ok((
364 result,
365 SqlQueryExecutionAttribution {
366 compile_local_instructions,
367 compile: SqlCompileAttribution {
368 cache_key_local_instructions: compile_phase_attribution.cache_key,
369 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
370 parse_local_instructions: compile_phase_attribution.parse,
371 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
372 parse_select_local_instructions: compile_phase_attribution.parse_select,
373 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
374 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
375 aggregate_lane_check_local_instructions: compile_phase_attribution
376 .aggregate_lane_check,
377 prepare_local_instructions: compile_phase_attribution.prepare,
378 lower_local_instructions: compile_phase_attribution.lower,
379 bind_local_instructions: compile_phase_attribution.bind,
380 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
381 },
382 plan_lookup_local_instructions: execute_phase_attribution
383 .planner_local_instructions,
384 execution: SqlExecutionAttribution {
385 planner_local_instructions: execute_phase_attribution
386 .planner_local_instructions,
387 store_local_instructions: execute_phase_attribution.store_local_instructions,
388 executor_invocation_local_instructions: execute_phase_attribution
389 .executor_invocation_local_instructions,
390 executor_local_instructions: execute_phase_attribution
391 .executor_local_instructions,
392 response_finalization_local_instructions: execute_phase_attribution
393 .response_finalization_local_instructions,
394 },
395 grouped,
396 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
397 execute_phase_attribution.scalar_aggregate_terminal,
398 ),
399 pure_covering,
400 store_get_calls,
401 response_decode_local_instructions: 0,
402 execute_local_instructions,
403 total_local_instructions,
404 cache: SqlQueryCacheAttribution {
405 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
406 sql_compiled_command_misses: cache_attribution
407 .sql_compiled_command_cache_misses,
408 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
409 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
410 },
411 },
412 ))
413 }
414
415 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
420 where
421 E: PersistedRow<Canister = C> + EntityValue,
422 {
423 let compiled = self.compile_sql_update::<E>(sql)?;
424
425 self.execute_compiled_sql_owned::<E>(compiled)
426 }
427
428 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
435 where
436 E: PersistedRow<Canister = C> + EntityValue,
437 {
438 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
439
440 Ok(prepared.report().clone())
441 }
442
443 fn prepare_sql_ddl_command<E>(
444 &self,
445 sql: &str,
446 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
447 where
448 E: PersistedRow<Canister = C> + EntityValue,
449 {
450 let (statement, _) =
451 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
452 let (accepted_schema, _) = self
453 .accepted_entity_authority::<E>()
454 .map_err(QueryError::execute)?;
455 let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
456 E::MODEL,
457 &accepted_schema,
458 true,
459 );
460 let prepared = match prepare_sql_ddl_statement(
461 &statement,
462 &accepted_schema,
463 &schema_info,
464 E::Store::PATH,
465 ) {
466 Ok(prepared) => prepared,
467 Err(err) => {
468 return Err(QueryError::unsupported_query(format!(
469 "SQL DDL preparation failed before execution: {err}"
470 )));
471 }
472 };
473
474 Ok((accepted_schema, prepared))
475 }
476
477 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
482 where
483 E: PersistedRow<Canister = C> + EntityValue,
484 {
485 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
486 if !prepared.mutates_schema() {
487 return Ok(SqlStatementResult::Ddl(
488 prepared
489 .report()
490 .clone()
491 .with_execution_status(SqlDdlExecutionStatus::NoOp),
492 ));
493 }
494
495 let Some(derivation) = prepared.derivation() else {
496 return Err(QueryError::unsupported_query(
497 "SQL DDL execution could not find a prepared schema derivation".to_string(),
498 ));
499 };
500 let store = self
501 .db
502 .recovered_store(E::Store::PATH)
503 .map_err(QueryError::execute)?;
504
505 let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
506 crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
507 execute_sql_ddl_field_addition(
508 store,
509 E::ENTITY_TAG,
510 E::PATH,
511 &accepted_before,
512 derivation,
513 )
514 .map_err(QueryError::execute)?;
515
516 (0, 0)
517 }
518 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
519 execute_sql_ddl_field_default_change(
520 store,
521 E::ENTITY_TAG,
522 E::PATH,
523 &accepted_before,
524 derivation,
525 )
526 .map_err(QueryError::execute)?;
527
528 (0, 0)
529 }
530 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
531 let rows_scanned = execute_sql_ddl_field_nullability_change(
532 store,
533 E::ENTITY_TAG,
534 E::PATH,
535 &accepted_before,
536 derivation,
537 )
538 .map_err(QueryError::execute)?;
539
540 (rows_scanned, 0)
541 }
542 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
543 if create.candidate_index().key().is_field_path_only() =>
544 {
545 execute_sql_ddl_field_path_index_addition(
546 store,
547 E::ENTITY_TAG,
548 E::PATH,
549 &accepted_before,
550 derivation,
551 )
552 .map_err(QueryError::execute)?
553 }
554 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
555 execute_sql_ddl_expression_index_addition(
556 store,
557 E::ENTITY_TAG,
558 E::PATH,
559 &accepted_before,
560 derivation,
561 )
562 .map_err(QueryError::execute)?
563 }
564 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
565 execute_sql_ddl_secondary_index_drop(
566 store,
567 E::ENTITY_TAG,
568 E::PATH,
569 &accepted_before,
570 derivation,
571 )
572 .map_err(QueryError::execute)?;
573
574 (0, 0)
575 }
576 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
577 };
578
579 Ok(SqlStatementResult::Ddl(
580 prepared
581 .report()
582 .clone()
583 .with_execution_status(SqlDdlExecutionStatus::Published)
584 .with_execution_metrics(rows_scanned, index_keys_written),
585 ))
586 }
587}