// icydb_core/db/session/sql/mod.rs
mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20 current_pure_covering_decode_local_instructions,
21 current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28 db::{
29 DbSession, PersistedRow, QueryError,
30 commit::CommitSchemaFingerprint,
31 executor::{EntityAuthority, SharedPreparedExecutionPlan},
32 query::intent::StructuralQuery,
33 session::sql::projection::{
34 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
35 },
36 sql::parser::SqlStatement,
37 },
38 traits::{CanisterKind, EntityValue},
39};
40
41pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
42#[cfg(feature = "diagnostics")]
43pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
44#[cfg(feature = "diagnostics")]
45pub use attribution::{
46 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
47 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
48};
49pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
50pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
51pub(in crate::db::session::sql) use compile::{
52 SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
53};
54pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
55pub use result::SqlStatementResult;
56
57#[cfg(all(test, not(feature = "diagnostics")))]
58pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
59#[cfg(feature = "diagnostics")]
60pub use crate::db::session::sql::projection::{
61 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
62};
63
/// Parse one SQL string into a [`SqlStatement`].
///
/// Only compiled for test builds. Parser failures are converted through
/// `QueryError::from_sql_parse_error`.
///
/// # Errors
///
/// Returns the converted [`QueryError`] when `parse_sql` rejects `sql`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
70
71fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
75 let (local_instructions, result) = measure_sql_stage(stage);
76 let value = result?;
77
78 Ok((local_instructions, value))
79}
80
81impl<C: CanisterKind> DbSession<C> {
82 fn sql_select_prepared_plan(
85 &self,
86 query: &StructuralQuery,
87 authority: EntityAuthority,
88 cache_schema_fingerprint: CommitSchemaFingerprint,
89 ) -> Result<
90 (
91 SharedPreparedExecutionPlan,
92 SqlProjectionContract,
93 SqlCacheAttribution,
94 ),
95 QueryError,
96 > {
97 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
98 authority,
99 cache_schema_fingerprint,
100 query,
101 )?;
102 let projection_spec = prepared_plan
103 .logical_plan()
104 .projection_spec(authority.model());
105 let projection = SqlProjectionContract::new(
106 projection_labels_from_projection_spec(&projection_spec),
107 projection_fixed_scales_from_projection_spec(&projection_spec),
108 );
109
110 Ok((
111 prepared_plan,
112 projection,
113 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
114 ))
115 }
116
117 fn ensure_sql_statement_supported_for_surface(
121 statement: &SqlStatement,
122 surface: SqlCompiledCommandSurface,
123 ) -> Result<(), QueryError> {
124 match (surface, statement) {
125 (
126 SqlCompiledCommandSurface::Query,
127 SqlStatement::Select(_)
128 | SqlStatement::Explain(_)
129 | SqlStatement::Describe(_)
130 | SqlStatement::ShowIndexes(_)
131 | SqlStatement::ShowColumns(_)
132 | SqlStatement::ShowEntities(_),
133 )
134 | (
135 SqlCompiledCommandSurface::Update,
136 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
137 ) => Ok(()),
138 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
139 Err(QueryError::unsupported_query(
140 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
141 ))
142 }
143 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
144 Err(QueryError::unsupported_query(
145 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
146 ))
147 }
148 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
149 Err(QueryError::unsupported_query(
150 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
151 ))
152 }
153 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
154 Err(QueryError::unsupported_query(
155 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
156 ))
157 }
158 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
159 Err(QueryError::unsupported_query(
160 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
161 ))
162 }
163 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
164 Err(QueryError::unsupported_query(
165 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
166 ))
167 }
168 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
169 Err(QueryError::unsupported_query(
170 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
171 ))
172 }
173 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
174 Err(QueryError::unsupported_query(
175 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
176 ))
177 }
178 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
179 Err(QueryError::unsupported_query(
180 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
181 ))
182 }
183 }
184 }
185
186 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
191 where
192 E: PersistedRow<Canister = C> + EntityValue,
193 {
194 let compiled = self.compile_sql_query::<E>(sql)?;
195
196 self.execute_compiled_sql_owned::<E>(compiled)
197 }
198
199 #[cfg(feature = "diagnostics")]
202 #[doc(hidden)]
203 pub fn execute_sql_query_with_attribution<E>(
204 &self,
205 sql: &str,
206 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
207 where
208 E: PersistedRow<Canister = C> + EntityValue,
209 {
210 let (compile_local_instructions, compiled) =
213 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
214 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
215
216 let store_get_calls_before = DataStore::current_get_call_count();
219 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
220 let pure_covering_row_assembly_before =
221 current_pure_covering_row_assembly_local_instructions();
222 let (result, execute_cache_attribution, execute_phase_attribution) =
223 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
224 let store_get_calls =
225 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
226 let pure_covering_decode_local_instructions =
227 current_pure_covering_decode_local_instructions()
228 .saturating_sub(pure_covering_decode_before);
229 let pure_covering_row_assembly_local_instructions =
230 current_pure_covering_row_assembly_local_instructions()
231 .saturating_sub(pure_covering_row_assembly_before);
232 let execute_local_instructions = execute_phase_attribution
233 .planner_local_instructions
234 .saturating_add(execute_phase_attribution.store_local_instructions)
235 .saturating_add(execute_phase_attribution.executor_local_instructions)
236 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
237 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
238 let total_local_instructions =
239 compile_local_instructions.saturating_add(execute_local_instructions);
240 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
241 GroupedExecutionAttribution {
242 stream_local_instructions: execute_phase_attribution
243 .grouped_stream_local_instructions,
244 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
245 finalize_local_instructions: execute_phase_attribution
246 .grouped_finalize_local_instructions,
247 count: GroupedCountAttribution::from_executor(
248 execute_phase_attribution.grouped_count,
249 ),
250 },
251 );
252 let pure_covering = (pure_covering_decode_local_instructions > 0
253 || pure_covering_row_assembly_local_instructions > 0)
254 .then_some(SqlPureCoveringAttribution {
255 decode_local_instructions: pure_covering_decode_local_instructions,
256 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
257 });
258
259 Ok((
260 result,
261 SqlQueryExecutionAttribution {
262 compile_local_instructions,
263 compile: SqlCompileAttribution {
264 cache_key_local_instructions: compile_phase_attribution.cache_key,
265 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
266 parse_local_instructions: compile_phase_attribution.parse,
267 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
268 parse_select_local_instructions: compile_phase_attribution.parse_select,
269 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
270 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
271 aggregate_lane_check_local_instructions: compile_phase_attribution
272 .aggregate_lane_check,
273 prepare_local_instructions: compile_phase_attribution.prepare,
274 lower_local_instructions: compile_phase_attribution.lower,
275 bind_local_instructions: compile_phase_attribution.bind,
276 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
277 },
278 plan_lookup_local_instructions: execute_phase_attribution
279 .planner_local_instructions,
280 execution: SqlExecutionAttribution {
281 planner_local_instructions: execute_phase_attribution
282 .planner_local_instructions,
283 store_local_instructions: execute_phase_attribution.store_local_instructions,
284 executor_invocation_local_instructions: execute_phase_attribution
285 .executor_invocation_local_instructions,
286 executor_local_instructions: execute_phase_attribution
287 .executor_local_instructions,
288 response_finalization_local_instructions: execute_phase_attribution
289 .response_finalization_local_instructions,
290 },
291 grouped,
292 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
293 execute_phase_attribution.scalar_aggregate_terminal,
294 ),
295 pure_covering,
296 store_get_calls,
297 response_decode_local_instructions: 0,
298 execute_local_instructions,
299 total_local_instructions,
300 cache: SqlQueryCacheAttribution {
301 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
302 sql_compiled_command_misses: cache_attribution
303 .sql_compiled_command_cache_misses,
304 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
305 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
306 },
307 },
308 ))
309 }
310
311 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
316 where
317 E: PersistedRow<Canister = C> + EntityValue,
318 {
319 let compiled = self.compile_sql_update::<E>(sql)?;
320
321 self.execute_compiled_sql_owned::<E>(compiled)
322 }
323}