1mod execute;
8mod explain;
9mod projection;
10
11#[cfg(feature = "perf-attribution")]
12use candid::CandidType;
13#[cfg(feature = "perf-attribution")]
14use serde::Deserialize;
15use std::{cell::RefCell, collections::HashMap};
16
17use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
18use crate::{
19 db::{
20 DbSession, GroupedRow, PersistedRow, QueryError,
21 commit::CommitSchemaFingerprint,
22 executor::EntityAuthority,
23 query::{
24 intent::StructuralQuery,
25 plan::{AccessPlannedQuery, VisibleIndexes},
26 },
27 schema::commit_schema_fingerprint_for_entity,
28 sql::lowering::{
29 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlCommand,
30 SqlGlobalAggregateCommandCore,
31 },
32 sql::parser::{SqlStatement, parse_sql},
33 },
34 traits::{CanisterKind, EntityValue},
35};
36
37#[cfg(test)]
38use crate::db::{
39 MissingRowPolicy, PagedGroupedExecutionWithTrace,
40 sql::lowering::{
41 LoweredSelectQueryShape, bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
42 prepare_sql_statement,
43 },
44};
45
46#[cfg(feature = "structural-read-metrics")]
47pub use crate::db::session::sql::projection::{
48 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
49};
50
51#[derive(Debug)]
53pub enum SqlStatementResult {
54 Count {
55 row_count: u32,
56 },
57 Projection {
58 columns: Vec<String>,
59 rows: Vec<Vec<crate::value::Value>>,
60 row_count: u32,
61 },
62 ProjectionText {
63 columns: Vec<String>,
64 rows: Vec<Vec<String>>,
65 row_count: u32,
66 },
67 Grouped {
68 columns: Vec<String>,
69 rows: Vec<GroupedRow>,
70 row_count: u32,
71 next_cursor: Option<String>,
72 },
73 Explain(String),
74 Describe(crate::db::EntitySchemaDescription),
75 ShowIndexes(Vec<String>),
76 ShowColumns(Vec<crate::db::EntityFieldDescription>),
77 ShowEntities(Vec<String>),
78}
79
/// Instruction-count attribution for one SQL query, split by stage.
///
/// Populated by `DbSession::execute_sql_query_with_attribution`; only
/// compiled when the `perf-attribution` feature is enabled.
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    /// Counter delta measured around the compile stage (`compile_sql_query`).
    pub compile_local_instructions: u64,
    /// Counter delta measured around the execute stage (`execute_compiled_sql`).
    pub execute_local_instructions: u64,
    /// Saturating sum of the compile and execute deltas.
    pub total_local_instructions: u64,
}
97
/// Which SQL entry surface a compiled command was produced for.
///
/// Part of `SqlCompiledCommandCacheKey`, so the same SQL text compiled
/// through the query surface and the update surface maps to distinct
/// cache entries.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum SqlCompiledCommandSurface {
    /// Compiled through `compile_sql_query`.
    Query,
    /// Compiled through `compile_sql_update`.
    Update,
}
103
104#[derive(Clone, Debug, Eq, Hash, PartialEq)]
115pub(in crate::db) struct SqlCompiledCommandCacheKey {
116 surface: SqlCompiledCommandSurface,
117 entity_path: &'static str,
118 schema_fingerprint: CommitSchemaFingerprint,
119 sql: String,
120}
121
122impl SqlCompiledCommandCacheKey {
123 fn query_for_entity<E>(sql: &str) -> Self
124 where
125 E: PersistedRow + EntityValue,
126 {
127 Self::for_entity::<E>(SqlCompiledCommandSurface::Query, sql)
128 }
129
130 fn update_for_entity<E>(sql: &str) -> Self
131 where
132 E: PersistedRow + EntityValue,
133 {
134 Self::for_entity::<E>(SqlCompiledCommandSurface::Update, sql)
135 }
136
137 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
138 where
139 E: PersistedRow + EntityValue,
140 {
141 Self {
142 surface,
143 entity_path: E::PATH,
144 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
145 sql: sql.to_string(),
146 }
147 }
148}
149
150pub(in crate::db) type SqlCompiledCommandCache =
151 HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;
152
153#[derive(Clone, Debug)]
157pub(in crate::db) enum CompiledSqlCommand {
158 Select(LoweredSelectShape),
159 Delete {
160 query: LoweredBaseQueryShape,
161 statement: SqlDeleteStatement,
162 },
163 GlobalAggregate {
164 command: SqlGlobalAggregateCommandCore,
165 label_override: Option<String>,
166 },
167 Explain(LoweredSqlCommand),
168 Insert(SqlInsertStatement),
169 Update(SqlUpdateStatement),
170 DescribeEntity,
171 ShowIndexesEntity,
172 ShowColumnsEntity,
173 ShowEntities,
174}
175
176pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
179 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
180}
181
/// Reads the instruction counter used for SQL stage attribution.
///
/// On `wasm32` this calls `canic_cdk::api::performance_counter(1)`; on every
/// other target it returns `0`, so measured deltas are always zero there.
// NOTE(review): counter type `1` presumably follows the IC
// `performance_counter` convention — confirm which counter the `canic_cdk`
// wrapper exposes for this index.
#[cfg(feature = "perf-attribution")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_sql_local_instruction_counter() -> u64 {
    // Exactly one of the two blocks survives cfg-stripping and becomes the
    // function's tail expression.
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }

    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
