icydb_core/db/session/sql/
mod.rs1mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 executor::{EntityAuthority, SharedPreparedExecutionPlan},
31 query::intent::StructuralQuery,
32 session::sql::projection::{
33 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
34 },
35 sql::parser::SqlStatement,
36 },
37 traits::{CanisterKind, EntityValue},
38};
39
40pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
41#[cfg(feature = "diagnostics")]
42pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
43#[cfg(feature = "diagnostics")]
44pub use attribution::{
45 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
46 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
47};
48pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
49pub(in crate::db::session::sql) use cache::{
50 SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
51};
52pub(in crate::db::session::sql) use compile::{
53 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
54};
55pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
56pub use result::SqlStatementResult;
57
58#[cfg(all(test, not(feature = "diagnostics")))]
59pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
60#[cfg(feature = "diagnostics")]
61pub use crate::db::session::sql::projection::{
62 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
63};
64
/// Test-only helper: parses `sql` with `parse_sql` and converts a parse
/// failure into a `QueryError` via `QueryError::from_sql_parse_error`.
///
/// # Errors
///
/// Returns the converted `QueryError` when `parse_sql` rejects the input.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let parsed = parse_sql(sql);

    parsed.map_err(QueryError::from_sql_parse_error)
}
71
72fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
76 let (local_instructions, result) = measure_sql_stage(stage);
77 let value = result?;
78
79 Ok((local_instructions, value))
80}
81
82impl<C: CanisterKind> DbSession<C> {
83 fn sql_select_prepared_plan(
86 &self,
87 query: &StructuralQuery,
88 authority: EntityAuthority,
89 ) -> Result<
90 (
91 SharedPreparedExecutionPlan,
92 SqlProjectionContract,
93 SqlCacheAttribution,
94 ),
95 QueryError,
96 > {
97 let (prepared_plan, cache_attribution) =
98 self.cached_shared_query_plan_for_authority(authority, query)?;
99 let projection_spec = prepared_plan
100 .logical_plan()
101 .projection_spec(authority.model());
102 let projection = SqlProjectionContract::new(
103 projection_labels_from_projection_spec(&projection_spec),
104 projection_fixed_scales_from_projection_spec(&projection_spec),
105 );
106
107 Ok((
108 prepared_plan,
109 projection,
110 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
111 ))
112 }
113
114 fn ensure_sql_statement_supported_for_surface(
118 statement: &SqlStatement,
119 surface: SqlCompiledCommandSurface,
120 ) -> Result<(), QueryError> {
121 match (surface, statement) {
122 (
123 SqlCompiledCommandSurface::Query,
124 SqlStatement::Select(_)
125 | SqlStatement::Explain(_)
126 | SqlStatement::Describe(_)
127 | SqlStatement::ShowIndexes(_)
128 | SqlStatement::ShowColumns(_)
129 | SqlStatement::ShowEntities(_),
130 )
131 | (
132 SqlCompiledCommandSurface::Update,
133 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
134 ) => Ok(()),
135 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
136 Err(QueryError::unsupported_query(
137 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
138 ))
139 }
140 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
141 Err(QueryError::unsupported_query(
142 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
143 ))
144 }
145 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
146 Err(QueryError::unsupported_query(
147 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
148 ))
149 }
150 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
151 Err(QueryError::unsupported_query(
152 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
153 ))
154 }
155 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
156 Err(QueryError::unsupported_query(
157 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
158 ))
159 }
160 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
161 Err(QueryError::unsupported_query(
162 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
163 ))
164 }
165 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
166 Err(QueryError::unsupported_query(
167 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
168 ))
169 }
170 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
171 Err(QueryError::unsupported_query(
172 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
173 ))
174 }
175 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
176 Err(QueryError::unsupported_query(
177 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
178 ))
179 }
180 }
181 }
182
183 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
188 where
189 E: PersistedRow<Canister = C> + EntityValue,
190 {
191 let compiled = self.compile_sql_query::<E>(sql)?;
192
193 self.execute_compiled_sql_owned::<E>(compiled)
194 }
195
196 #[cfg(feature = "diagnostics")]
199 #[doc(hidden)]
200 pub fn execute_sql_query_with_attribution<E>(
201 &self,
202 sql: &str,
203 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
204 where
205 E: PersistedRow<Canister = C> + EntityValue,
206 {
207 let (compile_local_instructions, compiled) =
210 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
211 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
212
213 let store_get_calls_before = DataStore::current_get_call_count();
216 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
217 let pure_covering_row_assembly_before =
218 current_pure_covering_row_assembly_local_instructions();
219 let (result, execute_cache_attribution, execute_phase_attribution) =
220 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
221 let store_get_calls =
222 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
223 let pure_covering_decode_local_instructions =
224 current_pure_covering_decode_local_instructions()
225 .saturating_sub(pure_covering_decode_before);
226 let pure_covering_row_assembly_local_instructions =
227 current_pure_covering_row_assembly_local_instructions()
228 .saturating_sub(pure_covering_row_assembly_before);
229 let execute_local_instructions = execute_phase_attribution
230 .planner_local_instructions
231 .saturating_add(execute_phase_attribution.store_local_instructions)
232 .saturating_add(execute_phase_attribution.executor_local_instructions)
233 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
234 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
235 let total_local_instructions =
236 compile_local_instructions.saturating_add(execute_local_instructions);
237 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
238 GroupedExecutionAttribution {
239 stream_local_instructions: execute_phase_attribution
240 .grouped_stream_local_instructions,
241 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
242 finalize_local_instructions: execute_phase_attribution
243 .grouped_finalize_local_instructions,
244 count: GroupedCountAttribution::from_executor(
245 execute_phase_attribution.grouped_count,
246 ),
247 },
248 );
249 let pure_covering = (pure_covering_decode_local_instructions > 0
250 || pure_covering_row_assembly_local_instructions > 0)
251 .then_some(SqlPureCoveringAttribution {
252 decode_local_instructions: pure_covering_decode_local_instructions,
253 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
254 });
255
256 Ok((
257 result,
258 SqlQueryExecutionAttribution {
259 compile_local_instructions,
260 compile: SqlCompileAttribution {
261 cache_key_local_instructions: compile_phase_attribution.cache_key,
262 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
263 parse_local_instructions: compile_phase_attribution.parse,
264 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
265 parse_select_local_instructions: compile_phase_attribution.parse_select,
266 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
267 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
268 aggregate_lane_check_local_instructions: compile_phase_attribution
269 .aggregate_lane_check,
270 prepare_local_instructions: compile_phase_attribution.prepare,
271 lower_local_instructions: compile_phase_attribution.lower,
272 bind_local_instructions: compile_phase_attribution.bind,
273 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
274 },
275 plan_lookup_local_instructions: execute_phase_attribution
276 .planner_local_instructions,
277 execution: SqlExecutionAttribution {
278 planner_local_instructions: execute_phase_attribution
279 .planner_local_instructions,
280 store_local_instructions: execute_phase_attribution.store_local_instructions,
281 executor_invocation_local_instructions: execute_phase_attribution
282 .executor_invocation_local_instructions,
283 executor_local_instructions: execute_phase_attribution
284 .executor_local_instructions,
285 response_finalization_local_instructions: execute_phase_attribution
286 .response_finalization_local_instructions,
287 },
288 grouped,
289 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
290 execute_phase_attribution.scalar_aggregate_terminal,
291 ),
292 pure_covering,
293 store_get_calls,
294 response_decode_local_instructions: 0,
295 execute_local_instructions,
296 total_local_instructions,
297 cache: SqlQueryCacheAttribution {
298 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
299 sql_compiled_command_misses: cache_attribution
300 .sql_compiled_command_cache_misses,
301 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
302 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
303 },
304 },
305 ))
306 }
307
308 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
313 where
314 E: PersistedRow<Canister = C> + EntityValue,
315 {
316 let compiled = self.compile_sql_update::<E>(sql)?;
317
318 self.execute_compiled_sql_owned::<E>(compiled)
319 }
320}