1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{
34 execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35 execute_sql_ddl_field_default_change, execute_sql_ddl_field_nullability_change,
36 execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
37 },
38 session::query::QueryPlanCacheAttribution,
39 session::sql::projection::{
40 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
41 },
42 sql::{
43 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
44 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
45 },
46 },
47 traits::{CanisterKind, EntityValue, Path},
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
52#[cfg(feature = "diagnostics")]
53pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
54#[cfg(feature = "diagnostics")]
55pub use attribution::{
56 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
57 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
58};
59pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
60pub(in crate::db::session::sql) use cache::{
61 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
62};
63pub(in crate::db::session::sql) use compile::{
64 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
65};
66pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
67pub use result::SqlStatementResult;
68
69#[cfg(all(test, not(feature = "diagnostics")))]
70pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
71#[cfg(feature = "diagnostics")]
72pub use crate::db::session::sql::projection::{
73 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
74};
75
76#[cfg(test)]
79pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
80 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
81}
82
83#[doc(hidden)]
88pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
89 let (statement, _) =
90 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
91
92 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
93}
94
95const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
96 match statement {
97 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
98 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
99 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
100 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
101 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
102 Some(statement.entity.as_str())
103 }
104 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
105 Some(entity) => Some(entity.as_str()),
106 None => None,
107 },
108 SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
109 Some(statement.entity.as_str())
110 }
111 SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
112 Some(statement.entity.as_str())
113 }
114 SqlStatement::Explain(statement) => match &statement.statement {
115 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
116 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
117 },
118 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
119 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
120 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
121 SqlStatement::ShowEntities(_) => None,
122 }
123}
124
125fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
129 let (local_instructions, result) = measure_sql_stage(stage);
130 let value = result?;
131
132 Ok((local_instructions, value))
133}
134
135impl<C: CanisterKind> DbSession<C> {
136 fn sql_select_prepared_plan_for_accepted_authority(
140 &self,
141 query: &StructuralQuery,
142 authority: EntityAuthority,
143 accepted_schema: &AcceptedSchemaSnapshot,
144 ) -> Result<
145 (
146 SharedPreparedExecutionPlan,
147 SqlProjectionContract,
148 SqlCacheAttribution,
149 ),
150 QueryError,
151 > {
152 let (prepared_plan, cache_attribution) = self
153 .cached_shared_query_plan_for_accepted_authority(
154 authority.clone(),
155 accepted_schema,
156 query,
157 )?;
158 Ok(Self::sql_select_projection_from_prepared_plan(
159 prepared_plan,
160 authority,
161 cache_attribution,
162 ))
163 }
164
165 fn sql_select_prepared_plan_for_entity<E>(
168 &self,
169 query: &StructuralQuery,
170 ) -> Result<
171 (
172 SharedPreparedExecutionPlan,
173 SqlProjectionContract,
174 SqlCacheAttribution,
175 ),
176 QueryError,
177 >
178 where
179 E: PersistedRow<Canister = C> + EntityValue,
180 {
181 let (accepted_schema, authority) = self
182 .accepted_entity_authority::<E>()
183 .map_err(QueryError::execute)?;
184
185 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
186 }
187
188 fn sql_select_projection_from_prepared_plan(
189 prepared_plan: SharedPreparedExecutionPlan,
190 authority: EntityAuthority,
191 cache_attribution: QueryPlanCacheAttribution,
192 ) -> (
193 SharedPreparedExecutionPlan,
194 SqlProjectionContract,
195 SqlCacheAttribution,
196 ) {
197 let projection_spec = prepared_plan
198 .logical_plan()
199 .projection_spec(authority.model());
200 let projection = SqlProjectionContract::new(
201 projection_labels_from_projection_spec(&projection_spec),
202 projection_fixed_scales_from_projection_spec(&projection_spec),
203 );
204
205 (
206 prepared_plan,
207 projection,
208 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
209 )
210 }
211
212 fn ensure_sql_statement_supported_for_surface(
216 statement: &SqlStatement,
217 surface: SqlCompiledCommandSurface,
218 ) -> Result<(), QueryError> {
219 match (surface, statement) {
220 (
221 SqlCompiledCommandSurface::Query,
222 SqlStatement::Select(_)
223 | SqlStatement::Explain(_)
224 | SqlStatement::Describe(_)
225 | SqlStatement::ShowIndexes(_)
226 | SqlStatement::ShowColumns(_)
227 | SqlStatement::ShowEntities(_),
228 )
229 | (
230 SqlCompiledCommandSurface::Update,
231 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
232 ) => Ok(()),
233 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
234 "SQL DDL execution is not supported in this release",
235 )),
236 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
237 Err(QueryError::unsupported_query(
238 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
239 ))
240 }
241 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
242 Err(QueryError::unsupported_query(
243 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
244 ))
245 }
246 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
247 Err(QueryError::unsupported_query(
248 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
249 ))
250 }
251 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
252 Err(QueryError::unsupported_query(
253 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
254 ))
255 }
256 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
257 Err(QueryError::unsupported_query(
258 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
259 ))
260 }
261 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
262 Err(QueryError::unsupported_query(
263 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
264 ))
265 }
266 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
267 Err(QueryError::unsupported_query(
268 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
269 ))
270 }
271 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
272 Err(QueryError::unsupported_query(
273 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
274 ))
275 }
276 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
277 Err(QueryError::unsupported_query(
278 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
279 ))
280 }
281 }
282 }
283
284 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
289 where
290 E: PersistedRow<Canister = C> + EntityValue,
291 {
292 let compiled = self.compile_sql_query::<E>(sql)?;
293
294 self.execute_compiled_sql_owned::<E>(compiled)
295 }
296
297 #[cfg(feature = "diagnostics")]
300 #[doc(hidden)]
301 pub fn execute_sql_query_with_attribution<E>(
302 &self,
303 sql: &str,
304 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
305 where
306 E: PersistedRow<Canister = C> + EntityValue,
307 {
308 let (compile_local_instructions, compiled) =
311 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
312 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
313
314 let store_get_calls_before = DataStore::current_get_call_count();
317 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
318 let pure_covering_row_assembly_before =
319 current_pure_covering_row_assembly_local_instructions();
320 let (result, execute_cache_attribution, execute_phase_attribution) =
321 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
322 let store_get_calls =
323 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
324 let pure_covering_decode_local_instructions =
325 current_pure_covering_decode_local_instructions()
326 .saturating_sub(pure_covering_decode_before);
327 let pure_covering_row_assembly_local_instructions =
328 current_pure_covering_row_assembly_local_instructions()
329 .saturating_sub(pure_covering_row_assembly_before);
330 let execute_local_instructions = execute_phase_attribution
331 .planner_local_instructions
332 .saturating_add(execute_phase_attribution.store_local_instructions)
333 .saturating_add(execute_phase_attribution.executor_local_instructions)
334 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
335 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
336 let total_local_instructions =
337 compile_local_instructions.saturating_add(execute_local_instructions);
338 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
339 GroupedExecutionAttribution {
340 stream_local_instructions: execute_phase_attribution
341 .grouped_stream_local_instructions,
342 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
343 finalize_local_instructions: execute_phase_attribution
344 .grouped_finalize_local_instructions,
345 count: GroupedCountAttribution::from_executor(
346 execute_phase_attribution.grouped_count,
347 ),
348 },
349 );
350 let pure_covering = (pure_covering_decode_local_instructions > 0
351 || pure_covering_row_assembly_local_instructions > 0)
352 .then_some(SqlPureCoveringAttribution {
353 decode_local_instructions: pure_covering_decode_local_instructions,
354 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
355 });
356
357 Ok((
358 result,
359 SqlQueryExecutionAttribution {
360 compile_local_instructions,
361 compile: SqlCompileAttribution {
362 cache_key_local_instructions: compile_phase_attribution.cache_key,
363 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
364 parse_local_instructions: compile_phase_attribution.parse,
365 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
366 parse_select_local_instructions: compile_phase_attribution.parse_select,
367 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
368 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
369 aggregate_lane_check_local_instructions: compile_phase_attribution
370 .aggregate_lane_check,
371 prepare_local_instructions: compile_phase_attribution.prepare,
372 lower_local_instructions: compile_phase_attribution.lower,
373 bind_local_instructions: compile_phase_attribution.bind,
374 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
375 },
376 plan_lookup_local_instructions: execute_phase_attribution
377 .planner_local_instructions,
378 execution: SqlExecutionAttribution {
379 planner_local_instructions: execute_phase_attribution
380 .planner_local_instructions,
381 store_local_instructions: execute_phase_attribution.store_local_instructions,
382 executor_invocation_local_instructions: execute_phase_attribution
383 .executor_invocation_local_instructions,
384 executor_local_instructions: execute_phase_attribution
385 .executor_local_instructions,
386 response_finalization_local_instructions: execute_phase_attribution
387 .response_finalization_local_instructions,
388 },
389 grouped,
390 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
391 execute_phase_attribution.scalar_aggregate_terminal,
392 ),
393 pure_covering,
394 store_get_calls,
395 response_decode_local_instructions: 0,
396 execute_local_instructions,
397 total_local_instructions,
398 cache: SqlQueryCacheAttribution {
399 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
400 sql_compiled_command_misses: cache_attribution
401 .sql_compiled_command_cache_misses,
402 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
403 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
404 },
405 },
406 ))
407 }
408
409 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
414 where
415 E: PersistedRow<Canister = C> + EntityValue,
416 {
417 let compiled = self.compile_sql_update::<E>(sql)?;
418
419 self.execute_compiled_sql_owned::<E>(compiled)
420 }
421
422 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
429 where
430 E: PersistedRow<Canister = C> + EntityValue,
431 {
432 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
433
434 Ok(prepared.report().clone())
435 }
436
437 fn prepare_sql_ddl_command<E>(
438 &self,
439 sql: &str,
440 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
441 where
442 E: PersistedRow<Canister = C> + EntityValue,
443 {
444 let (statement, _) =
445 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
446 let (accepted_schema, _) = self
447 .accepted_entity_authority::<E>()
448 .map_err(QueryError::execute)?;
449 let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
450 E::MODEL,
451 &accepted_schema,
452 true,
453 );
454 let prepared = match prepare_sql_ddl_statement(
455 &statement,
456 &accepted_schema,
457 &schema_info,
458 E::Store::PATH,
459 ) {
460 Ok(prepared) => prepared,
461 Err(err) => {
462 return Err(QueryError::unsupported_query(format!(
463 "SQL DDL preparation failed before execution: {err}"
464 )));
465 }
466 };
467
468 Ok((accepted_schema, prepared))
469 }
470
471 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
476 where
477 E: PersistedRow<Canister = C> + EntityValue,
478 {
479 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
480 if !prepared.mutates_schema() {
481 return Ok(SqlStatementResult::Ddl(
482 prepared
483 .report()
484 .clone()
485 .with_execution_status(SqlDdlExecutionStatus::NoOp),
486 ));
487 }
488
489 let Some(derivation) = prepared.derivation() else {
490 return Err(QueryError::unsupported_query(
491 "SQL DDL execution could not find a prepared schema derivation".to_string(),
492 ));
493 };
494 let store = self
495 .db
496 .recovered_store(E::Store::PATH)
497 .map_err(QueryError::execute)?;
498
499 let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
500 crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
501 execute_sql_ddl_field_addition(
502 store,
503 E::ENTITY_TAG,
504 E::PATH,
505 &accepted_before,
506 derivation,
507 )
508 .map_err(QueryError::execute)?;
509
510 (0, 0)
511 }
512 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
513 execute_sql_ddl_field_default_change(
514 store,
515 E::ENTITY_TAG,
516 E::PATH,
517 &accepted_before,
518 derivation,
519 )
520 .map_err(QueryError::execute)?;
521
522 (0, 0)
523 }
524 crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
525 let rows_scanned = execute_sql_ddl_field_nullability_change(
526 store,
527 E::ENTITY_TAG,
528 E::PATH,
529 &accepted_before,
530 derivation,
531 )
532 .map_err(QueryError::execute)?;
533
534 (rows_scanned, 0)
535 }
536 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
537 if create.candidate_index().key().is_field_path_only() =>
538 {
539 execute_sql_ddl_field_path_index_addition(
540 store,
541 E::ENTITY_TAG,
542 E::PATH,
543 &accepted_before,
544 derivation,
545 )
546 .map_err(QueryError::execute)?
547 }
548 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
549 execute_sql_ddl_expression_index_addition(
550 store,
551 E::ENTITY_TAG,
552 E::PATH,
553 &accepted_before,
554 derivation,
555 )
556 .map_err(QueryError::execute)?
557 }
558 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
559 execute_sql_ddl_secondary_index_drop(
560 store,
561 E::ENTITY_TAG,
562 E::PATH,
563 &accepted_before,
564 derivation,
565 )
566 .map_err(QueryError::execute)?;
567
568 (0, 0)
569 }
570 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
571 };
572
573 Ok(SqlStatementResult::Ddl(
574 prepared
575 .report()
576 .clone()
577 .with_execution_status(SqlDdlExecutionStatus::Published)
578 .with_execution_metrics(rows_scanned, index_keys_written),
579 ))
580 }
581}