1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::execute_sql_ddl_field_path_index_addition,
33 schema::{AcceptedSchemaSnapshot, SchemaInfo},
34 session::query::QueryPlanCacheAttribution,
35 session::sql::projection::{
36 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
37 },
38 sql::{
39 ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
40 parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
41 },
42 },
43 traits::{CanisterKind, EntityValue, Path},
44};
45
46pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
47pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
48#[cfg(feature = "diagnostics")]
49pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
50#[cfg(feature = "diagnostics")]
51pub use attribution::{
52 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
53 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
54};
55pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
56pub(in crate::db::session::sql) use cache::{
57 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
58};
59pub(in crate::db::session::sql) use compile::{
60 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
61};
62pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
63pub use result::SqlStatementResult;
64
65#[cfg(all(test, not(feature = "diagnostics")))]
66pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
67#[cfg(feature = "diagnostics")]
68pub use crate::db::session::sql::projection::{
69 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
70};
71
72#[cfg(test)]
75pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
76 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
77}
78
79#[doc(hidden)]
84pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
85 let (statement, _) =
86 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
87
88 Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
89}
90
91const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
92 match statement {
93 SqlStatement::Select(statement) => Some(statement.entity.as_str()),
94 SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
95 SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
96 SqlStatement::Update(statement) => Some(statement.entity.as_str()),
97 SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
98 Some(statement.entity.as_str())
99 }
100 SqlStatement::Explain(statement) => match &statement.statement {
101 SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
102 SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
103 },
104 SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
105 SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
106 SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
107 SqlStatement::ShowEntities(_) => None,
108 }
109}
110
111fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
115 let (local_instructions, result) = measure_sql_stage(stage);
116 let value = result?;
117
118 Ok((local_instructions, value))
119}
120
121impl<C: CanisterKind> DbSession<C> {
122 fn sql_select_prepared_plan_for_accepted_authority(
126 &self,
127 query: &StructuralQuery,
128 authority: EntityAuthority,
129 accepted_schema: &AcceptedSchemaSnapshot,
130 ) -> Result<
131 (
132 SharedPreparedExecutionPlan,
133 SqlProjectionContract,
134 SqlCacheAttribution,
135 ),
136 QueryError,
137 > {
138 let (prepared_plan, cache_attribution) = self
139 .cached_shared_query_plan_for_accepted_authority(
140 authority.clone(),
141 accepted_schema,
142 query,
143 )?;
144 Ok(Self::sql_select_projection_from_prepared_plan(
145 prepared_plan,
146 authority,
147 cache_attribution,
148 ))
149 }
150
151 fn sql_select_prepared_plan_for_entity<E>(
154 &self,
155 query: &StructuralQuery,
156 ) -> Result<
157 (
158 SharedPreparedExecutionPlan,
159 SqlProjectionContract,
160 SqlCacheAttribution,
161 ),
162 QueryError,
163 >
164 where
165 E: PersistedRow<Canister = C> + EntityValue,
166 {
167 let (accepted_schema, authority) = self
168 .accepted_entity_authority::<E>()
169 .map_err(QueryError::execute)?;
170
171 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
172 }
173
174 fn sql_select_projection_from_prepared_plan(
175 prepared_plan: SharedPreparedExecutionPlan,
176 authority: EntityAuthority,
177 cache_attribution: QueryPlanCacheAttribution,
178 ) -> (
179 SharedPreparedExecutionPlan,
180 SqlProjectionContract,
181 SqlCacheAttribution,
182 ) {
183 let projection_spec = prepared_plan
184 .logical_plan()
185 .projection_spec(authority.model());
186 let projection = SqlProjectionContract::new(
187 projection_labels_from_projection_spec(&projection_spec),
188 projection_fixed_scales_from_projection_spec(&projection_spec),
189 );
190
191 (
192 prepared_plan,
193 projection,
194 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
195 )
196 }
197
198 fn ensure_sql_statement_supported_for_surface(
202 statement: &SqlStatement,
203 surface: SqlCompiledCommandSurface,
204 ) -> Result<(), QueryError> {
205 match (surface, statement) {
206 (
207 SqlCompiledCommandSurface::Query,
208 SqlStatement::Select(_)
209 | SqlStatement::Explain(_)
210 | SqlStatement::Describe(_)
211 | SqlStatement::ShowIndexes(_)
212 | SqlStatement::ShowColumns(_)
213 | SqlStatement::ShowEntities(_),
214 )
215 | (
216 SqlCompiledCommandSurface::Update,
217 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
218 ) => Ok(()),
219 (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
220 "SQL DDL execution is not supported in this release",
221 )),
222 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
223 Err(QueryError::unsupported_query(
224 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
225 ))
226 }
227 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
228 Err(QueryError::unsupported_query(
229 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
230 ))
231 }
232 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
233 Err(QueryError::unsupported_query(
234 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
235 ))
236 }
237 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
238 Err(QueryError::unsupported_query(
239 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
240 ))
241 }
242 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
243 Err(QueryError::unsupported_query(
244 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
245 ))
246 }
247 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
248 Err(QueryError::unsupported_query(
249 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
250 ))
251 }
252 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
253 Err(QueryError::unsupported_query(
254 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
255 ))
256 }
257 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
258 Err(QueryError::unsupported_query(
259 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
260 ))
261 }
262 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
263 Err(QueryError::unsupported_query(
264 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
265 ))
266 }
267 }
268 }
269
270 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
275 where
276 E: PersistedRow<Canister = C> + EntityValue,
277 {
278 let compiled = self.compile_sql_query::<E>(sql)?;
279
280 self.execute_compiled_sql_owned::<E>(compiled)
281 }
282
283 #[cfg(feature = "diagnostics")]
286 #[doc(hidden)]
287 pub fn execute_sql_query_with_attribution<E>(
288 &self,
289 sql: &str,
290 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
291 where
292 E: PersistedRow<Canister = C> + EntityValue,
293 {
294 let (compile_local_instructions, compiled) =
297 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
298 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
299
300 let store_get_calls_before = DataStore::current_get_call_count();
303 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
304 let pure_covering_row_assembly_before =
305 current_pure_covering_row_assembly_local_instructions();
306 let (result, execute_cache_attribution, execute_phase_attribution) =
307 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
308 let store_get_calls =
309 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
310 let pure_covering_decode_local_instructions =
311 current_pure_covering_decode_local_instructions()
312 .saturating_sub(pure_covering_decode_before);
313 let pure_covering_row_assembly_local_instructions =
314 current_pure_covering_row_assembly_local_instructions()
315 .saturating_sub(pure_covering_row_assembly_before);
316 let execute_local_instructions = execute_phase_attribution
317 .planner_local_instructions
318 .saturating_add(execute_phase_attribution.store_local_instructions)
319 .saturating_add(execute_phase_attribution.executor_local_instructions)
320 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
321 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
322 let total_local_instructions =
323 compile_local_instructions.saturating_add(execute_local_instructions);
324 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
325 GroupedExecutionAttribution {
326 stream_local_instructions: execute_phase_attribution
327 .grouped_stream_local_instructions,
328 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
329 finalize_local_instructions: execute_phase_attribution
330 .grouped_finalize_local_instructions,
331 count: GroupedCountAttribution::from_executor(
332 execute_phase_attribution.grouped_count,
333 ),
334 },
335 );
336 let pure_covering = (pure_covering_decode_local_instructions > 0
337 || pure_covering_row_assembly_local_instructions > 0)
338 .then_some(SqlPureCoveringAttribution {
339 decode_local_instructions: pure_covering_decode_local_instructions,
340 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
341 });
342
343 Ok((
344 result,
345 SqlQueryExecutionAttribution {
346 compile_local_instructions,
347 compile: SqlCompileAttribution {
348 cache_key_local_instructions: compile_phase_attribution.cache_key,
349 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
350 parse_local_instructions: compile_phase_attribution.parse,
351 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
352 parse_select_local_instructions: compile_phase_attribution.parse_select,
353 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
354 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
355 aggregate_lane_check_local_instructions: compile_phase_attribution
356 .aggregate_lane_check,
357 prepare_local_instructions: compile_phase_attribution.prepare,
358 lower_local_instructions: compile_phase_attribution.lower,
359 bind_local_instructions: compile_phase_attribution.bind,
360 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
361 },
362 plan_lookup_local_instructions: execute_phase_attribution
363 .planner_local_instructions,
364 execution: SqlExecutionAttribution {
365 planner_local_instructions: execute_phase_attribution
366 .planner_local_instructions,
367 store_local_instructions: execute_phase_attribution.store_local_instructions,
368 executor_invocation_local_instructions: execute_phase_attribution
369 .executor_invocation_local_instructions,
370 executor_local_instructions: execute_phase_attribution
371 .executor_local_instructions,
372 response_finalization_local_instructions: execute_phase_attribution
373 .response_finalization_local_instructions,
374 },
375 grouped,
376 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
377 execute_phase_attribution.scalar_aggregate_terminal,
378 ),
379 pure_covering,
380 store_get_calls,
381 response_decode_local_instructions: 0,
382 execute_local_instructions,
383 total_local_instructions,
384 cache: SqlQueryCacheAttribution {
385 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
386 sql_compiled_command_misses: cache_attribution
387 .sql_compiled_command_cache_misses,
388 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
389 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
390 },
391 },
392 ))
393 }
394
395 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
400 where
401 E: PersistedRow<Canister = C> + EntityValue,
402 {
403 let compiled = self.compile_sql_update::<E>(sql)?;
404
405 self.execute_compiled_sql_owned::<E>(compiled)
406 }
407
408 pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
415 where
416 E: PersistedRow<Canister = C> + EntityValue,
417 {
418 let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
419
420 Ok(prepared.report().clone())
421 }
422
423 fn prepare_sql_ddl_command<E>(
424 &self,
425 sql: &str,
426 ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
427 where
428 E: PersistedRow<Canister = C> + EntityValue,
429 {
430 let (statement, _) =
431 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
432 let (accepted_schema, _) = self
433 .accepted_entity_authority::<E>()
434 .map_err(QueryError::execute)?;
435 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
436 let prepared =
437 prepare_sql_ddl_statement(&statement, &accepted_schema, &schema_info, E::Store::PATH)
438 .map_err(|err| {
439 QueryError::unsupported_query(format!(
440 "SQL DDL preparation failed before execution: {err}"
441 ))
442 })?;
443
444 Ok((accepted_schema, prepared))
445 }
446
447 pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
452 where
453 E: PersistedRow<Canister = C> + EntityValue,
454 {
455 let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
456 let store = self
457 .db
458 .recovered_store(E::Store::PATH)
459 .map_err(QueryError::execute)?;
460
461 execute_sql_ddl_field_path_index_addition(
462 store,
463 E::ENTITY_TAG,
464 E::PATH,
465 &accepted_before,
466 prepared.derivation(),
467 )
468 .map_err(QueryError::execute)?;
469
470 Ok(SqlStatementResult::Ddl(
471 prepared
472 .report()
473 .clone()
474 .with_execution_status(SqlDdlExecutionStatus::Published),
475 ))
476 }
477}