1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{
34 execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35 execute_sql_ddl_field_default_change, execute_sql_ddl_field_nullability_change,
36 execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
37 },
38 session::query::QueryPlanCacheAttribution,
39 session::sql::projection::{
40 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
41 },
42 sql::{
43 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
44 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
45 },
46 },
47 traits::{CanisterKind, EntityValue, Path},
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
52#[cfg(feature = "diagnostics")]
53pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
54#[cfg(feature = "diagnostics")]
55pub use attribution::{
56 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
57 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
58};
59pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
60pub(in crate::db::session::sql) use cache::{
61 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
62};
63pub(in crate::db::session::sql) use compile::{
64 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
65};
66pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
67pub use result::SqlStatementResult;
68
69#[cfg(all(test, not(feature = "diagnostics")))]
70pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
71#[cfg(feature = "diagnostics")]
72pub use crate::db::session::sql::projection::{
73 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
74};
75
/// Test-only helper that parses `sql` into a [`SqlStatement`].
///
/// # Errors
///
/// Any parser failure is converted with `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
82
83#[doc(hidden)]
88pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
89 let (statement, _) =
90 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
91
92 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
93}
94
95const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
96 match statement {
97 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
98 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
99 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
100 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
101 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
102 Some(statement.entity.as_str())
103 }
104 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
105 Some(entity) => Some(entity.as_str()),
106 None => None,
107 },
108 SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
109 Some(statement.entity.as_str())
110 }
111 SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
112 Some(statement.entity.as_str())
113 }
114 SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
115 Some(statement.entity.as_str())
116 }
117 SqlStatement::Explain(statement) => match &statement.statement {
118 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
119 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
120 },
121 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
122 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
123 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
124 SqlStatement::ShowEntities(_) => None,
125 }
126}
127
128fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
132 let (local_instructions, result) = measure_sql_stage(stage);
133 let value = result?;
134
135 Ok((local_instructions, value))
136}
137
138impl<C: CanisterKind> DbSession<C> {
139 fn sql_select_prepared_plan_for_accepted_authority(
143 &self,
144 query: &StructuralQuery,
145 authority: EntityAuthority,
146 accepted_schema: &AcceptedSchemaSnapshot,
147 ) -> Result<
148 (
149 SharedPreparedExecutionPlan,
150 SqlProjectionContract,
151 SqlCacheAttribution,
152 ),
153 QueryError,
154 > {
155 let (prepared_plan, cache_attribution) = self
156 .cached_shared_query_plan_for_accepted_authority(
157 authority.clone(),
158 accepted_schema,
159 query,
160 )?;
161 Ok(Self::sql_select_projection_from_prepared_plan(
162 prepared_plan,
163 authority,
164 cache_attribution,
165 ))
166 }
167
168 fn sql_select_prepared_plan_for_entity<E>(
171 &self,
172 query: &StructuralQuery,
173 ) -> Result<
174 (
175 SharedPreparedExecutionPlan,
176 SqlProjectionContract,
177 SqlCacheAttribution,
178 ),
179 QueryError,
180 >
181 where
182 E: PersistedRow<Canister = C> + EntityValue,
183 {
184 let (accepted_schema, authority) = self
185 .accepted_entity_authority::<E>()
186 .map_err(QueryError::execute)?;
187
188 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
189 }
190
191 fn sql_select_projection_from_prepared_plan(
192 prepared_plan: SharedPreparedExecutionPlan,
193 authority: EntityAuthority,
194 cache_attribution: QueryPlanCacheAttribution,
195 ) -> (
196 SharedPreparedExecutionPlan,
197 SqlProjectionContract,
198 SqlCacheAttribution,
199 ) {
200 let projection_spec = prepared_plan
201 .logical_plan()
202 .projection_spec(authority.model());
203 let projection = SqlProjectionContract::new(
204 projection_labels_from_projection_spec(&projection_spec),
205 projection_fixed_scales_from_projection_spec(&projection_spec),
206 );
207
208 (
209 prepared_plan,
210 projection,
211 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
212 )
213 }
214
215 fn ensure_sql_statement_supported_for_surface(
219 statement: &SqlStatement,
220 surface: SqlCompiledCommandSurface,
221 ) -> Result<(), QueryError> {
222 match (surface, statement) {
223 (
224 SqlCompiledCommandSurface::Query,
225 SqlStatement::Select(_)
226 | SqlStatement::Explain(_)
227 | SqlStatement::Describe(_)
228 | SqlStatement::ShowIndexes(_)
229 | SqlStatement::ShowColumns(_)
230 | SqlStatement::ShowEntities(_),
231 )
232 | (
233 SqlCompiledCommandSurface::Update,
234 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
235 ) => Ok(()),
236 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
237 "SQL DDL execution is not supported in this release",
238 )),
239 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
240 Err(QueryError::unsupported_query(
241 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
242 ))
243 }
244 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
245 Err(QueryError::unsupported_query(
246 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
247 ))
248 }
249 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
250 Err(QueryError::unsupported_query(
251 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
252 ))
253 }
254 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
255 Err(QueryError::unsupported_query(
256 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
257 ))
258 }
259 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
260 Err(QueryError::unsupported_query(
261 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
262 ))
263 }
264 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
265 Err(QueryError::unsupported_query(
266 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
267 ))
268 }
269 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
270 Err(QueryError::unsupported_query(
271 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
272 ))
273 }
274 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
275 Err(QueryError::unsupported_query(
276 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
277 ))
278 }
279 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
280 Err(QueryError::unsupported_query(
281 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
282 ))
283 }
284 }
285 }
286
287 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
292 where
293 E: PersistedRow<Canister = C> + EntityValue,
294 {
295 let compiled = self.compile_sql_query::<E>(sql)?;
296
297 self.execute_compiled_sql_owned::<E>(compiled)
298 }
299
    /// Compiles and executes `sql` for entity `E`, returning the statement
    /// result together with a [`SqlQueryExecutionAttribution`] breakdown.
    ///
    /// Only compiled with the `diagnostics` feature.
    ///
    /// # Errors
    ///
    /// Propagates any failure from `compile_sql_query_with_cache_attribution`
    /// or `execute_compiled_sql_with_phase_attribution`; no attribution is
    /// returned in that case.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    pub fn execute_sql_query_with_attribution<E>(
        &self,
        sql: &str,
    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Measure the whole compile call; the `?` is applied only after the
        // measurement so the closure itself always returns to the measurer.
        let (compile_local_instructions, compiled) =
            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;

        // Snapshot the running counters before execution so that the deltas
        // computed below cover only this execution.
        let store_get_calls_before = DataStore::current_get_call_count();
        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
        let pure_covering_row_assembly_before =
            current_pure_covering_row_assembly_local_instructions();
        let (result, execute_cache_attribution, execute_phase_attribution) =
            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
        // Counter deltas; saturating so a smaller "after" reading yields 0
        // instead of wrapping.
        let store_get_calls =
            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
        let pure_covering_decode_local_instructions =
            current_pure_covering_decode_local_instructions()
                .saturating_sub(pure_covering_decode_before);
        let pure_covering_row_assembly_local_instructions =
            current_pure_covering_row_assembly_local_instructions()
                .saturating_sub(pure_covering_row_assembly_before);
        // Execute total = planner + store + executor + response finalization.
        // `executor_invocation_local_instructions` is reported below but is
        // not part of this sum.
        let execute_local_instructions = execute_phase_attribution
            .planner_local_instructions
            .saturating_add(execute_phase_attribution.store_local_instructions)
            .saturating_add(execute_phase_attribution.executor_local_instructions)
            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);
        // Grouped attribution is reported only when the result is the
        // `Grouped` variant.
        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
            GroupedExecutionAttribution {
                stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
                finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                count: GroupedCountAttribution::from_executor(
                    execute_phase_attribution.grouped_count,
                ),
            },
        );
        // Pure-covering attribution is reported only when at least one of the
        // two deltas is non-zero.
        let pure_covering = (pure_covering_decode_local_instructions > 0
            || pure_covering_row_assembly_local_instructions > 0)
            .then_some(SqlPureCoveringAttribution {
                decode_local_instructions: pure_covering_decode_local_instructions,
                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
            });

        Ok((
            result,
            SqlQueryExecutionAttribution {
                compile_local_instructions,
                // Per-phase compile counters, copied field by field.
                compile: SqlCompileAttribution {
                    cache_key_local_instructions: compile_phase_attribution.cache_key,
                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
                    parse_local_instructions: compile_phase_attribution.parse,
                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
                    parse_select_local_instructions: compile_phase_attribution.parse_select,
                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
                    aggregate_lane_check_local_instructions: compile_phase_attribution
                        .aggregate_lane_check,
                    prepare_local_instructions: compile_phase_attribution.prepare,
                    lower_local_instructions: compile_phase_attribution.lower,
                    bind_local_instructions: compile_phase_attribution.bind,
                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
                },
                // Same value as `execution.planner_local_instructions` below.
                plan_lookup_local_instructions: execute_phase_attribution
                    .planner_local_instructions,
                execution: SqlExecutionAttribution {
                    planner_local_instructions: execute_phase_attribution
                        .planner_local_instructions,
                    store_local_instructions: execute_phase_attribution.store_local_instructions,
                    executor_invocation_local_instructions: execute_phase_attribution
                        .executor_invocation_local_instructions,
                    executor_local_instructions: execute_phase_attribution
                        .executor_local_instructions,
                    response_finalization_local_instructions: execute_phase_attribution
                        .response_finalization_local_instructions,
                },
                grouped,
                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
                    execute_phase_attribution.scalar_aggregate_terminal,
                ),
                pure_covering,
                store_get_calls,
                // This path always reports zero for response decoding.
                response_decode_local_instructions: 0,
                execute_local_instructions,
                total_local_instructions,
                // Counters taken from the merged compile + execute cache attribution.
                cache: SqlQueryCacheAttribution {
                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
                    sql_compiled_command_misses: cache_attribution
                        .sql_compiled_command_cache_misses,
                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
                },
            },
        ))
    }