198
/// Runs `run` once and reports how far the instruction counter advanced.
///
/// Returns `(delta, result)`, where `delta` is the saturating difference
/// between the counter readings taken immediately after and before the
/// call. The closure's `Result` is passed through untouched, so a failing
/// stage is still measured.
#[cfg(feature = "perf-attribution")]
fn measure_sql_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_sql_local_instruction_counter();
    let outcome = run();
    let after = read_sql_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
207
208impl<C: CanisterKind> DbSession<C> {
209 fn sql_compiled_command_cache(&self) -> &RefCell<SqlCompiledCommandCache> {
212 self.sql_compiled_command_cache
213 .get_or_init(|| RefCell::new(SqlCompiledCommandCache::new()))
214 }
215
216 #[cfg(test)]
217 pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
218 self.sql_compiled_command_cache().borrow().len()
219 }
220
221 pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
224 &self,
225 query: StructuralQuery,
226 authority: EntityAuthority,
227 ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
228 let visible_indexes =
229 self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
230 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
231
232 Ok((visible_indexes, plan))
233 }
234
235 fn ensure_sql_query_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
238 match statement {
239 SqlStatement::Select(_)
240 | SqlStatement::Explain(_)
241 | SqlStatement::Describe(_)
242 | SqlStatement::ShowIndexes(_)
243 | SqlStatement::ShowColumns(_)
244 | SqlStatement::ShowEntities(_) => Ok(()),
245 SqlStatement::Insert(_) => Err(QueryError::unsupported_query(
246 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
247 )),
248 SqlStatement::Update(_) => Err(QueryError::unsupported_query(
249 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
250 )),
251 SqlStatement::Delete(_) => Err(QueryError::unsupported_query(
252 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
253 )),
254 }
255 }
256
257 fn ensure_sql_update_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
260 match statement {
261 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_) => Ok(()),
262 SqlStatement::Select(_) => Err(QueryError::unsupported_query(
263 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
264 )),
265 SqlStatement::Explain(_) => Err(QueryError::unsupported_query(
266 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
267 )),
268 SqlStatement::Describe(_) => Err(QueryError::unsupported_query(
269 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
270 )),
271 SqlStatement::ShowIndexes(_) => Err(QueryError::unsupported_query(
272 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
273 )),
274 SqlStatement::ShowColumns(_) => Err(QueryError::unsupported_query(
275 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
276 )),
277 SqlStatement::ShowEntities(_) => Err(QueryError::unsupported_query(
278 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
279 )),
280 }
281 }
282
283 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
288 where
289 E: PersistedRow<Canister = C> + EntityValue,
290 {
291 let compiled = self.compile_sql_query::<E>(sql)?;
292
293 self.execute_compiled_sql::<E>(&compiled)
294 }
295
296 #[cfg(feature = "perf-attribution")]
299 #[doc(hidden)]
300 pub fn execute_sql_query_with_attribution<E>(
301 &self,
302 sql: &str,
303 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
304 where
305 E: PersistedRow<Canister = C> + EntityValue,
306 {
307 let (compile_local_instructions, compiled) =
310 measure_sql_stage(|| self.compile_sql_query::<E>(sql));
311 let compiled = compiled?;
312
313 let (execute_local_instructions, result) =
316 measure_sql_stage(|| self.execute_compiled_sql::<E>(&compiled));
317 let result = result?;
318 let total_local_instructions =
319 compile_local_instructions.saturating_add(execute_local_instructions);
320
321 Ok((
322 result,
323 SqlQueryExecutionAttribution {
324 compile_local_instructions,
325 execute_local_instructions,
326 total_local_instructions,
327 },
328 ))
329 }
330
331 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
336 where
337 E: PersistedRow<Canister = C> + EntityValue,
338 {
339 let compiled = self.compile_sql_update::<E>(sql)?;
340
341 self.execute_compiled_sql::<E>(&compiled)
342 }
343
344 #[cfg(test)]
345 pub(in crate::db) fn execute_grouped_sql_query_for_tests<E>(
346 &self,
347 sql: &str,
348 cursor_token: Option<&str>,
349 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
350 where
351 E: PersistedRow<Canister = C> + EntityValue,
352 {
353 let parsed = parse_sql_statement(sql)?;
354
355 let lowered = lower_sql_command_from_prepared_statement(
356 prepare_sql_statement(parsed, E::MODEL.name())
357 .map_err(QueryError::from_sql_lowering_error)?,
358 E::MODEL.primary_key.name,
359 )
360 .map_err(QueryError::from_sql_lowering_error)?;
361 let Some(query) = lowered.query().cloned() else {
362 return Err(QueryError::unsupported_query(
363 "grouped SELECT helper requires grouped SELECT",
364 ));
365 };
366 if query.select_shape() != Some(LoweredSelectQueryShape::Grouped) {
367 return Err(QueryError::unsupported_query(
368 "grouped SELECT helper requires grouped SELECT",
369 ));
370 }
371 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
372 .map_err(QueryError::from_sql_lowering_error)?;
373
374 self.execute_grouped(&query, cursor_token)
375 }
376
377 pub(in crate::db) fn compile_sql_query<E>(
380 &self,
381 sql: &str,
382 ) -> Result<CompiledSqlCommand, QueryError>
383 where
384 E: PersistedRow<Canister = C> + EntityValue,
385 {
386 self.compile_sql_statement_with_cache::<E>(
387 SqlCompiledCommandCacheKey::query_for_entity::<E>(sql),
388 sql,
389 Self::ensure_sql_query_statement_supported,
390 )
391 }
392
393 pub(in crate::db) fn compile_sql_update<E>(
396 &self,
397 sql: &str,
398 ) -> Result<CompiledSqlCommand, QueryError>
399 where
400 E: PersistedRow<Canister = C> + EntityValue,
401 {
402 self.compile_sql_statement_with_cache::<E>(
403 SqlCompiledCommandCacheKey::update_for_entity::<E>(sql),
404 sql,
405 Self::ensure_sql_update_statement_supported,
406 )
407 }
408
409 fn compile_sql_statement_with_cache<E>(
412 &self,
413 cache_key: SqlCompiledCommandCacheKey,
414 sql: &str,
415 ensure_surface_supported: fn(&SqlStatement) -> Result<(), QueryError>,
416 ) -> Result<CompiledSqlCommand, QueryError>
417 where
418 E: PersistedRow<Canister = C> + EntityValue,
419 {
420 {
421 let cache = self.sql_compiled_command_cache().borrow();
422 if let Some(compiled) = cache.get(&cache_key) {
423 return Ok(compiled.clone());
424 }
425 }
426
427 let parsed = parse_sql_statement(sql)?;
428 ensure_surface_supported(&parsed)?;
429 let compiled = Self::compile_sql_statement_inner::<E>(&parsed)?;
430
431 self.sql_compiled_command_cache()
432 .borrow_mut()
433 .insert(cache_key, compiled.clone());
434
435 Ok(compiled)
436 }
437
438 pub(in crate::db) fn compile_sql_statement_inner<E>(
441 sql_statement: &SqlStatement,
442 ) -> Result<CompiledSqlCommand, QueryError>
443 where
444 E: PersistedRow<Canister = C> + EntityValue,
445 {
446 Self::compile_sql_statement_for_authority(sql_statement, EntityAuthority::for_type::<E>())
447 }
448}