1mod execute;
8mod explain;
9mod projection;
10
11#[cfg(feature = "diagnostics")]
12use candid::CandidType;
13use icydb_utils::Xxh3;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
17
18type CacheBuildHasher = BuildHasherDefault<Xxh3>;
19
20const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
23const SQL_SELECT_PLAN_CACHE_METHOD_VERSION: u8 = 1;
24
25#[cfg(feature = "diagnostics")]
26use crate::db::DataStore;
27#[cfg(feature = "diagnostics")]
28use crate::db::executor::GroupedCountAttribution;
29#[cfg(feature = "diagnostics")]
30use crate::db::session::sql::projection::{
31 current_pure_covering_decode_local_instructions,
32 current_pure_covering_row_assembly_local_instructions,
33};
34use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
35use crate::{
36 db::{
37 DbSession, GroupedRow, PersistedRow, QueryError,
38 commit::CommitSchemaFingerprint,
39 executor::{EntityAuthority, SharedPreparedExecutionPlan},
40 query::{
41 intent::StructuralQuery,
42 plan::{AccessPlannedQuery, VisibleIndexes},
43 },
44 schema::commit_schema_fingerprint_for_entity,
45 session::query::QueryPlanCacheAttribution,
46 session::sql::projection::{
47 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
48 },
49 sql::lowering::{LoweredBaseQueryShape, LoweredSqlCommand, SqlGlobalAggregateCommandCore},
50 sql::parser::{SqlStatement, parse_sql},
51 },
52 traits::{CanisterKind, EntityValue},
53};
54
55#[cfg(test)]
56use crate::db::{
57 MissingRowPolicy, PagedGroupedExecutionWithTrace,
58 sql::lowering::{
59 bind_lowered_sql_query, lower_sql_command_from_prepared_statement, prepare_sql_statement,
60 },
61};
62
63#[cfg(all(test, not(feature = "diagnostics")))]
64pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
65#[cfg(feature = "diagnostics")]
66pub use crate::db::session::sql::projection::{
67 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
68};
69
70#[derive(Debug)]
72pub enum SqlStatementResult {
73 Count {
74 row_count: u32,
75 },
76 Projection {
77 columns: Vec<String>,
78 fixed_scales: Vec<Option<u32>>,
79 rows: Vec<Vec<crate::value::Value>>,
80 row_count: u32,
81 },
82 ProjectionText {
83 columns: Vec<String>,
84 rows: Vec<Vec<String>>,
85 row_count: u32,
86 },
87 Grouped {
88 columns: Vec<String>,
89 fixed_scales: Vec<Option<u32>>,
90 rows: Vec<GroupedRow>,
91 row_count: u32,
92 next_cursor: Option<String>,
93 },
94 Explain(String),
95 Describe(crate::db::EntitySchemaDescription),
96 ShowIndexes(Vec<String>),
97 ShowColumns(Vec<crate::db::EntityFieldDescription>),
98 ShowEntities(Vec<String>),
99}
100
101#[cfg(feature = "diagnostics")]
112#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
113pub struct SqlQueryExecutionAttribution {
114 pub compile_local_instructions: u64,
115 pub planner_local_instructions: u64,
116 pub store_local_instructions: u64,
117 pub executor_local_instructions: u64,
118 pub grouped_stream_local_instructions: u64,
119 pub grouped_fold_local_instructions: u64,
120 pub grouped_finalize_local_instructions: u64,
121 pub grouped_count_borrowed_hash_computations: u64,
122 pub grouped_count_bucket_candidate_checks: u64,
123 pub grouped_count_existing_group_hits: u64,
124 pub grouped_count_new_group_inserts: u64,
125 pub grouped_count_row_materialization_local_instructions: u64,
126 pub grouped_count_group_lookup_local_instructions: u64,
127 pub grouped_count_existing_group_update_local_instructions: u64,
128 pub grouped_count_new_group_insert_local_instructions: u64,
129 pub pure_covering_decode_local_instructions: u64,
130 pub pure_covering_row_assembly_local_instructions: u64,
131 pub store_get_calls: u64,
132 pub response_decode_local_instructions: u64,
133 pub execute_local_instructions: u64,
134 pub total_local_instructions: u64,
135 pub sql_compiled_command_cache_hits: u64,
136 pub sql_compiled_command_cache_misses: u64,
137 pub sql_select_plan_cache_hits: u64,
138 pub sql_select_plan_cache_misses: u64,
139 pub shared_query_plan_cache_hits: u64,
140 pub shared_query_plan_cache_misses: u64,
141}
142
143#[cfg(feature = "diagnostics")]
147#[derive(Clone, Copy, Debug, Eq, PartialEq)]
148pub(in crate::db) struct SqlExecutePhaseAttribution {
149 pub planner_local_instructions: u64,
150 pub store_local_instructions: u64,
151 pub executor_local_instructions: u64,
152 pub grouped_stream_local_instructions: u64,
153 pub grouped_fold_local_instructions: u64,
154 pub grouped_finalize_local_instructions: u64,
155 pub grouped_count: GroupedCountAttribution,
156}
157
158#[cfg(feature = "diagnostics")]
159impl SqlExecutePhaseAttribution {
160 #[must_use]
161 pub(in crate::db) const fn from_execute_total_and_store_total(
162 execute_local_instructions: u64,
163 store_local_instructions: u64,
164 ) -> Self {
165 Self {
166 planner_local_instructions: 0,
167 store_local_instructions,
168 executor_local_instructions: execute_local_instructions
169 .saturating_sub(store_local_instructions),
170 grouped_stream_local_instructions: 0,
171 grouped_fold_local_instructions: 0,
172 grouped_finalize_local_instructions: 0,
173 grouped_count: GroupedCountAttribution::none(),
174 }
175 }
176}
177
178#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
182pub(in crate::db) struct SqlCacheAttribution {
183 pub sql_compiled_command_cache_hits: u64,
184 pub sql_compiled_command_cache_misses: u64,
185 pub sql_select_plan_cache_hits: u64,
186 pub sql_select_plan_cache_misses: u64,
187 pub shared_query_plan_cache_hits: u64,
188 pub shared_query_plan_cache_misses: u64,
189}
190
191impl SqlCacheAttribution {
192 #[must_use]
193 const fn none() -> Self {
194 Self {
195 sql_compiled_command_cache_hits: 0,
196 sql_compiled_command_cache_misses: 0,
197 sql_select_plan_cache_hits: 0,
198 sql_select_plan_cache_misses: 0,
199 shared_query_plan_cache_hits: 0,
200 shared_query_plan_cache_misses: 0,
201 }
202 }
203
204 #[must_use]
205 const fn sql_compiled_command_cache_hit() -> Self {
206 Self {
207 sql_compiled_command_cache_hits: 1,
208 ..Self::none()
209 }
210 }
211
212 #[must_use]
213 const fn sql_compiled_command_cache_miss() -> Self {
214 Self {
215 sql_compiled_command_cache_misses: 1,
216 ..Self::none()
217 }
218 }
219
220 #[must_use]
221 const fn sql_select_plan_cache_hit() -> Self {
222 Self {
223 sql_select_plan_cache_hits: 1,
224 ..Self::none()
225 }
226 }
227
228 #[must_use]
229 const fn sql_select_plan_cache_miss() -> Self {
230 Self {
231 sql_select_plan_cache_misses: 1,
232 ..Self::none()
233 }
234 }
235
236 #[must_use]
237 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
238 Self {
239 shared_query_plan_cache_hits: attribution.hits,
240 shared_query_plan_cache_misses: attribution.misses,
241 ..Self::none()
242 }
243 }
244
245 #[must_use]
246 const fn merge(self, other: Self) -> Self {
247 Self {
248 sql_compiled_command_cache_hits: self
249 .sql_compiled_command_cache_hits
250 .saturating_add(other.sql_compiled_command_cache_hits),
251 sql_compiled_command_cache_misses: self
252 .sql_compiled_command_cache_misses
253 .saturating_add(other.sql_compiled_command_cache_misses),
254 sql_select_plan_cache_hits: self
255 .sql_select_plan_cache_hits
256 .saturating_add(other.sql_select_plan_cache_hits),
257 sql_select_plan_cache_misses: self
258 .sql_select_plan_cache_misses
259 .saturating_add(other.sql_select_plan_cache_misses),
260 shared_query_plan_cache_hits: self
261 .shared_query_plan_cache_hits
262 .saturating_add(other.shared_query_plan_cache_hits),
263 shared_query_plan_cache_misses: self
264 .shared_query_plan_cache_misses
265 .saturating_add(other.shared_query_plan_cache_misses),
266 }
267 }
268}
269
270#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
271enum SqlCompiledCommandSurface {
272 Query,
273 Update,
274}
275
276#[derive(Clone, Debug, Eq, Hash, PartialEq)]
287pub(in crate::db) struct SqlCompiledCommandCacheKey {
288 cache_method_version: u8,
289 surface: SqlCompiledCommandSurface,
290 entity_path: &'static str,
291 schema_fingerprint: CommitSchemaFingerprint,
292 sql: String,
293}
294
295#[derive(Clone, Debug, Eq, Hash, PartialEq)]
296pub(in crate::db) struct SqlSelectPlanCacheKey {
297 cache_method_version: u8,
298 compiled: SqlCompiledCommandCacheKey,
299 visibility: crate::db::session::query::QueryPlanVisibility,
300}
301
302#[derive(Clone, Debug)]
312pub(in crate::db) struct SqlSelectPlanCacheEntry {
313 prepared_plan: SharedPreparedExecutionPlan,
314 columns: Vec<String>,
315 fixed_scales: Vec<Option<u32>>,
316}
317
318impl SqlSelectPlanCacheEntry {
319 #[must_use]
320 pub(in crate::db) const fn new(
321 prepared_plan: SharedPreparedExecutionPlan,
322 columns: Vec<String>,
323 fixed_scales: Vec<Option<u32>>,
324 ) -> Self {
325 Self {
326 prepared_plan,
327 columns,
328 fixed_scales,
329 }
330 }
331
332 #[must_use]
333 pub(in crate::db) fn into_parts(
334 self,
335 ) -> (SharedPreparedExecutionPlan, Vec<String>, Vec<Option<u32>>) {
336 (self.prepared_plan, self.columns, self.fixed_scales)
337 }
338}
339
340impl SqlCompiledCommandCacheKey {
341 fn query_for_entity<E>(sql: &str) -> Self
342 where
343 E: PersistedRow + EntityValue,
344 {
345 Self::for_entity::<E>(SqlCompiledCommandSurface::Query, sql)
346 }
347
348 fn update_for_entity<E>(sql: &str) -> Self
349 where
350 E: PersistedRow + EntityValue,
351 {
352 Self::for_entity::<E>(SqlCompiledCommandSurface::Update, sql)
353 }
354
355 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
356 where
357 E: PersistedRow + EntityValue,
358 {
359 Self {
360 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
361 surface,
362 entity_path: E::PATH,
363 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
364 sql: sql.to_string(),
365 }
366 }
367
368 #[must_use]
369 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
370 self.schema_fingerprint
371 }
372}
373
374impl SqlSelectPlanCacheKey {
375 const fn from_compiled_key(
376 compiled: SqlCompiledCommandCacheKey,
377 visibility: crate::db::session::query::QueryPlanVisibility,
378 ) -> Self {
379 Self {
380 cache_method_version: SQL_SELECT_PLAN_CACHE_METHOD_VERSION,
381 compiled,
382 visibility,
383 }
384 }
385}
386
387#[cfg(test)]
388impl SqlCompiledCommandCacheKey {
389 pub(in crate::db) fn query_for_entity_with_method_version<E>(
390 sql: &str,
391 cache_method_version: u8,
392 ) -> Self
393 where
394 E: PersistedRow + EntityValue,
395 {
396 Self::for_entity_with_method_version::<E>(
397 SqlCompiledCommandSurface::Query,
398 sql,
399 cache_method_version,
400 )
401 }
402
403 pub(in crate::db) fn update_for_entity_with_method_version<E>(
404 sql: &str,
405 cache_method_version: u8,
406 ) -> Self
407 where
408 E: PersistedRow + EntityValue,
409 {
410 Self::for_entity_with_method_version::<E>(
411 SqlCompiledCommandSurface::Update,
412 sql,
413 cache_method_version,
414 )
415 }
416
417 fn for_entity_with_method_version<E>(
418 surface: SqlCompiledCommandSurface,
419 sql: &str,
420 cache_method_version: u8,
421 ) -> Self
422 where
423 E: PersistedRow + EntityValue,
424 {
425 Self {
426 cache_method_version,
427 surface,
428 entity_path: E::PATH,
429 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
430 sql: sql.to_string(),
431 }
432 }
433}
434
435#[cfg(test)]
436impl SqlSelectPlanCacheKey {
437 pub(in crate::db) const fn from_compiled_key_with_method_version(
438 compiled: SqlCompiledCommandCacheKey,
439 visibility: crate::db::session::query::QueryPlanVisibility,
440 cache_method_version: u8,
441 ) -> Self {
442 Self {
443 cache_method_version,
444 compiled,
445 visibility,
446 }
447 }
448}
449
450pub(in crate::db) type SqlCompiledCommandCache =
451 HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand, CacheBuildHasher>;
452pub(in crate::db) type SqlSelectPlanCache =
453 HashMap<SqlSelectPlanCacheKey, SqlSelectPlanCacheEntry, CacheBuildHasher>;
454
455thread_local! {
456 static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache, CacheBuildHasher>> =
460 RefCell::new(HashMap::default());
461 static SQL_SELECT_PLAN_CACHES: RefCell<HashMap<usize, SqlSelectPlanCache, CacheBuildHasher>> =
462 RefCell::new(HashMap::default());
463}
464
465#[derive(Clone, Debug)]
469pub(in crate::db) enum CompiledSqlCommand {
470 Select {
471 query: StructuralQuery,
472 compiled_cache_key: Option<SqlCompiledCommandCacheKey>,
473 },
474 Delete {
475 query: LoweredBaseQueryShape,
476 statement: SqlDeleteStatement,
477 },
478 GlobalAggregate {
479 command: SqlGlobalAggregateCommandCore,
480 },
481 Explain(LoweredSqlCommand),
482 Insert(SqlInsertStatement),
483 Update(SqlUpdateStatement),
484 DescribeEntity,
485 ShowIndexesEntity,
486 ShowColumnsEntity,
487 ShowEntities,
488}
489
490pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
493 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
494}
495
496#[cfg(feature = "diagnostics")]
497#[expect(
498 clippy::missing_const_for_fn,
499 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
500)]
501fn read_sql_local_instruction_counter() -> u64 {
502 #[cfg(target_arch = "wasm32")]
503 {
504 canic_cdk::api::performance_counter(1)
505 }
506
507 #[cfg(not(target_arch = "wasm32"))]
508 {
509 0
510 }
511}
512
513#[cfg(feature = "diagnostics")]
514fn measure_sql_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
515 let start = read_sql_local_instruction_counter();
516 let result = run();
517 let delta = read_sql_local_instruction_counter().saturating_sub(start);
518
519 (delta, result)
520}
521
522impl<C: CanisterKind> DbSession<C> {
523 fn sql_cache_scope_id(&self) -> usize {
524 self.db.cache_scope_id()
525 }
526
527 fn with_sql_compiled_command_cache<R>(
528 &self,
529 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
530 ) -> R {
531 let scope_id = self.sql_cache_scope_id();
532
533 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
534 let mut caches = caches.borrow_mut();
535 let cache = caches.entry(scope_id).or_default();
536
537 f(cache)
538 })
539 }
540
541 fn with_sql_select_plan_cache<R>(&self, f: impl FnOnce(&mut SqlSelectPlanCache) -> R) -> R {
542 let scope_id = self.sql_cache_scope_id();
543
544 SQL_SELECT_PLAN_CACHES.with(|caches| {
545 let mut caches = caches.borrow_mut();
546 let cache = caches.entry(scope_id).or_default();
547
548 f(cache)
549 })
550 }
551
552 #[cfg(test)]
553 pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
554 self.with_sql_compiled_command_cache(|cache| cache.len())
555 }
556
557 #[cfg(test)]
558 pub(in crate::db) fn sql_select_plan_cache_len(&self) -> usize {
559 self.with_sql_select_plan_cache(|cache| cache.len())
560 }
561
562 #[cfg(test)]
563 pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
564 self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
565 self.with_sql_select_plan_cache(SqlSelectPlanCache::clear);
566 }
567
568 fn planned_sql_select_with_visibility(
569 &self,
570 query: &StructuralQuery,
571 authority: EntityAuthority,
572 compiled_cache_key: Option<&SqlCompiledCommandCacheKey>,
573 ) -> Result<(SqlSelectPlanCacheEntry, SqlCacheAttribution), QueryError> {
574 let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
575 let fallback_schema_fingerprint = crate::db::schema::commit_schema_fingerprint_for_model(
576 authority.model().path,
577 authority.model(),
578 );
579 let cache_schema_fingerprint = compiled_cache_key.map_or(
580 fallback_schema_fingerprint,
581 SqlCompiledCommandCacheKey::schema_fingerprint,
582 );
583
584 let Some(compiled_cache_key) = compiled_cache_key else {
585 let (entry, cache_attribution) = self.cached_query_plan_entry_for_authority(
586 authority,
587 cache_schema_fingerprint,
588 query,
589 )?;
590 let projection = entry.logical_plan().projection_spec(authority.model());
591 let columns = projection_labels_from_projection_spec(&projection);
592 let fixed_scales = projection_fixed_scales_from_projection_spec(&projection);
593
594 return Ok((
595 SqlSelectPlanCacheEntry::new(entry.prepared_plan().clone(), columns, fixed_scales),
596 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
597 ));
598 };
599
600 let plan_cache_key =
601 SqlSelectPlanCacheKey::from_compiled_key(compiled_cache_key.clone(), visibility);
602 {
603 let cached =
604 self.with_sql_select_plan_cache(|cache| cache.get(&plan_cache_key).cloned());
605 if let Some(plan) = cached {
606 return Ok((plan, SqlCacheAttribution::sql_select_plan_cache_hit()));
607 }
608 }
609
610 let (entry, cache_attribution) =
611 self.cached_query_plan_entry_for_authority(authority, cache_schema_fingerprint, query)?;
612 let projection = entry.logical_plan().projection_spec(authority.model());
613 let columns = projection_labels_from_projection_spec(&projection);
614 let fixed_scales = projection_fixed_scales_from_projection_spec(&projection);
615 let entry =
616 SqlSelectPlanCacheEntry::new(entry.prepared_plan().clone(), columns, fixed_scales);
617 self.with_sql_select_plan_cache(|cache| {
618 cache.insert(plan_cache_key, entry.clone());
619 });
620
621 Ok((
622 entry,
623 SqlCacheAttribution::sql_select_plan_cache_miss().merge(
624 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
625 ),
626 ))
627 }
628
629 pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
632 &self,
633 query: StructuralQuery,
634 authority: EntityAuthority,
635 ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
636 let visible_indexes =
637 self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
638 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
639
640 Ok((visible_indexes, plan))
641 }
642
643 fn ensure_sql_query_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
646 match statement {
647 SqlStatement::Select(_)
648 | SqlStatement::Explain(_)
649 | SqlStatement::Describe(_)
650 | SqlStatement::ShowIndexes(_)
651 | SqlStatement::ShowColumns(_)
652 | SqlStatement::ShowEntities(_) => Ok(()),
653 SqlStatement::Insert(_) => Err(QueryError::unsupported_query(
654 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
655 )),
656 SqlStatement::Update(_) => Err(QueryError::unsupported_query(
657 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
658 )),
659 SqlStatement::Delete(_) => Err(QueryError::unsupported_query(
660 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
661 )),
662 }
663 }
664
665 fn ensure_sql_update_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
668 match statement {
669 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_) => Ok(()),
670 SqlStatement::Select(_) => Err(QueryError::unsupported_query(
671 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
672 )),
673 SqlStatement::Explain(_) => Err(QueryError::unsupported_query(
674 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
675 )),
676 SqlStatement::Describe(_) => Err(QueryError::unsupported_query(
677 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
678 )),
679 SqlStatement::ShowIndexes(_) => Err(QueryError::unsupported_query(
680 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
681 )),
682 SqlStatement::ShowColumns(_) => Err(QueryError::unsupported_query(
683 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
684 )),
685 SqlStatement::ShowEntities(_) => Err(QueryError::unsupported_query(
686 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
687 )),
688 }
689 }
690
691 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
696 where
697 E: PersistedRow<Canister = C> + EntityValue,
698 {
699 let compiled = self.compile_sql_query::<E>(sql)?;
700
701 self.execute_compiled_sql::<E>(&compiled)
702 }
703
704 #[cfg(feature = "diagnostics")]
707 #[doc(hidden)]
708 pub fn execute_sql_query_with_attribution<E>(
709 &self,
710 sql: &str,
711 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
712 where
713 E: PersistedRow<Canister = C> + EntityValue,
714 {
715 let (compile_local_instructions, compiled) =
718 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
719 let (compiled, compile_cache_attribution) = compiled?;
720
721 let store_get_calls_before = DataStore::current_get_call_count();
724 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
725 let pure_covering_row_assembly_before =
726 current_pure_covering_row_assembly_local_instructions();
727 let (result, execute_cache_attribution, execute_phase_attribution) =
728 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
729 let store_get_calls =
730 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
731 let pure_covering_decode_local_instructions =
732 current_pure_covering_decode_local_instructions()
733 .saturating_sub(pure_covering_decode_before);
734 let pure_covering_row_assembly_local_instructions =
735 current_pure_covering_row_assembly_local_instructions()
736 .saturating_sub(pure_covering_row_assembly_before);
737 let execute_local_instructions = execute_phase_attribution
738 .planner_local_instructions
739 .saturating_add(execute_phase_attribution.store_local_instructions)
740 .saturating_add(execute_phase_attribution.executor_local_instructions);
741 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
742 let total_local_instructions =
743 compile_local_instructions.saturating_add(execute_local_instructions);
744
745 Ok((
746 result,
747 SqlQueryExecutionAttribution {
748 compile_local_instructions,
749 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
750 store_local_instructions: execute_phase_attribution.store_local_instructions,
751 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
752 grouped_stream_local_instructions: execute_phase_attribution
753 .grouped_stream_local_instructions,
754 grouped_fold_local_instructions: execute_phase_attribution
755 .grouped_fold_local_instructions,
756 grouped_finalize_local_instructions: execute_phase_attribution
757 .grouped_finalize_local_instructions,
758 grouped_count_borrowed_hash_computations: execute_phase_attribution
759 .grouped_count
760 .borrowed_hash_computations,
761 grouped_count_bucket_candidate_checks: execute_phase_attribution
762 .grouped_count
763 .bucket_candidate_checks,
764 grouped_count_existing_group_hits: execute_phase_attribution
765 .grouped_count
766 .existing_group_hits,
767 grouped_count_new_group_inserts: execute_phase_attribution
768 .grouped_count
769 .new_group_inserts,
770 grouped_count_row_materialization_local_instructions: execute_phase_attribution
771 .grouped_count
772 .row_materialization_local_instructions,
773 grouped_count_group_lookup_local_instructions: execute_phase_attribution
774 .grouped_count
775 .group_lookup_local_instructions,
776 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
777 .grouped_count
778 .existing_group_update_local_instructions,
779 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
780 .grouped_count
781 .new_group_insert_local_instructions,
782 pure_covering_decode_local_instructions,
783 pure_covering_row_assembly_local_instructions,
784 store_get_calls,
785 response_decode_local_instructions: 0,
786 execute_local_instructions,
787 total_local_instructions,
788 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
789 sql_compiled_command_cache_misses: cache_attribution
790 .sql_compiled_command_cache_misses,
791 sql_select_plan_cache_hits: cache_attribution.sql_select_plan_cache_hits,
792 sql_select_plan_cache_misses: cache_attribution.sql_select_plan_cache_misses,
793 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
794 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
795 },
796 ))
797 }
798
799 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
804 where
805 E: PersistedRow<Canister = C> + EntityValue,
806 {
807 let compiled = self.compile_sql_update::<E>(sql)?;
808
809 self.execute_compiled_sql::<E>(&compiled)
810 }
811
812 #[cfg(test)]
813 pub(in crate::db) fn execute_grouped_sql_query_for_tests<E>(
814 &self,
815 sql: &str,
816 cursor_token: Option<&str>,
817 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
818 where
819 E: PersistedRow<Canister = C> + EntityValue,
820 {
821 let parsed = parse_sql_statement(sql)?;
822
823 let lowered = lower_sql_command_from_prepared_statement(
824 prepare_sql_statement(parsed, E::MODEL.name())
825 .map_err(QueryError::from_sql_lowering_error)?,
826 E::MODEL,
827 )
828 .map_err(QueryError::from_sql_lowering_error)?;
829 let Some(query) = lowered.query().cloned() else {
830 return Err(QueryError::unsupported_query(
831 "grouped SELECT helper requires grouped SELECT",
832 ));
833 };
834 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
835 .map_err(QueryError::from_sql_lowering_error)?;
836 if !query.has_grouping() {
837 return Err(QueryError::unsupported_query(
838 "grouped SELECT helper requires grouped SELECT",
839 ));
840 }
841
842 self.execute_grouped(&query, cursor_token)
843 }
844
845 pub(in crate::db) fn compile_sql_query<E>(
848 &self,
849 sql: &str,
850 ) -> Result<CompiledSqlCommand, QueryError>
851 where
852 E: PersistedRow<Canister = C> + EntityValue,
853 {
854 self.compile_sql_query_with_cache_attribution::<E>(sql)
855 .map(|(compiled, _)| compiled)
856 }
857
858 fn compile_sql_query_with_cache_attribution<E>(
859 &self,
860 sql: &str,
861 ) -> Result<(CompiledSqlCommand, SqlCacheAttribution), QueryError>
862 where
863 E: PersistedRow<Canister = C> + EntityValue,
864 {
865 self.compile_sql_statement_with_cache::<E>(
866 SqlCompiledCommandCacheKey::query_for_entity::<E>(sql),
867 sql,
868 Self::ensure_sql_query_statement_supported,
869 )
870 }
871
872 pub(in crate::db) fn compile_sql_update<E>(
875 &self,
876 sql: &str,
877 ) -> Result<CompiledSqlCommand, QueryError>
878 where
879 E: PersistedRow<Canister = C> + EntityValue,
880 {
881 self.compile_sql_update_with_cache_attribution::<E>(sql)
882 .map(|(compiled, _)| compiled)
883 }
884
885 fn compile_sql_update_with_cache_attribution<E>(
886 &self,
887 sql: &str,
888 ) -> Result<(CompiledSqlCommand, SqlCacheAttribution), QueryError>
889 where
890 E: PersistedRow<Canister = C> + EntityValue,
891 {
892 self.compile_sql_statement_with_cache::<E>(
893 SqlCompiledCommandCacheKey::update_for_entity::<E>(sql),
894 sql,
895 Self::ensure_sql_update_statement_supported,
896 )
897 }
898
899 fn compile_sql_statement_with_cache<E>(
902 &self,
903 cache_key: SqlCompiledCommandCacheKey,
904 sql: &str,
905 ensure_surface_supported: fn(&SqlStatement) -> Result<(), QueryError>,
906 ) -> Result<(CompiledSqlCommand, SqlCacheAttribution), QueryError>
907 where
908 E: PersistedRow<Canister = C> + EntityValue,
909 {
910 {
911 let cached =
912 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
913 if let Some(compiled) = cached {
914 return Ok((
915 compiled,
916 SqlCacheAttribution::sql_compiled_command_cache_hit(),
917 ));
918 }
919 }
920
921 let parsed = parse_sql_statement(sql)?;
922 ensure_surface_supported(&parsed)?;
923 let mut compiled = Self::compile_sql_statement_inner::<E>(&parsed)?;
924 if let CompiledSqlCommand::Select {
925 compiled_cache_key, ..
926 } = &mut compiled
927 {
928 *compiled_cache_key = Some(cache_key.clone());
929 }
930
931 self.with_sql_compiled_command_cache(|cache| {
932 cache.insert(cache_key, compiled.clone());
933 });
934
935 Ok((
936 compiled,
937 SqlCacheAttribution::sql_compiled_command_cache_miss(),
938 ))
939 }
940
941 pub(in crate::db) fn compile_sql_statement_inner<E>(
944 sql_statement: &SqlStatement,
945 ) -> Result<CompiledSqlCommand, QueryError>
946 where
947 E: PersistedRow<Canister = C> + EntityValue,
948 {
949 Self::compile_sql_statement_for_authority(sql_statement, EntityAuthority::for_type::<E>())
950 }
951}