1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 schema::AcceptedSchemaSnapshot,
33 session::query::QueryPlanCacheAttribution,
34 session::sql::projection::{
35 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
36 },
37 sql::parser::SqlStatement,
38 },
39 traits::{CanisterKind, EntityValue},
40};
41
42pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
43#[cfg(feature = "diagnostics")]
44pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
45#[cfg(feature = "diagnostics")]
46pub use attribution::{
47 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
48 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
49};
50pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
51pub(in crate::db::session::sql) use cache::{
52 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
53};
54pub(in crate::db::session::sql) use compile::{
55 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
56};
57pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
58pub use result::SqlStatementResult;
59
60#[cfg(all(test, not(feature = "diagnostics")))]
61pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
62#[cfg(feature = "diagnostics")]
63pub use crate::db::session::sql::projection::{
64 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
65};
66
67#[cfg(test)]
70pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
71 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
72}
73
74fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
78 let (local_instructions, result) = measure_sql_stage(stage);
79 let value = result?;
80
81 Ok((local_instructions, value))
82}
83
84impl<C: CanisterKind> DbSession<C> {
85 fn sql_select_prepared_plan_for_accepted_authority(
89 &self,
90 query: &StructuralQuery,
91 authority: EntityAuthority,
92 accepted_schema: &AcceptedSchemaSnapshot,
93 ) -> Result<
94 (
95 SharedPreparedExecutionPlan,
96 SqlProjectionContract,
97 SqlCacheAttribution,
98 ),
99 QueryError,
100 > {
101 let (prepared_plan, cache_attribution) = self
102 .cached_shared_query_plan_for_accepted_authority(
103 authority.clone(),
104 accepted_schema,
105 query,
106 )?;
107 Ok(Self::sql_select_projection_from_prepared_plan(
108 prepared_plan,
109 authority,
110 cache_attribution,
111 ))
112 }
113
114 fn sql_select_prepared_plan_for_entity<E>(
117 &self,
118 query: &StructuralQuery,
119 ) -> Result<
120 (
121 SharedPreparedExecutionPlan,
122 SqlProjectionContract,
123 SqlCacheAttribution,
124 ),
125 QueryError,
126 >
127 where
128 E: PersistedRow<Canister = C> + EntityValue,
129 {
130 let (accepted_schema, authority) = self
131 .accepted_entity_authority::<E>()
132 .map_err(QueryError::execute)?;
133
134 self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
135 }
136
137 fn sql_select_projection_from_prepared_plan(
138 prepared_plan: SharedPreparedExecutionPlan,
139 authority: EntityAuthority,
140 cache_attribution: QueryPlanCacheAttribution,
141 ) -> (
142 SharedPreparedExecutionPlan,
143 SqlProjectionContract,
144 SqlCacheAttribution,
145 ) {
146 let projection_spec = prepared_plan
147 .logical_plan()
148 .projection_spec(authority.model());
149 let projection = SqlProjectionContract::new(
150 projection_labels_from_projection_spec(&projection_spec),
151 projection_fixed_scales_from_projection_spec(&projection_spec),
152 );
153
154 (
155 prepared_plan,
156 projection,
157 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
158 )
159 }
160
161 fn ensure_sql_statement_supported_for_surface(
165 statement: &SqlStatement,
166 surface: SqlCompiledCommandSurface,
167 ) -> Result<(), QueryError> {
168 match (surface, statement) {
169 (
170 SqlCompiledCommandSurface::Query,
171 SqlStatement::Select(_)
172 | SqlStatement::Explain(_)
173 | SqlStatement::Describe(_)
174 | SqlStatement::ShowIndexes(_)
175 | SqlStatement::ShowColumns(_)
176 | SqlStatement::ShowEntities(_),
177 )
178 | (
179 SqlCompiledCommandSurface::Update,
180 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
181 ) => Ok(()),
182 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
183 Err(QueryError::unsupported_query(
184 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
185 ))
186 }
187 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
188 Err(QueryError::unsupported_query(
189 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
190 ))
191 }
192 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
193 Err(QueryError::unsupported_query(
194 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
195 ))
196 }
197 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
198 Err(QueryError::unsupported_query(
199 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
200 ))
201 }
202 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
203 Err(QueryError::unsupported_query(
204 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
205 ))
206 }
207 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
208 Err(QueryError::unsupported_query(
209 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
210 ))
211 }
212 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
213 Err(QueryError::unsupported_query(
214 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
215 ))
216 }
217 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
218 Err(QueryError::unsupported_query(
219 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
220 ))
221 }
222 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
223 Err(QueryError::unsupported_query(
224 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
225 ))
226 }
227 }
228 }
229
230 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
235 where
236 E: PersistedRow<Canister = C> + EntityValue,
237 {
238 let compiled = self.compile_sql_query::<E>(sql)?;
239
240 self.execute_compiled_sql_owned::<E>(compiled)
241 }
242
243 #[cfg(feature = "diagnostics")]
246 #[doc(hidden)]
247 pub fn execute_sql_query_with_attribution<E>(
248 &self,
249 sql: &str,
250 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
251 where
252 E: PersistedRow<Canister = C> + EntityValue,
253 {
254 let (compile_local_instructions, compiled) =
257 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
258 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
259
260 let store_get_calls_before = DataStore::current_get_call_count();
263 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
264 let pure_covering_row_assembly_before =
265 current_pure_covering_row_assembly_local_instructions();
266 let (result, execute_cache_attribution, execute_phase_attribution) =
267 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
268 let store_get_calls =
269 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
270 let pure_covering_decode_local_instructions =
271 current_pure_covering_decode_local_instructions()
272 .saturating_sub(pure_covering_decode_before);
273 let pure_covering_row_assembly_local_instructions =
274 current_pure_covering_row_assembly_local_instructions()
275 .saturating_sub(pure_covering_row_assembly_before);
276 let execute_local_instructions = execute_phase_attribution
277 .planner_local_instructions
278 .saturating_add(execute_phase_attribution.store_local_instructions)
279 .saturating_add(execute_phase_attribution.executor_local_instructions)
280 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
281 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
282 let total_local_instructions =
283 compile_local_instructions.saturating_add(execute_local_instructions);
284 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
285 GroupedExecutionAttribution {
286 stream_local_instructions: execute_phase_attribution
287 .grouped_stream_local_instructions,
288 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
289 finalize_local_instructions: execute_phase_attribution
290 .grouped_finalize_local_instructions,
291 count: GroupedCountAttribution::from_executor(
292 execute_phase_attribution.grouped_count,
293 ),
294 },
295 );
296 let pure_covering = (pure_covering_decode_local_instructions > 0
297 || pure_covering_row_assembly_local_instructions > 0)
298 .then_some(SqlPureCoveringAttribution {
299 decode_local_instructions: pure_covering_decode_local_instructions,
300 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
301 });
302
303 Ok((
304 result,
305 SqlQueryExecutionAttribution {
306 compile_local_instructions,
307 compile: SqlCompileAttribution {
308 cache_key_local_instructions: compile_phase_attribution.cache_key,
309 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
310 parse_local_instructions: compile_phase_attribution.parse,
311 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
312 parse_select_local_instructions: compile_phase_attribution.parse_select,
313 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
314 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
315 aggregate_lane_check_local_instructions: compile_phase_attribution
316 .aggregate_lane_check,
317 prepare_local_instructions: compile_phase_attribution.prepare,
318 lower_local_instructions: compile_phase_attribution.lower,
319 bind_local_instructions: compile_phase_attribution.bind,
320 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
321 },
322 plan_lookup_local_instructions: execute_phase_attribution
323 .planner_local_instructions,
324 execution: SqlExecutionAttribution {
325 planner_local_instructions: execute_phase_attribution
326 .planner_local_instructions,
327 store_local_instructions: execute_phase_attribution.store_local_instructions,
328 executor_invocation_local_instructions: execute_phase_attribution
329 .executor_invocation_local_instructions,
330 executor_local_instructions: execute_phase_attribution
331 .executor_local_instructions,
332 response_finalization_local_instructions: execute_phase_attribution
333 .response_finalization_local_instructions,
334 },
335 grouped,
336 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
337 execute_phase_attribution.scalar_aggregate_terminal,
338 ),
339 pure_covering,
340 store_get_calls,
341 response_decode_local_instructions: 0,
342 execute_local_instructions,
343 total_local_instructions,
344 cache: SqlQueryCacheAttribution {
345 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
346 sql_compiled_command_misses: cache_attribution
347 .sql_compiled_command_cache_misses,
348 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
349 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
350 },
351 },
352 ))
353 }
354
355 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
360 where
361 E: PersistedRow<Canister = C> + EntityValue,
362 {
363 let compiled = self.compile_sql_update::<E>(sql)?;
364
365 self.execute_compiled_sql_owned::<E>(compiled)
366 }
367}