1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{
34 execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_path_index_addition,
35 execute_sql_ddl_secondary_index_drop,
36 },
37 session::query::QueryPlanCacheAttribution,
38 session::sql::projection::{
39 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
40 },
41 sql::{
42 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
43 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
44 },
45 },
46 traits::{CanisterKind, EntityValue, Path},
47};
48
49pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
50pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
51#[cfg(feature = "diagnostics")]
52pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
53#[cfg(feature = "diagnostics")]
54pub use attribution::{
55 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
56 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
57};
58pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
59pub(in crate::db::session::sql) use cache::{
60 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
61};
62pub(in crate::db::session::sql) use compile::{
63 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
64};
65pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
66pub use result::SqlStatementResult;
67
68#[cfg(all(test, not(feature = "diagnostics")))]
69pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
70#[cfg(feature = "diagnostics")]
71pub use crate::db::session::sql::projection::{
72 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
73};
74
/// Test-only helper: parses `sql` into a [`SqlStatement`].
///
/// # Errors
/// Parse failures are converted with `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
81
82#[doc(hidden)]
87pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
88 let (statement, _) =
89 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
90
91 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
92}
93
94const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
95 match statement {
96 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
97 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
98 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
99 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
100 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
101 Some(statement.entity.as_str())
102 }
103 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
104 Some(entity) => Some(entity.as_str()),
105 None => None,
106 },
107 SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
108 Some(statement.entity.as_str())
109 }
110 SqlStatement::Explain(statement) => match &statement.statement {
111 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
112 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
113 },
114 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
115 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
116 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
117 SqlStatement::ShowEntities(_) => None,
118 }
119}
120
121fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
125 let (local_instructions, result) = measure_sql_stage(stage);
126 let value = result?;
127
128 Ok((local_instructions, value))
129}
130
131impl<C: CanisterKind> DbSession<C> {
132 fn sql_select_prepared_plan_for_accepted_authority(
136 &self,
137 query: &StructuralQuery,
138 authority: EntityAuthority,
139 accepted_schema: &AcceptedSchemaSnapshot,
140 ) -> Result<
141 (
142 SharedPreparedExecutionPlan,
143 SqlProjectionContract,
144 SqlCacheAttribution,
145 ),
146 QueryError,
147 > {
148 let (prepared_plan, cache_attribution) = self
149 .cached_shared_query_plan_for_accepted_authority(
150 authority.clone(),
151 accepted_schema,
152 query,
153 )?;
154 Ok(Self::sql_select_projection_from_prepared_plan(
155 prepared_plan,
156 authority,
157 cache_attribution,
158 ))
159 }
160
161 fn sql_select_prepared_plan_for_entity<E>(
164 &self,
165 query: &StructuralQuery,
166 ) -> Result<
167 (
168 SharedPreparedExecutionPlan,
169 SqlProjectionContract,
170 SqlCacheAttribution,
171 ),
172 QueryError,
173 >
174 where
175 E: PersistedRow<Canister = C> + EntityValue,
176 {
177 let (accepted_schema, authority) = self
178 .accepted_entity_authority::<E>()
179 .map_err(QueryError::execute)?;
180
181 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
182 }
183
184 fn sql_select_projection_from_prepared_plan(
185 prepared_plan: SharedPreparedExecutionPlan,
186 authority: EntityAuthority,
187 cache_attribution: QueryPlanCacheAttribution,
188 ) -> (
189 SharedPreparedExecutionPlan,
190 SqlProjectionContract,
191 SqlCacheAttribution,
192 ) {
193 let projection_spec = prepared_plan
194 .logical_plan()
195 .projection_spec(authority.model());
196 let projection = SqlProjectionContract::new(
197 projection_labels_from_projection_spec(&projection_spec),
198 projection_fixed_scales_from_projection_spec(&projection_spec),
199 );
200
201 (
202 prepared_plan,
203 projection,
204 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
205 )
206 }
207
208 fn ensure_sql_statement_supported_for_surface(
212 statement: &SqlStatement,
213 surface: SqlCompiledCommandSurface,
214 ) -> Result<(), QueryError> {
215 match (surface, statement) {
216 (
217 SqlCompiledCommandSurface::Query,
218 SqlStatement::Select(_)
219 | SqlStatement::Explain(_)
220 | SqlStatement::Describe(_)
221 | SqlStatement::ShowIndexes(_)
222 | SqlStatement::ShowColumns(_)
223 | SqlStatement::ShowEntities(_),
224 )
225 | (
226 SqlCompiledCommandSurface::Update,
227 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
228 ) => Ok(()),
229 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
230 "SQL DDL execution is not supported in this release",
231 )),
232 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
233 Err(QueryError::unsupported_query(
234 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
235 ))
236 }
237 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
238 Err(QueryError::unsupported_query(
239 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
240 ))
241 }
242 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
243 Err(QueryError::unsupported_query(
244 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
245 ))
246 }
247 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
248 Err(QueryError::unsupported_query(
249 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
250 ))
251 }
252 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
253 Err(QueryError::unsupported_query(
254 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
255 ))
256 }
257 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
258 Err(QueryError::unsupported_query(
259 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
260 ))
261 }
262 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
263 Err(QueryError::unsupported_query(
264 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
265 ))
266 }
267 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
268 Err(QueryError::unsupported_query(
269 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
270 ))
271 }
272 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
273 Err(QueryError::unsupported_query(
274 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
275 ))
276 }
277 }
278 }
279
280 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
285 where
286 E: PersistedRow<Canister = C> + EntityValue,
287 {
288 let compiled = self.compile_sql_query::<E>(sql)?;
289
290 self.execute_compiled_sql_owned::<E>(compiled)
291 }
292
/// Diagnostics variant of the query entry point: compiles and executes `sql`
/// for entity `E` and returns the result together with a
/// [`SqlQueryExecutionAttribution`] breakdown.
///
/// What the attribution contains, as assembled here:
/// - `compile_local_instructions` is measured around the whole compile call.
/// - `execute_local_instructions` is the saturating sum of the planner,
///   store, executor and response-finalization phase counters; the
///   executor-invocation counter is reported but not added to that sum.
/// - `store_get_calls` and the pure-covering counters are before/after
///   deltas taken around execution.
/// - `grouped` is `Some` only for `SqlStatementResult::Grouped` results, and
///   `pure_covering` is `Some` only when either of its deltas is non-zero.
/// - `response_decode_local_instructions` is always reported as `0`.
///
/// # Errors
/// Propagates compile errors and execution errors unchanged.
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
pub fn execute_sql_query_with_attribution<E>(
    &self,
    sql: &str,
) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // Measure the compile call first so the count is captured even though
    // the compile result is only unwrapped afterwards.
    let (compile_local_instructions, compile_outcome) =
        measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
    let (compiled, compile_cache, compile_phase) = compile_outcome?;

    // Baselines taken immediately before execution; deltas are computed
    // right after it.
    let get_calls_baseline = DataStore::current_get_call_count();
    let decode_baseline = current_pure_covering_decode_local_instructions();
    let row_assembly_baseline = current_pure_covering_row_assembly_local_instructions();

    let (result, execute_cache, phase) =
        self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;

    let store_get_calls = DataStore::current_get_call_count().saturating_sub(get_calls_baseline);
    let decode_local_instructions =
        current_pure_covering_decode_local_instructions().saturating_sub(decode_baseline);
    let row_assembly_local_instructions =
        current_pure_covering_row_assembly_local_instructions()
            .saturating_sub(row_assembly_baseline);

    // Execution total: planner + store + executor + response finalization.
    let execute_local_instructions = phase
        .planner_local_instructions
        .saturating_add(phase.store_local_instructions)
        .saturating_add(phase.executor_local_instructions)
        .saturating_add(phase.response_finalization_local_instructions);
    let merged_cache = compile_cache.merge(execute_cache);
    let total_local_instructions =
        compile_local_instructions.saturating_add(execute_local_instructions);

    // Grouped detail is attached only when the result is a grouped one.
    let is_grouped = matches!(&result, SqlStatementResult::Grouped { .. });
    let grouped = is_grouped.then_some(GroupedExecutionAttribution {
        stream_local_instructions: phase.grouped_stream_local_instructions,
        fold_local_instructions: phase.grouped_fold_local_instructions,
        finalize_local_instructions: phase.grouped_finalize_local_instructions,
        count: GroupedCountAttribution::from_executor(phase.grouped_count),
    });

    // Pure-covering detail is attached only when at least one delta moved.
    let pure_covering_observed =
        decode_local_instructions > 0 || row_assembly_local_instructions > 0;
    let pure_covering = pure_covering_observed.then_some(SqlPureCoveringAttribution {
        decode_local_instructions,
        row_assembly_local_instructions,
    });

    let compile = SqlCompileAttribution {
        cache_key_local_instructions: compile_phase.cache_key,
        cache_lookup_local_instructions: compile_phase.cache_lookup,
        parse_local_instructions: compile_phase.parse,
        parse_tokenize_local_instructions: compile_phase.parse_tokenize,
        parse_select_local_instructions: compile_phase.parse_select,
        parse_expr_local_instructions: compile_phase.parse_expr,
        parse_predicate_local_instructions: compile_phase.parse_predicate,
        aggregate_lane_check_local_instructions: compile_phase.aggregate_lane_check,
        prepare_local_instructions: compile_phase.prepare,
        lower_local_instructions: compile_phase.lower,
        bind_local_instructions: compile_phase.bind,
        cache_insert_local_instructions: compile_phase.cache_insert,
    };
    let execution = SqlExecutionAttribution {
        planner_local_instructions: phase.planner_local_instructions,
        store_local_instructions: phase.store_local_instructions,
        executor_invocation_local_instructions: phase.executor_invocation_local_instructions,
        executor_local_instructions: phase.executor_local_instructions,
        response_finalization_local_instructions: phase
            .response_finalization_local_instructions,
    };
    let scalar_aggregate =
        SqlScalarAggregateAttribution::from_executor(phase.scalar_aggregate_terminal);
    let cache = SqlQueryCacheAttribution {
        sql_compiled_command_hits: merged_cache.sql_compiled_command_cache_hits,
        sql_compiled_command_misses: merged_cache.sql_compiled_command_cache_misses,
        shared_query_plan_hits: merged_cache.shared_query_plan_cache_hits,
        shared_query_plan_misses: merged_cache.shared_query_plan_cache_misses,
    };

    let attribution = SqlQueryExecutionAttribution {
        compile_local_instructions,
        compile,
        // Plan lookup is reported from the same planner counter as
        // `execution.planner_local_instructions`.
        plan_lookup_local_instructions: phase.planner_local_instructions,
        execution,
        grouped,
        scalar_aggregate,
        pure_covering,
        store_get_calls,
        response_decode_local_instructions: 0,
        execute_local_instructions,
        total_local_instructions,
        cache,
    };

    Ok((result, attribution))
}
404
405 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
410 where
411 E: PersistedRow<Canister = C> + EntityValue,
412 {
413 let compiled = self.compile_sql_update::<E>(sql)?;
414
415 self.execute_compiled_sql_owned::<E>(compiled)
416 }
417
418 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
425 where
426 E: PersistedRow<Canister = C> + EntityValue,
427 {
428 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
429
430 Ok(prepared.report().clone())
431 }
432
433 fn prepare_sql_ddl_command<E>(
434 &self,
435 sql: &str,
436 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
437 where
438 E: PersistedRow<Canister = C> + EntityValue,
439 {
440 let (statement, _) =
441 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
442 let (accepted_schema, _) = self
443 .accepted_entity_authority::<E>()
444 .map_err(QueryError::execute)?;
445 let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
446 E::MODEL,
447 &accepted_schema,
448 true,
449 );
450 let prepared = match prepare_sql_ddl_statement(
451 &statement,
452 &accepted_schema,
453 &schema_info,
454 E::Store::PATH,
455 ) {
456 Ok(prepared) => prepared,
457 Err(err) => {
458 return Err(QueryError::unsupported_query(format!(
459 "SQL DDL preparation failed before execution: {err}"
460 )));
461 }
462 };
463
464 Ok((accepted_schema, prepared))
465 }
466
467 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
472 where
473 E: PersistedRow<Canister = C> + EntityValue,
474 {
475 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
476 if !prepared.mutates_schema() {
477 return Ok(SqlStatementResult::Ddl(
478 prepared
479 .report()
480 .clone()
481 .with_execution_status(SqlDdlExecutionStatus::NoOp),
482 ));
483 }
484
485 let Some(derivation) = prepared.derivation() else {
486 return Err(QueryError::unsupported_query(
487 "SQL DDL execution could not find a prepared schema derivation".to_string(),
488 ));
489 };
490 let store = self
491 .db
492 .recovered_store(E::Store::PATH)
493 .map_err(QueryError::execute)?;
494
495 let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
496 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
497 if create.candidate_index().key().is_field_path_only() =>
498 {
499 execute_sql_ddl_field_path_index_addition(
500 store,
501 E::ENTITY_TAG,
502 E::PATH,
503 &accepted_before,
504 derivation,
505 )
506 .map_err(QueryError::execute)?
507 }
508 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
509 execute_sql_ddl_expression_index_addition(
510 store,
511 E::ENTITY_TAG,
512 E::PATH,
513 &accepted_before,
514 derivation,
515 )
516 .map_err(QueryError::execute)?
517 }
518 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
519 execute_sql_ddl_secondary_index_drop(
520 store,
521 E::ENTITY_TAG,
522 E::PATH,
523 &accepted_before,
524 derivation,
525 )
526 .map_err(QueryError::execute)?;
527
528 (0, 0)
529 }
530 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
531 };
532
533 Ok(SqlStatementResult::Ddl(
534 prepared
535 .report()
536 .clone()
537 .with_execution_status(SqlDdlExecutionStatus::Published)
538 .with_execution_metrics(rows_scanned, index_keys_written),
539 ))
540 }
541}