1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop},
34 session::query::QueryPlanCacheAttribution,
35 session::sql::projection::{
36 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
37 },
38 sql::{
39 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
40 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
41 },
42 },
43 traits::{CanisterKind, EntityValue, Path},
44};
45
46pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
47pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
48#[cfg(feature = "diagnostics")]
49pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
50#[cfg(feature = "diagnostics")]
51pub use attribution::{
52 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
53 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
54};
55pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
56pub(in crate::db::session::sql) use cache::{
57 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
58};
59pub(in crate::db::session::sql) use compile::{
60 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
61};
62pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
63pub use result::SqlStatementResult;
64
65#[cfg(all(test, not(feature = "diagnostics")))]
66pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
67#[cfg(feature = "diagnostics")]
68pub use crate::db::session::sql::projection::{
69 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
70};
71
72#[cfg(test)]
75pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
76 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
77}
78
79#[doc(hidden)]
84pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
85 let (statement, _) =
86 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
87
88 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
89}
90
91const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
92 match statement {
93 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
94 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
95 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
96 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
97 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
98 Some(statement.entity.as_str())
99 }
100 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
101 Some(entity) => Some(entity.as_str()),
102 None => None,
103 },
104 SqlStatement::Explain(statement) => match &statement.statement {
105 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
106 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
107 },
108 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
109 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
110 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
111 SqlStatement::ShowEntities(_) => None,
112 }
113}
114
115fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
119 let (local_instructions, result) = measure_sql_stage(stage);
120 let value = result?;
121
122 Ok((local_instructions, value))
123}
124
125impl<C: CanisterKind> DbSession<C> {
126 fn sql_select_prepared_plan_for_accepted_authority(
130 &self,
131 query: &StructuralQuery,
132 authority: EntityAuthority,
133 accepted_schema: &AcceptedSchemaSnapshot,
134 ) -> Result<
135 (
136 SharedPreparedExecutionPlan,
137 SqlProjectionContract,
138 SqlCacheAttribution,
139 ),
140 QueryError,
141 > {
142 let (prepared_plan, cache_attribution) = self
143 .cached_shared_query_plan_for_accepted_authority(
144 authority.clone(),
145 accepted_schema,
146 query,
147 )?;
148 Ok(Self::sql_select_projection_from_prepared_plan(
149 prepared_plan,
150 authority,
151 cache_attribution,
152 ))
153 }
154
155 fn sql_select_prepared_plan_for_entity<E>(
158 &self,
159 query: &StructuralQuery,
160 ) -> Result<
161 (
162 SharedPreparedExecutionPlan,
163 SqlProjectionContract,
164 SqlCacheAttribution,
165 ),
166 QueryError,
167 >
168 where
169 E: PersistedRow<Canister = C> + EntityValue,
170 {
171 let (accepted_schema, authority) = self
172 .accepted_entity_authority::<E>()
173 .map_err(QueryError::execute)?;
174
175 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
176 }
177
178 fn sql_select_projection_from_prepared_plan(
179 prepared_plan: SharedPreparedExecutionPlan,
180 authority: EntityAuthority,
181 cache_attribution: QueryPlanCacheAttribution,
182 ) -> (
183 SharedPreparedExecutionPlan,
184 SqlProjectionContract,
185 SqlCacheAttribution,
186 ) {
187 let projection_spec = prepared_plan
188 .logical_plan()
189 .projection_spec(authority.model());
190 let projection = SqlProjectionContract::new(
191 projection_labels_from_projection_spec(&projection_spec),
192 projection_fixed_scales_from_projection_spec(&projection_spec),
193 );
194
195 (
196 prepared_plan,
197 projection,
198 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
199 )
200 }
201
202 fn ensure_sql_statement_supported_for_surface(
206 statement: &SqlStatement,
207 surface: SqlCompiledCommandSurface,
208 ) -> Result<(), QueryError> {
209 match (surface, statement) {
210 (
211 SqlCompiledCommandSurface::Query,
212 SqlStatement::Select(_)
213 | SqlStatement::Explain(_)
214 | SqlStatement::Describe(_)
215 | SqlStatement::ShowIndexes(_)
216 | SqlStatement::ShowColumns(_)
217 | SqlStatement::ShowEntities(_),
218 )
219 | (
220 SqlCompiledCommandSurface::Update,
221 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
222 ) => Ok(()),
223 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
224 "SQL DDL execution is not supported in this release",
225 )),
226 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
227 Err(QueryError::unsupported_query(
228 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
229 ))
230 }
231 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
232 Err(QueryError::unsupported_query(
233 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
234 ))
235 }
236 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
237 Err(QueryError::unsupported_query(
238 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
239 ))
240 }
241 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
242 Err(QueryError::unsupported_query(
243 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
244 ))
245 }
246 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
247 Err(QueryError::unsupported_query(
248 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
249 ))
250 }
251 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
252 Err(QueryError::unsupported_query(
253 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
254 ))
255 }
256 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
257 Err(QueryError::unsupported_query(
258 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
259 ))
260 }
261 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
262 Err(QueryError::unsupported_query(
263 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
264 ))
265 }
266 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
267 Err(QueryError::unsupported_query(
268 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
269 ))
270 }
271 }
272 }
273
274 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
279 where
280 E: PersistedRow<Canister = C> + EntityValue,
281 {
282 let compiled = self.compile_sql_query::<E>(sql)?;
283
284 self.execute_compiled_sql_owned::<E>(compiled)
285 }
286
287 #[cfg(feature = "diagnostics")]
290 #[doc(hidden)]
291 pub fn execute_sql_query_with_attribution<E>(
292 &self,
293 sql: &str,
294 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
295 where
296 E: PersistedRow<Canister = C> + EntityValue,
297 {
298 let (compile_local_instructions, compiled) =
301 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
302 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
303
304 let store_get_calls_before = DataStore::current_get_call_count();
307 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
308 let pure_covering_row_assembly_before =
309 current_pure_covering_row_assembly_local_instructions();
310 let (result, execute_cache_attribution, execute_phase_attribution) =
311 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
312 let store_get_calls =
313 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
314 let pure_covering_decode_local_instructions =
315 current_pure_covering_decode_local_instructions()
316 .saturating_sub(pure_covering_decode_before);
317 let pure_covering_row_assembly_local_instructions =
318 current_pure_covering_row_assembly_local_instructions()
319 .saturating_sub(pure_covering_row_assembly_before);
320 let execute_local_instructions = execute_phase_attribution
321 .planner_local_instructions
322 .saturating_add(execute_phase_attribution.store_local_instructions)
323 .saturating_add(execute_phase_attribution.executor_local_instructions)
324 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
325 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
326 let total_local_instructions =
327 compile_local_instructions.saturating_add(execute_local_instructions);
328 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
329 GroupedExecutionAttribution {
330 stream_local_instructions: execute_phase_attribution
331 .grouped_stream_local_instructions,
332 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
333 finalize_local_instructions: execute_phase_attribution
334 .grouped_finalize_local_instructions,
335 count: GroupedCountAttribution::from_executor(
336 execute_phase_attribution.grouped_count,
337 ),
338 },
339 );
340 let pure_covering = (pure_covering_decode_local_instructions > 0
341 || pure_covering_row_assembly_local_instructions > 0)
342 .then_some(SqlPureCoveringAttribution {
343 decode_local_instructions: pure_covering_decode_local_instructions,
344 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
345 });
346
347 Ok((
348 result,
349 SqlQueryExecutionAttribution {
350 compile_local_instructions,
351 compile: SqlCompileAttribution {
352 cache_key_local_instructions: compile_phase_attribution.cache_key,
353 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
354 parse_local_instructions: compile_phase_attribution.parse,
355 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
356 parse_select_local_instructions: compile_phase_attribution.parse_select,
357 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
358 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
359 aggregate_lane_check_local_instructions: compile_phase_attribution
360 .aggregate_lane_check,
361 prepare_local_instructions: compile_phase_attribution.prepare,
362 lower_local_instructions: compile_phase_attribution.lower,
363 bind_local_instructions: compile_phase_attribution.bind,
364 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
365 },
366 plan_lookup_local_instructions: execute_phase_attribution
367 .planner_local_instructions,
368 execution: SqlExecutionAttribution {
369 planner_local_instructions: execute_phase_attribution
370 .planner_local_instructions,
371 store_local_instructions: execute_phase_attribution.store_local_instructions,
372 executor_invocation_local_instructions: execute_phase_attribution
373 .executor_invocation_local_instructions,
374 executor_local_instructions: execute_phase_attribution
375 .executor_local_instructions,
376 response_finalization_local_instructions: execute_phase_attribution
377 .response_finalization_local_instructions,
378 },
379 grouped,
380 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
381 execute_phase_attribution.scalar_aggregate_terminal,
382 ),
383 pure_covering,
384 store_get_calls,
385 response_decode_local_instructions: 0,
386 execute_local_instructions,
387 total_local_instructions,
388 cache: SqlQueryCacheAttribution {
389 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
390 sql_compiled_command_misses: cache_attribution
391 .sql_compiled_command_cache_misses,
392 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
393 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
394 },
395 },
396 ))
397 }
398
399 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
404 where
405 E: PersistedRow<Canister = C> + EntityValue,
406 {
407 let compiled = self.compile_sql_update::<E>(sql)?;
408
409 self.execute_compiled_sql_owned::<E>(compiled)
410 }
411
412 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
419 where
420 E: PersistedRow<Canister = C> + EntityValue,
421 {
422 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
423
424 Ok(prepared.report().clone())
425 }
426
427 fn prepare_sql_ddl_command<E>(
428 &self,
429 sql: &str,
430 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
431 where
432 E: PersistedRow<Canister = C> + EntityValue,
433 {
434 let (statement, _) =
435 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
436 let (accepted_schema, _) = self
437 .accepted_entity_authority::<E>()
438 .map_err(QueryError::execute)?;
439 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
440 let prepared = match prepare_sql_ddl_statement(
441 &statement,
442 &accepted_schema,
443 &schema_info,
444 E::Store::PATH,
445 ) {
446 Ok(prepared) => prepared,
447 Err(err) => {
448 return Err(QueryError::unsupported_query(format!(
449 "SQL DDL preparation failed before execution: {err}"
450 )));
451 }
452 };
453
454 Ok((accepted_schema, prepared))
455 }
456
457 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
462 where
463 E: PersistedRow<Canister = C> + EntityValue,
464 {
465 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
466 if !prepared.mutates_schema() {
467 return Ok(SqlStatementResult::Ddl(
468 prepared
469 .report()
470 .clone()
471 .with_execution_status(SqlDdlExecutionStatus::NoOp),
472 ));
473 }
474
475 let Some(derivation) = prepared.derivation() else {
476 return Err(QueryError::unsupported_query(
477 "SQL DDL execution could not find a prepared schema derivation".to_string(),
478 ));
479 };
480 let store = self
481 .db
482 .recovered_store(E::Store::PATH)
483 .map_err(QueryError::execute)?;
484
485 let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
486 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
487 execute_sql_ddl_field_path_index_addition(
488 store,
489 E::ENTITY_TAG,
490 E::PATH,
491 &accepted_before,
492 derivation,
493 )
494 .map_err(QueryError::execute)?
495 }
496 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
497 execute_sql_ddl_secondary_index_drop(
498 store,
499 E::ENTITY_TAG,
500 E::PATH,
501 &accepted_before,
502 derivation,
503 )
504 .map_err(QueryError::execute)?;
505
506 (0, 0)
507 }
508 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
509 };
510
511 Ok(SqlStatementResult::Ddl(
512 prepared
513 .report()
514 .clone()
515 .with_execution_status(SqlDdlExecutionStatus::Published)
516 .with_execution_metrics(rows_scanned, index_keys_written),
517 ))
518 }
519}