1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::{AcceptedSchemaSnapshot, SchemaInfo},
33 schema::{execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop},
34 session::query::QueryPlanCacheAttribution,
35 session::sql::projection::{
36 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
37 },
38 sql::{
39 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
40 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
41 },
42 },
43 traits::{CanisterKind, EntityValue, Path},
44};
45
46pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
47pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
48#[cfg(feature = "diagnostics")]
49pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
50#[cfg(feature = "diagnostics")]
51pub use attribution::{
52 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
53 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
54};
55pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
56pub(in crate::db::session::sql) use cache::{
57 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
58};
59pub(in crate::db::session::sql) use compile::{
60 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
61};
62pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
63pub use result::SqlStatementResult;
64
65#[cfg(all(test, not(feature = "diagnostics")))]
66pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
67#[cfg(feature = "diagnostics")]
68pub use crate::db::session::sql::projection::{
69 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
70};
71
72#[cfg(test)]
75pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
76 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
77}
78
79#[doc(hidden)]
84pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
85 let (statement, _) =
86 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
87
88 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
89}
90
91const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
92 match statement {
93 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
94 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
95 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
96 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
97 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
98 Some(statement.entity.as_str())
99 }
100 SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => Some(statement.entity.as_str()),
101 SqlStatement::Explain(statement) => match &statement.statement {
102 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
103 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
104 },
105 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
106 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
107 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
108 SqlStatement::ShowEntities(_) => None,
109 }
110}
111
112fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
116 let (local_instructions, result) = measure_sql_stage(stage);
117 let value = result?;
118
119 Ok((local_instructions, value))
120}
121
122impl<C: CanisterKind> DbSession<C> {
123 fn sql_select_prepared_plan_for_accepted_authority(
127 &self,
128 query: &StructuralQuery,
129 authority: EntityAuthority,
130 accepted_schema: &AcceptedSchemaSnapshot,
131 ) -> Result<
132 (
133 SharedPreparedExecutionPlan,
134 SqlProjectionContract,
135 SqlCacheAttribution,
136 ),
137 QueryError,
138 > {
139 let (prepared_plan, cache_attribution) = self
140 .cached_shared_query_plan_for_accepted_authority(
141 authority.clone(),
142 accepted_schema,
143 query,
144 )?;
145 Ok(Self::sql_select_projection_from_prepared_plan(
146 prepared_plan,
147 authority,
148 cache_attribution,
149 ))
150 }
151
152 fn sql_select_prepared_plan_for_entity<E>(
155 &self,
156 query: &StructuralQuery,
157 ) -> Result<
158 (
159 SharedPreparedExecutionPlan,
160 SqlProjectionContract,
161 SqlCacheAttribution,
162 ),
163 QueryError,
164 >
165 where
166 E: PersistedRow<Canister = C> + EntityValue,
167 {
168 let (accepted_schema, authority) = self
169 .accepted_entity_authority::<E>()
170 .map_err(QueryError::execute)?;
171
172 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
173 }
174
175 fn sql_select_projection_from_prepared_plan(
176 prepared_plan: SharedPreparedExecutionPlan,
177 authority: EntityAuthority,
178 cache_attribution: QueryPlanCacheAttribution,
179 ) -> (
180 SharedPreparedExecutionPlan,
181 SqlProjectionContract,
182 SqlCacheAttribution,
183 ) {
184 let projection_spec = prepared_plan
185 .logical_plan()
186 .projection_spec(authority.model());
187 let projection = SqlProjectionContract::new(
188 projection_labels_from_projection_spec(&projection_spec),
189 projection_fixed_scales_from_projection_spec(&projection_spec),
190 );
191
192 (
193 prepared_plan,
194 projection,
195 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
196 )
197 }
198
199 fn ensure_sql_statement_supported_for_surface(
203 statement: &SqlStatement,
204 surface: SqlCompiledCommandSurface,
205 ) -> Result<(), QueryError> {
206 match (surface, statement) {
207 (
208 SqlCompiledCommandSurface::Query,
209 SqlStatement::Select(_)
210 | SqlStatement::Explain(_)
211 | SqlStatement::Describe(_)
212 | SqlStatement::ShowIndexes(_)
213 | SqlStatement::ShowColumns(_)
214 | SqlStatement::ShowEntities(_),
215 )
216 | (
217 SqlCompiledCommandSurface::Update,
218 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
219 ) => Ok(()),
220 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
221 "SQL DDL execution is not supported in this release",
222 )),
223 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
224 Err(QueryError::unsupported_query(
225 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
226 ))
227 }
228 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
229 Err(QueryError::unsupported_query(
230 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
231 ))
232 }
233 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
234 Err(QueryError::unsupported_query(
235 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
236 ))
237 }
238 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
239 Err(QueryError::unsupported_query(
240 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
241 ))
242 }
243 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
244 Err(QueryError::unsupported_query(
245 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
246 ))
247 }
248 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
249 Err(QueryError::unsupported_query(
250 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
251 ))
252 }
253 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
254 Err(QueryError::unsupported_query(
255 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
256 ))
257 }
258 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
259 Err(QueryError::unsupported_query(
260 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
261 ))
262 }
263 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
264 Err(QueryError::unsupported_query(
265 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
266 ))
267 }
268 }
269 }
270
271 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
276 where
277 E: PersistedRow<Canister = C> + EntityValue,
278 {
279 let compiled = self.compile_sql_query::<E>(sql)?;
280
281 self.execute_compiled_sql_owned::<E>(compiled)
282 }
283
284 #[cfg(feature = "diagnostics")]
287 #[doc(hidden)]
288 pub fn execute_sql_query_with_attribution<E>(
289 &self,
290 sql: &str,
291 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
292 where
293 E: PersistedRow<Canister = C> + EntityValue,
294 {
295 let (compile_local_instructions, compiled) =
298 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
299 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
300
301 let store_get_calls_before = DataStore::current_get_call_count();
304 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
305 let pure_covering_row_assembly_before =
306 current_pure_covering_row_assembly_local_instructions();
307 let (result, execute_cache_attribution, execute_phase_attribution) =
308 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
309 let store_get_calls =
310 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
311 let pure_covering_decode_local_instructions =
312 current_pure_covering_decode_local_instructions()
313 .saturating_sub(pure_covering_decode_before);
314 let pure_covering_row_assembly_local_instructions =
315 current_pure_covering_row_assembly_local_instructions()
316 .saturating_sub(pure_covering_row_assembly_before);
317 let execute_local_instructions = execute_phase_attribution
318 .planner_local_instructions
319 .saturating_add(execute_phase_attribution.store_local_instructions)
320 .saturating_add(execute_phase_attribution.executor_local_instructions)
321 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
322 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
323 let total_local_instructions =
324 compile_local_instructions.saturating_add(execute_local_instructions);
325 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
326 GroupedExecutionAttribution {
327 stream_local_instructions: execute_phase_attribution
328 .grouped_stream_local_instructions,
329 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
330 finalize_local_instructions: execute_phase_attribution
331 .grouped_finalize_local_instructions,
332 count: GroupedCountAttribution::from_executor(
333 execute_phase_attribution.grouped_count,
334 ),
335 },
336 );
337 let pure_covering = (pure_covering_decode_local_instructions > 0
338 || pure_covering_row_assembly_local_instructions > 0)
339 .then_some(SqlPureCoveringAttribution {
340 decode_local_instructions: pure_covering_decode_local_instructions,
341 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
342 });
343
344 Ok((
345 result,
346 SqlQueryExecutionAttribution {
347 compile_local_instructions,
348 compile: SqlCompileAttribution {
349 cache_key_local_instructions: compile_phase_attribution.cache_key,
350 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
351 parse_local_instructions: compile_phase_attribution.parse,
352 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
353 parse_select_local_instructions: compile_phase_attribution.parse_select,
354 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
355 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
356 aggregate_lane_check_local_instructions: compile_phase_attribution
357 .aggregate_lane_check,
358 prepare_local_instructions: compile_phase_attribution.prepare,
359 lower_local_instructions: compile_phase_attribution.lower,
360 bind_local_instructions: compile_phase_attribution.bind,
361 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
362 },
363 plan_lookup_local_instructions: execute_phase_attribution
364 .planner_local_instructions,
365 execution: SqlExecutionAttribution {
366 planner_local_instructions: execute_phase_attribution
367 .planner_local_instructions,
368 store_local_instructions: execute_phase_attribution.store_local_instructions,
369 executor_invocation_local_instructions: execute_phase_attribution
370 .executor_invocation_local_instructions,
371 executor_local_instructions: execute_phase_attribution
372 .executor_local_instructions,
373 response_finalization_local_instructions: execute_phase_attribution
374 .response_finalization_local_instructions,
375 },
376 grouped,
377 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
378 execute_phase_attribution.scalar_aggregate_terminal,
379 ),
380 pure_covering,
381 store_get_calls,
382 response_decode_local_instructions: 0,
383 execute_local_instructions,
384 total_local_instructions,
385 cache: SqlQueryCacheAttribution {
386 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
387 sql_compiled_command_misses: cache_attribution
388 .sql_compiled_command_cache_misses,
389 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
390 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
391 },
392 },
393 ))
394 }
395
396 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
401 where
402 E: PersistedRow<Canister = C> + EntityValue,
403 {
404 let compiled = self.compile_sql_update::<E>(sql)?;
405
406 self.execute_compiled_sql_owned::<E>(compiled)
407 }
408
409 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
416 where
417 E: PersistedRow<Canister = C> + EntityValue,
418 {
419 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
420
421 Ok(prepared.report().clone())
422 }
423
424 fn prepare_sql_ddl_command<E>(
425 &self,
426 sql: &str,
427 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
428 where
429 E: PersistedRow<Canister = C> + EntityValue,
430 {
431 let (statement, _) =
432 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
433 let (accepted_schema, _) = self
434 .accepted_entity_authority::<E>()
435 .map_err(QueryError::execute)?;
436 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
437 let prepared = match prepare_sql_ddl_statement(
438 &statement,
439 &accepted_schema,
440 &schema_info,
441 E::Store::PATH,
442 ) {
443 Ok(prepared) => prepared,
444 Err(err) => {
445 return Err(QueryError::unsupported_query(format!(
446 "SQL DDL preparation failed before execution: {err}"
447 )));
448 }
449 };
450
451 Ok((accepted_schema, prepared))
452 }
453
454 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
459 where
460 E: PersistedRow<Canister = C> + EntityValue,
461 {
462 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
463 if !prepared.mutates_schema() {
464 return Ok(SqlStatementResult::Ddl(
465 prepared
466 .report()
467 .clone()
468 .with_execution_status(SqlDdlExecutionStatus::NoOp),
469 ));
470 }
471
472 let Some(derivation) = prepared.derivation() else {
473 return Err(QueryError::unsupported_query(
474 "SQL DDL execution could not find a prepared schema derivation".to_string(),
475 ));
476 };
477 let store = self
478 .db
479 .recovered_store(E::Store::PATH)
480 .map_err(QueryError::execute)?;
481
482 let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
483 crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
484 execute_sql_ddl_field_path_index_addition(
485 store,
486 E::ENTITY_TAG,
487 E::PATH,
488 &accepted_before,
489 derivation,
490 )
491 .map_err(QueryError::execute)?
492 }
493 crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
494 execute_sql_ddl_secondary_index_drop(
495 store,
496 E::ENTITY_TAG,
497 E::PATH,
498 &accepted_before,
499 derivation,
500 )
501 .map_err(QueryError::execute)?;
502
503 (0, 0)
504 }
505 crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
506 };
507
508 Ok(SqlStatementResult::Ddl(
509 prepared
510 .report()
511 .clone()
512 .with_execution_status(SqlDdlExecutionStatus::Published)
513 .with_execution_metrics(rows_scanned, index_keys_written),
514 ))
515 }
516}