1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{
34 execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35 execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
36 },
37 session::query::QueryPlanCacheAttribution,
38 session::sql::projection::{
39 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
40 },
41 sql::{
42 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
43 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
44 },
45 },
46 traits::{CanisterKind, EntityValue, Path},
47};
48
49pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
50pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
51#[cfg(feature = "diagnostics")]
52pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
53#[cfg(feature = "diagnostics")]
54pub use attribution::{
55 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
56 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
57};
58pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
59pub(in crate::db::session::sql) use cache::{
60 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
61};
62pub(in crate::db::session::sql) use compile::{
63 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
64};
65pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
66pub use result::SqlStatementResult;
67
68#[cfg(all(test, not(feature = "diagnostics")))]
69pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
70#[cfg(feature = "diagnostics")]
71pub use crate::db::session::sql::projection::{
72 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
73};
74
75#[cfg(test)]
78pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
79 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
80}
81
82#[doc(hidden)]
87pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
88 let (statement, _) =
89 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
90
91 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
92}
93
94const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
95 match statement {
96 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
97 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
98 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
99 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
100 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
101 Some(statement.entity.as_str())
102 }
103 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
104 Some(entity) => Some(entity.as_str()),
105 None => None,
106 },
107 SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
108 Some(statement.entity.as_str())
109 }
110 SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
111 Some(statement.entity.as_str())
112 }
113 SqlStatement::Explain(statement) => match &statement.statement {
114 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
115 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
116 },
117 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
118 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
119 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
120 SqlStatement::ShowEntities(_) => None,
121 }
122}
123
124fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
128 let (local_instructions, result) = measure_sql_stage(stage);
129 let value = result?;
130
131 Ok((local_instructions, value))
132}
133
134impl<C: CanisterKind> DbSession<C> {
135 fn sql_select_prepared_plan_for_accepted_authority(
139 &self,
140 query: &StructuralQuery,
141 authority: EntityAuthority,
142 accepted_schema: &AcceptedSchemaSnapshot,
143 ) -> Result<
144 (
145 SharedPreparedExecutionPlan,
146 SqlProjectionContract,
147 SqlCacheAttribution,
148 ),
149 QueryError,
150 > {
151 let (prepared_plan, cache_attribution) = self
152 .cached_shared_query_plan_for_accepted_authority(
153 authority.clone(),
154 accepted_schema,
155 query,
156 )?;
157 Ok(Self::sql_select_projection_from_prepared_plan(
158 prepared_plan,
159 authority,
160 cache_attribution,
161 ))
162 }
163
164 fn sql_select_prepared_plan_for_entity<E>(
167 &self,
168 query: &StructuralQuery,
169 ) -> Result<
170 (
171 SharedPreparedExecutionPlan,
172 SqlProjectionContract,
173 SqlCacheAttribution,
174 ),
175 QueryError,
176 >
177 where
178 E: PersistedRow<Canister = C> + EntityValue,
179 {
180 let (accepted_schema, authority) = self
181 .accepted_entity_authority::<E>()
182 .map_err(QueryError::execute)?;
183
184 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
185 }
186
187 fn sql_select_projection_from_prepared_plan(
188 prepared_plan: SharedPreparedExecutionPlan,
189 authority: EntityAuthority,
190 cache_attribution: QueryPlanCacheAttribution,
191 ) -> (
192 SharedPreparedExecutionPlan,
193 SqlProjectionContract,
194 SqlCacheAttribution,
195 ) {
196 let projection_spec = prepared_plan
197 .logical_plan()
198 .projection_spec(authority.model());
199 let projection = SqlProjectionContract::new(
200 projection_labels_from_projection_spec(&projection_spec),
201 projection_fixed_scales_from_projection_spec(&projection_spec),
202 );
203
204 (
205 prepared_plan,
206 projection,
207 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
208 )
209 }
210
211 fn ensure_sql_statement_supported_for_surface(
215 statement: &SqlStatement,
216 surface: SqlCompiledCommandSurface,
217 ) -> Result<(), QueryError> {
218 match (surface, statement) {
219 (
220 SqlCompiledCommandSurface::Query,
221 SqlStatement::Select(_)
222 | SqlStatement::Explain(_)
223 | SqlStatement::Describe(_)
224 | SqlStatement::ShowIndexes(_)
225 | SqlStatement::ShowColumns(_)
226 | SqlStatement::ShowEntities(_),
227 )
228 | (
229 SqlCompiledCommandSurface::Update,
230 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
231 ) => Ok(()),
232 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
233 "SQL DDL execution is not supported in this release",
234 )),
235 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
236 Err(QueryError::unsupported_query(
237 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
238 ))
239 }
240 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
241 Err(QueryError::unsupported_query(
242 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
243 ))
244 }
245 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
246 Err(QueryError::unsupported_query(
247 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
248 ))
249 }
250 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
251 Err(QueryError::unsupported_query(
252 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
253 ))
254 }
255 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
256 Err(QueryError::unsupported_query(
257 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
258 ))
259 }
260 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
261 Err(QueryError::unsupported_query(
262 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
263 ))
264 }
265 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
266 Err(QueryError::unsupported_query(
267 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
268 ))
269 }
270 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
271 Err(QueryError::unsupported_query(
272 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
273 ))
274 }
275 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
276 Err(QueryError::unsupported_query(
277 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
278 ))
279 }
280 }
281 }
282
283 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
288 where
289 E: PersistedRow<Canister = C> + EntityValue,
290 {
291 let compiled = self.compile_sql_query::<E>(sql)?;
292
293 self.execute_compiled_sql_owned::<E>(compiled)
294 }
295
296 #[cfg(feature = "diagnostics")]
299 #[doc(hidden)]
300 pub fn execute_sql_query_with_attribution<E>(
301 &self,
302 sql: &str,
303 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
304 where
305 E: PersistedRow<Canister = C> + EntityValue,
306 {
307 let (compile_local_instructions, compiled) =
310 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
311 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
312
313 let store_get_calls_before = DataStore::current_get_call_count();
316 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
317 let pure_covering_row_assembly_before =
318 current_pure_covering_row_assembly_local_instructions();
319 let (result, execute_cache_attribution, execute_phase_attribution) =
320 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
321 let store_get_calls =
322 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
323 let pure_covering_decode_local_instructions =
324 current_pure_covering_decode_local_instructions()
325 .saturating_sub(pure_covering_decode_before);
326 let pure_covering_row_assembly_local_instructions =
327 current_pure_covering_row_assembly_local_instructions()
328 .saturating_sub(pure_covering_row_assembly_before);
329 let execute_local_instructions = execute_phase_attribution
330 .planner_local_instructions
331 .saturating_add(execute_phase_attribution.store_local_instructions)
332 .saturating_add(execute_phase_attribution.executor_local_instructions)
333 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
334 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
335 let total_local_instructions =
336 compile_local_instructions.saturating_add(execute_local_instructions);
337 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
338 GroupedExecutionAttribution {
339 stream_local_instructions: execute_phase_attribution
340 .grouped_stream_local_instructions,
341 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
342 finalize_local_instructions: execute_phase_attribution
343 .grouped_finalize_local_instructions,
344 count: GroupedCountAttribution::from_executor(
345 execute_phase_attribution.grouped_count,
346 ),
347 },
348 );
349 let pure_covering = (pure_covering_decode_local_instructions > 0
350 || pure_covering_row_assembly_local_instructions > 0)
351 .then_some(SqlPureCoveringAttribution {
352 decode_local_instructions: pure_covering_decode_local_instructions,
353 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
354 });
355
356 Ok((
357 result,
358 SqlQueryExecutionAttribution {
359 compile_local_instructions,
360 compile: SqlCompileAttribution {
361 cache_key_local_instructions: compile_phase_attribution.cache_key,
362 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
363 parse_local_instructions: compile_phase_attribution.parse,
364 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
365 parse_select_local_instructions: compile_phase_attribution.parse_select,
366 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
367 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
368 aggregate_lane_check_local_instructions: compile_phase_attribution
369 .aggregate_lane_check,
370 prepare_local_instructions: compile_phase_attribution.prepare,
371 lower_local_instructions: compile_phase_attribution.lower,
372 bind_local_instructions: compile_phase_attribution.bind,
373 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
374 },
375 plan_lookup_local_instructions: execute_phase_attribution
376 .planner_local_instructions,
377 execution: SqlExecutionAttribution {
378 planner_local_instructions: execute_phase_attribution
379 .planner_local_instructions,
380 store_local_instructions: execute_phase_attribution.store_local_instructions,
381 executor_invocation_local_instructions: execute_phase_attribution
382 .executor_invocation_local_instructions,
383 executor_local_instructions: execute_phase_attribution
384 .executor_local_instructions,
385 response_finalization_local_instructions: execute_phase_attribution
386 .response_finalization_local_instructions,
387 },
388 grouped,
389 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
390 execute_phase_attribution.scalar_aggregate_terminal,
391 ),
392 pure_covering,
393 store_get_calls,
394 response_decode_local_instructions: 0,
395 execute_local_instructions,
396 total_local_instructions,
397 cache: SqlQueryCacheAttribution {
398 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
399 sql_compiled_command_misses: cache_attribution
400 .sql_compiled_command_cache_misses,
401 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
402 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
403 },
404 },
405 ))
406 }
407
408 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
413 where
414 E: PersistedRow<Canister = C> + EntityValue,
415 {
416 let compiled = self.compile_sql_update::<E>(sql)?;
417
418 self.execute_compiled_sql_owned::<E>(compiled)
419 }
420
421 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
428 where
429 E: PersistedRow<Canister = C> + EntityValue,
430 {
431 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
432
433 Ok(prepared.report().clone())
434 }
435
436 fn prepare_sql_ddl_command<E>(
437 &self,
438 sql: &str,
439 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
440 where
441 E: PersistedRow<Canister = C> + EntityValue,
442 {
443 let (statement, _) =
444 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
445 let (accepted_schema, _) = self
446 .accepted_entity_authority::<E>()
447 .map_err(QueryError::execute)?;
448 let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
449 E::MODEL,
450 &accepted_schema,
451 true,
452 );
453 let prepared = match prepare_sql_ddl_statement(
454 &statement,
455 &accepted_schema,
456 &schema_info,
457 E::Store::PATH,
458 ) {
459 Ok(prepared) => prepared,
460 Err(err) => {
461 return Err(QueryError::unsupported_query(format!(
462 "SQL DDL preparation failed before execution: {err}"
463 )));
464 }
465 };
466
467 Ok((accepted_schema, prepared))
468 }
469
470 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
475 where
476 E: PersistedRow<Canister = C> + EntityValue,
477 {
478 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
479 if !prepared.mutates_schema() {
480 return Ok(SqlStatementResult::Ddl(
481 prepared
482 .report()
483 .clone()
484 .with_execution_status(SqlDdlExecutionStatus::NoOp),
485 ));
486 }
487
488 let Some(derivation) = prepared.derivation() else {
489 return Err(QueryError::unsupported_query(
490 "SQL DDL execution could not find a prepared schema derivation".to_string(),
491 ));
492 };
493 let store = self
494 .db
495 .recovered_store(E::Store::PATH)
496 .map_err(QueryError::execute)?;
497
498 let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
499 crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
500 execute_sql_ddl_field_addition(
501 store,
502 E::ENTITY_TAG,
503 E::PATH,
504 &accepted_before,
505 derivation,
506 )
507 .map_err(QueryError::execute)?;
508
509 (0, 0)
510 }
511 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
512 if create.candidate_index().key().is_field_path_only() =>
513 {
514 execute_sql_ddl_field_path_index_addition(
515 store,
516 E::ENTITY_TAG,
517 E::PATH,
518 &accepted_before,
519 derivation,
520 )
521 .map_err(QueryError::execute)?
522 }
523 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
524 execute_sql_ddl_expression_index_addition(
525 store,
526 E::ENTITY_TAG,
527 E::PATH,
528 &accepted_before,
529 derivation,
530 )
531 .map_err(QueryError::execute)?
532 }
533 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
534 execute_sql_ddl_secondary_index_drop(
535 store,
536 E::ENTITY_TAG,
537 E::PATH,
538 &accepted_before,
539 derivation,
540 )
541 .map_err(QueryError::execute)?;
542
543 (0, 0)
544 }
545 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
546 };
547
548 Ok(SqlStatementResult::Ddl(
549 prepared
550 .report()
551 .clone()
552 .with_execution_status(SqlDdlExecutionStatus::Published)
553 .with_execution_metrics(rows_scanned, index_keys_written),
554 ))
555 }
556}