411
412 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
417 where
418 E: PersistedRow<Canister = C> + EntityValue,
419 {
420 let compiled = self.compile_sql_update::<E>(sql)?;
421
422 self.execute_compiled_sql_owned::<E>(compiled)
423 }
424
425 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
432 where
433 E: PersistedRow<Canister = C> + EntityValue,
434 {
435 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
436
437 Ok(prepared.report().clone())
438 }
439
440 fn prepare_sql_ddl_command<E>(
441 &self,
442 sql: &str,
443 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
444 where
445 E: PersistedRow<Canister = C> + EntityValue,
446 {
447 let (statement, _) =
448 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
449 let (accepted_schema, _) = self
450 .accepted_entity_authority::<E>()
451 .map_err(QueryError::execute)?;
452 let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
453 E::MODEL,
454 &accepted_schema,
455 true,
456 );
457 let prepared = match prepare_sql_ddl_statement(
458 &statement,
459 &accepted_schema,
460 &schema_info,
461 E::Store::PATH,
462 ) {
463 Ok(prepared) => prepared,
464 Err(err) => {
465 return Err(QueryError::unsupported_query(format!(
466 "SQL DDL preparation failed before execution: {err}"
467 )));
468 }
469 };
470
471 Ok((accepted_schema, prepared))
472 }
473
474 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
479 where
480 E: PersistedRow<Canister = C> + EntityValue,
481 {
482 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
483 if !prepared.mutates_schema() {
484 return Ok(SqlStatementResult::Ddl(
485 prepared
486 .report()
487 .clone()
488 .with_execution_status(SqlDdlExecutionStatus::NoOp),
489 ));
490 }
491
492 let Some(derivation) = prepared.derivation() else {
493 return Err(QueryError::unsupported_query(
494 "SQL DDL execution could not find a prepared schema derivation".to_string(),
495 ));
496 };
497 let store = self
498 .db
499 .recovered_store(E::Store::PATH)
500 .map_err(QueryError::execute)?;
501
502 let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
503 crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
504 execute_sql_ddl_field_addition(
505 store,
506 E::ENTITY_TAG,
507 E::PATH,
508 &accepted_before,
509 derivation,
510 )
511 .map_err(QueryError::execute)?;
512
513 (0, 0)
514 }
515 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
516 execute_sql_ddl_field_default_change(
517 store,
518 E::ENTITY_TAG,
519 E::PATH,
520 &accepted_before,
521 derivation,
522 )
523 .map_err(QueryError::execute)?;
524
525 (0, 0)
526 }
527 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
528 let rows_scanned = execute_sql_ddl_field_nullability_change(
529 store,
530 E::ENTITY_TAG,
531 E::PATH,
532 &accepted_before,
533 derivation,
534 )
535 .map_err(QueryError::execute)?;
536
537 (rows_scanned, 0)
538 }
539 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
540 if create.candidate_index().key().is_field_path_only() =>
541 {
542 execute_sql_ddl_field_path_index_addition(
543 store,
544 E::ENTITY_TAG,
545 E::PATH,
546 &accepted_before,
547 derivation,
548 )
549 .map_err(QueryError::execute)?
550 }
551 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
552 execute_sql_ddl_expression_index_addition(
553 store,
554 E::ENTITY_TAG,
555 E::PATH,
556 &accepted_before,
557 derivation,
558 )
559 .map_err(QueryError::execute)?
560 }
561 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
562 execute_sql_ddl_secondary_index_drop(
563 store,
564 E::ENTITY_TAG,
565 E::PATH,
566 &accepted_before,
567 derivation,
568 )
569 .map_err(QueryError::execute)?;
570
571 (0, 0)
572 }
573 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
574 };
575
576 Ok(SqlStatementResult::Ddl(
577 prepared
578 .report()
579 .clone()
580 .with_execution_status(SqlDdlExecutionStatus::Published)
581 .with_execution_metrics(rows_scanned, index_keys_written),
582 ))
583 }
584}