1mod execute;
8mod projection;
9
10#[cfg(feature = "diagnostics")]
11use candid::CandidType;
12#[cfg(feature = "diagnostics")]
13use serde::Deserialize;
14use std::{cell::RefCell, collections::HashMap, sync::Arc};
15
/// Version tag stored in every `SqlCompiledCommandCacheKey` built by this module.
///
/// The tag takes part in the key's derived equality and hash, so a key built
/// with a different version never matches an entry cached under this one.
const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
22
23#[cfg(feature = "diagnostics")]
24use crate::db::DataStore;
25#[cfg(feature = "diagnostics")]
26use crate::db::executor::GroupedCountAttribution;
27#[cfg(feature = "diagnostics")]
28use crate::db::session::sql::projection::{
29 current_pure_covering_decode_local_instructions,
30 current_pure_covering_row_assembly_local_instructions,
31};
32#[cfg(test)]
33use crate::db::sql::parser::parse_sql;
34use crate::db::sql::parser::{SqlInsertStatement, SqlReturningProjection, SqlUpdateStatement};
35use crate::{
36 db::{
37 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
38 commit::CommitSchemaFingerprint,
39 executor::{EntityAuthority, SharedPreparedExecutionPlan},
40 query::intent::StructuralQuery,
41 schema::commit_schema_fingerprint_for_entity,
42 session::query::QueryPlanCacheAttribution,
43 session::sql::projection::{
44 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
45 },
46 sql::identifier::identifiers_tail_match,
47 sql::lowering::{
48 LoweredSqlCommand, SqlGlobalAggregateCommandCore, SqlLoweringError,
49 bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
50 compile_sql_global_aggregate_command_core_from_prepared,
51 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
52 lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
53 lower_sql_command_from_prepared_statement, prepare_sql_statement,
54 },
55 sql::parser::{SqlStatement, parse_sql_with_attribution},
56 },
57 traits::{CanisterKind, EntityValue},
58 value::OutputValue,
59};
60
61#[cfg(all(test, not(feature = "diagnostics")))]
62pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
63#[cfg(feature = "diagnostics")]
64pub use crate::db::session::sql::projection::{
65 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
66};
67
/// Result payload returned by `execute_sql_query` and `execute_sql_update`.
///
/// Which variant a given statement produces is decided by the execution code,
/// which is not part of this section of the file.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// Outcome that carries only a row count.
    Count {
        row_count: u32,
    },
    /// Projected rows as typed `OutputValue` cells.
    Projection {
        /// Column labels.
        columns: Vec<String>,
        /// Optional fixed scale entries (presumably one per column — confirm
        /// against the projection code).
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<Vec<OutputValue>>,
        row_count: u32,
    },
    /// Projected rows whose cells are already rendered as strings.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows plus an optional cursor string.
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        /// `None` when no cursor is returned (presumably the last page — confirm).
        next_cursor: Option<String>,
    },
    /// Textual `EXPLAIN` output.
    Explain(String),
    /// Schema description of one entity.
    Describe(crate::db::EntitySchemaDescription),
    /// Index listing as strings.
    ShowIndexes(Vec<String>),
    /// Field descriptions of one entity.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// Entity listing as strings.
    ShowEntities(Vec<String>),
}
98
/// Flat diagnostics record returned next to the statement result by
/// `execute_sql_query_with_attribution`.
///
/// All `*_local_instructions` fields are deltas of the local instruction
/// counter; off wasm32 that counter reads as `0`, so they are all zero there.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    /// Cost of the whole compile call, cache handling included.
    pub compile_local_instructions: u64,
    // Compile stage breakdown, copied from `SqlCompilePhaseAttribution`.
    pub compile_cache_key_local_instructions: u64,
    pub compile_cache_lookup_local_instructions: u64,
    pub compile_parse_local_instructions: u64,
    pub compile_parse_tokenize_local_instructions: u64,
    pub compile_parse_select_local_instructions: u64,
    pub compile_parse_expr_local_instructions: u64,
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    pub compile_cache_insert_local_instructions: u64,
    // Execute stage breakdown, copied from `SqlExecutePhaseAttribution`.
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    // Grouped-count counters, copied from `GroupedCountAttribution`.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,
    // Before/after deltas of the pure-covering projection counters.
    pub pure_covering_decode_local_instructions: u64,
    pub pure_covering_row_assembly_local_instructions: u64,
    /// Before/after delta of `DataStore::current_get_call_count()`.
    pub store_get_calls: u64,
    /// Always written as `0` by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,
    /// Saturating sum of the planner, store and executor costs.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and `execute_local_instructions`.
    pub total_local_instructions: u64,
    // Cache counters merged from the compile and execute phases.
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
150
/// Execute-phase cost breakdown for one compiled SQL command.
///
/// `execute_sql_query_with_attribution` sums the planner, store and executor
/// fields into its execute total; the grouped fields are reported separately
/// and are not part of that sum.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    /// Counters specific to grouped-count execution.
    pub grouped_count: GroupedCountAttribution,
}
165
/// Per-stage cost of compiling one SQL string, as measured by `measure_sql_stage`.
///
/// Stages that did not run are reported as `0`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCompilePhaseAttribution {
    /// Building the compiled-command cache key.
    pub cache_key: u64,
    /// Looking the key up in the compiled-command cache.
    pub cache_lookup: u64,
    /// The whole parse call.
    pub parse: u64,
    /// Tokenize share reported by the parser.
    pub parse_tokenize: u64,
    /// `parse` minus the tokenize, expr and predicate shares (saturating).
    pub parse_select: u64,
    /// Expression share reported by the parser.
    pub parse_expr: u64,
    /// Predicate share reported by the parser.
    pub parse_predicate: u64,
    /// Global-aggregate lane check; only measured for `SELECT`.
    pub aggregate_lane_check: u64,
    /// Statement preparation, or entity-name validation for metadata statements.
    pub prepare: u64,
    pub lower: u64,
    pub bind: u64,
    /// Storing the compiled command in the cache.
    pub cache_insert: u64,
}
192
193impl SqlCompilePhaseAttribution {
194 #[must_use]
195 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
196 Self {
197 cache_key,
198 cache_lookup,
199 parse: 0,
200 parse_tokenize: 0,
201 parse_select: 0,
202 parse_expr: 0,
203 parse_predicate: 0,
204 aggregate_lane_check: 0,
205 prepare: 0,
206 lower: 0,
207 bind: 0,
208 cache_insert: 0,
209 }
210 }
211}
212
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Splits one execute total into a store share and an executor share.
    ///
    /// The executor share is the total minus the store share, saturating at
    /// zero. Planner and grouped costs are left at zero and the grouped-count
    /// counters are taken from `GroupedCountAttribution::none()`.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            grouped_count: GroupedCountAttribution::none(),
            grouped_finalize_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_stream_local_instructions: 0,
            executor_local_instructions,
            store_local_instructions,
            planner_local_instructions: 0,
        }
    }
}
232
/// Hit and miss counters for the two caches touched while running one SQL
/// statement: the compiled-command cache and the shared query-plan cache.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCacheAttribution {
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
245
246impl SqlCacheAttribution {
247 #[must_use]
248 const fn none() -> Self {
249 Self {
250 sql_compiled_command_cache_hits: 0,
251 sql_compiled_command_cache_misses: 0,
252 shared_query_plan_cache_hits: 0,
253 shared_query_plan_cache_misses: 0,
254 }
255 }
256
257 #[must_use]
258 const fn sql_compiled_command_cache_hit() -> Self {
259 Self {
260 sql_compiled_command_cache_hits: 1,
261 ..Self::none()
262 }
263 }
264
265 #[must_use]
266 const fn sql_compiled_command_cache_miss() -> Self {
267 Self {
268 sql_compiled_command_cache_misses: 1,
269 ..Self::none()
270 }
271 }
272
273 #[must_use]
274 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
275 Self {
276 shared_query_plan_cache_hits: attribution.hits,
277 shared_query_plan_cache_misses: attribution.misses,
278 ..Self::none()
279 }
280 }
281
282 #[must_use]
283 const fn merge(self, other: Self) -> Self {
284 Self {
285 sql_compiled_command_cache_hits: self
286 .sql_compiled_command_cache_hits
287 .saturating_add(other.sql_compiled_command_cache_hits),
288 sql_compiled_command_cache_misses: self
289 .sql_compiled_command_cache_misses
290 .saturating_add(other.sql_compiled_command_cache_misses),
291 shared_query_plan_cache_hits: self
292 .shared_query_plan_cache_hits
293 .saturating_add(other.shared_query_plan_cache_hits),
294 shared_query_plan_cache_misses: self
295 .shared_query_plan_cache_misses
296 .saturating_add(other.shared_query_plan_cache_misses),
297 }
298 }
299}
300
/// Which public entry point a SQL string is compiled for.
///
/// The surface is part of the compiled-command cache key and decides which
/// statement kinds are accepted: `Query` allows `SELECT`, `EXPLAIN`,
/// `DESCRIBE` and the `SHOW` statements; `Update` allows `INSERT`, `UPDATE`
/// and `DELETE`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum SqlCompiledCommandSurface {
    /// Compiled on behalf of `execute_sql_query`.
    Query,
    /// Compiled on behalf of `execute_sql_update`.
    Update,
}
306
/// Key of one entry in the compiled-command cache.
///
/// Two keys are equal only when every field matches, so the same SQL text is
/// cached separately per surface, entity, schema fingerprint and cache method
/// version.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct SqlCompiledCommandCacheKey {
    /// Normally `SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION`; test helpers may override it.
    cache_method_version: u8,
    surface: SqlCompiledCommandSurface,
    /// `E::PATH` of the entity the statement is compiled for.
    entity_path: &'static str,
    /// Result of `commit_schema_fingerprint_for_entity::<E>()`.
    schema_fingerprint: CommitSchemaFingerprint,
    /// The SQL text exactly as passed in; no normalization is applied here.
    sql: String,
}
325
/// Column labels and fixed scales describing the projection of a prepared
/// `SELECT` plan, as produced by `sql_select_prepared_plan`.
#[derive(Clone, Debug)]
pub(in crate::db) struct SqlProjectionContract {
    /// Labels derived from the plan's projection spec.
    columns: Vec<String>,
    /// Fixed scales derived from the same projection spec.
    fixed_scales: Vec<Option<u32>>,
}
340
341impl SqlProjectionContract {
342 #[must_use]
343 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
344 Self {
345 columns,
346 fixed_scales,
347 }
348 }
349
350 #[must_use]
351 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
352 (self.columns, self.fixed_scales)
353 }
354}
355
356impl SqlCompiledCommandCacheKey {
357 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
358 where
359 E: PersistedRow + EntityValue,
360 {
361 Self {
362 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
363 surface,
364 entity_path: E::PATH,
365 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
366 sql: sql.to_string(),
367 }
368 }
369
370 #[must_use]
371 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
372 self.schema_fingerprint
373 }
374}
375
#[cfg(test)]
impl SqlCompiledCommandCacheKey {
    /// Test helper: query-surface key with an explicit cache method version.
    pub(in crate::db) fn query_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Query;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test helper: update-surface key with an explicit cache method version.
    pub(in crate::db) fn update_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Update;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Same fields as `for_entity`, except that the version comes from the caller.
    fn for_entity_with_method_version<E>(
        surface: SqlCompiledCommandSurface,
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
        let sql = String::from(sql);

        Self {
            cache_method_version,
            surface,
            entity_path: E::PATH,
            schema_fingerprint,
            sql,
        }
    }
}
423
/// Compiled commands of one cache scope, keyed by `SqlCompiledCommandCacheKey`.
///
/// Nothing in this section of the file evicts entries; a map grows with the
/// number of distinct keys compiled in its scope.
pub(in crate::db) type SqlCompiledCommandCache =
    HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;
426
thread_local! {
    // Per-thread registry of compiled-command caches. The outer key is the
    // value of `cache_scope_id()` on the session's database handle; each
    // scope gets its own `SqlCompiledCommandCache`, created on first access.
    static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
        RefCell::new(HashMap::default());
}
434
/// One SQL statement after compilation, as stored in the compiled-command cache.
///
/// Values are produced by `compile_sql_statement_for_authority`; the variant
/// follows the parsed statement kind.
#[derive(Clone, Debug)]
pub(in crate::db) enum CompiledSqlCommand {
    /// `SELECT` that is not on the global-aggregate lane.
    Select {
        /// Lowered and bound structural query.
        query: Arc<StructuralQuery>,
        /// Cache key this command was compiled under.
        compiled_cache_key: SqlCompiledCommandCacheKey,
    },
    /// `DELETE`, with its `RETURNING` projection when the statement had one.
    Delete {
        query: Arc<StructuralQuery>,
        returning: Option<SqlReturningProjection>,
    },
    /// `SELECT` whose prepared form reports `is_global_aggregate_lane_shape()`.
    GlobalAggregate {
        command: Box<SqlGlobalAggregateCommandCore>,
    },
    /// `EXPLAIN`, kept as the lowered command.
    Explain(Box<LoweredSqlCommand>),
    /// `INSERT`, kept as the extracted prepared statement.
    Insert(SqlInsertStatement),
    /// `UPDATE`, kept as the extracted prepared statement.
    Update(SqlUpdateStatement),
    /// `DESCRIBE` whose entity name matched the authority's model.
    DescribeEntity,
    /// `SHOW INDEXES` whose entity name matched the authority's model.
    ShowIndexesEntity,
    /// `SHOW COLUMNS` whose entity name matched the authority's model.
    ShowColumnsEntity,
    /// `SHOW ENTITIES`; compiled without any entity-name validation.
    ShowEntities,
}
459
/// Test helper: parses `sql` and converts a parse failure into a `QueryError`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
466
/// Reads the instruction counter used for SQL stage attribution.
///
/// On wasm32 this returns `canic_cdk::api::performance_counter(1)`; on every
/// other target it returns `0`, so measured deltas are always zero there.
// The lint expectation is limited to non-wasm32 targets: on wasm32 the body
// calls a non-const function, the lint never fires, and an unconditional
// `expect` would itself be reported as unfulfilled.
#[cfg(feature = "diagnostics")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_sql_local_instruction_counter() -> u64 {
    // The whole function is already gated on the `diagnostics` feature, so
    // the branches only need to distinguish the target architecture.
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
483
484pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
485 run: impl FnOnce() -> Result<T, E>,
486) -> (u64, Result<T, E>) {
487 #[cfg(feature = "diagnostics")]
488 let start = read_sql_local_instruction_counter();
489
490 let result = run();
491
492 #[cfg(feature = "diagnostics")]
493 let delta = read_sql_local_instruction_counter().saturating_sub(start);
494
495 #[cfg(not(feature = "diagnostics"))]
496 let delta = 0;
497
498 (delta, result)
499}
500
501impl<C: CanisterKind> DbSession<C> {
502 fn with_sql_compiled_command_cache<R>(
503 &self,
504 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
505 ) -> R {
506 let scope_id = self.db.cache_scope_id();
507
508 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
509 let mut caches = caches.borrow_mut();
510 let cache = caches.entry(scope_id).or_default();
511
512 f(cache)
513 })
514 }
515
516 #[cfg(test)]
517 pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
518 self.with_sql_compiled_command_cache(|cache| cache.len())
519 }
520
521 #[cfg(test)]
522 pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
523 self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
524 }
525
526 #[expect(clippy::too_many_lines)]
529 fn compile_sql_statement_for_authority(
530 statement: &SqlStatement,
531 authority: EntityAuthority,
532 compiled_cache_key: SqlCompiledCommandCacheKey,
533 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
534 let prepare_statement = || {
538 measure_sql_stage(|| {
539 prepare_sql_statement(statement.clone(), authority.model().name())
540 .map_err(QueryError::from_sql_lowering_error)
541 })
542 };
543
544 let validate_metadata_entity = |sql_entity: &str| {
547 if identifiers_tail_match(sql_entity, authority.model().name()) {
548 return Ok(());
549 }
550
551 Err(QueryError::from_sql_lowering_error(
552 SqlLoweringError::EntityMismatch {
553 sql_entity: sql_entity.to_string(),
554 expected_entity: authority.model().name(),
555 },
556 ))
557 };
558
559 match statement {
560 SqlStatement::Select(_) => {
561 let (prepare_local_instructions, prepared) = prepare_statement();
562 let prepared = prepared?;
563 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
564 measure_sql_stage(|| {
565 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
566 });
567 let requires_aggregate_lane = requires_aggregate_lane?;
568
569 if requires_aggregate_lane {
570 let (lower_local_instructions, command) = measure_sql_stage(|| {
571 compile_sql_global_aggregate_command_core_from_prepared(
572 prepared,
573 authority.model(),
574 MissingRowPolicy::Ignore,
575 )
576 .map_err(QueryError::from_sql_lowering_error)
577 });
578 let command = command?;
579
580 Ok((
581 CompiledSqlCommand::GlobalAggregate {
582 command: Box::new(command),
583 },
584 aggregate_lane_check_local_instructions,
585 prepare_local_instructions,
586 lower_local_instructions,
587 0,
588 ))
589 } else {
590 let (lower_local_instructions, select) = measure_sql_stage(|| {
591 lower_prepared_sql_select_statement(prepared, authority.model())
592 .map_err(QueryError::from_sql_lowering_error)
593 });
594 let select = select?;
595 let (bind_local_instructions, query) = measure_sql_stage(|| {
596 bind_lowered_sql_select_query_structural(
597 authority.model(),
598 select,
599 MissingRowPolicy::Ignore,
600 )
601 .map_err(QueryError::from_sql_lowering_error)
602 });
603 let query = query?;
604
605 Ok((
606 CompiledSqlCommand::Select {
607 query: Arc::new(query),
608 compiled_cache_key,
609 },
610 aggregate_lane_check_local_instructions,
611 prepare_local_instructions,
612 lower_local_instructions,
613 bind_local_instructions,
614 ))
615 }
616 }
617 SqlStatement::Delete(_) => {
618 let (prepare_local_instructions, prepared) = prepare_statement();
619 let prepared = prepared?;
620 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
621 lower_prepared_sql_delete_statement(prepared)
622 .map_err(QueryError::from_sql_lowering_error)
623 });
624 let delete = lowered?;
625 let returning = delete.returning().cloned();
626 let query = delete.into_base_query();
627 let (bind_local_instructions, query) = measure_sql_stage(|| {
628 Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
629 authority.model(),
630 query,
631 MissingRowPolicy::Ignore,
632 ))
633 });
634 let query = query?;
635
636 Ok((
637 CompiledSqlCommand::Delete {
638 query: Arc::new(query),
639 returning,
640 },
641 0,
642 prepare_local_instructions,
643 lower_local_instructions,
644 bind_local_instructions,
645 ))
646 }
647 SqlStatement::Insert(_) => {
648 let (prepare_local_instructions, prepared) = prepare_statement();
649 let prepared = prepared?;
650 let statement = extract_prepared_sql_insert_statement(prepared)
651 .map_err(QueryError::from_sql_lowering_error)?;
652
653 Ok((
654 CompiledSqlCommand::Insert(statement),
655 0,
656 prepare_local_instructions,
657 0,
658 0,
659 ))
660 }
661 SqlStatement::Update(_) => {
662 let (prepare_local_instructions, prepared) = prepare_statement();
663 let prepared = prepared?;
664 let statement = extract_prepared_sql_update_statement(prepared)
665 .map_err(QueryError::from_sql_lowering_error)?;
666
667 Ok((
668 CompiledSqlCommand::Update(statement),
669 0,
670 prepare_local_instructions,
671 0,
672 0,
673 ))
674 }
675 SqlStatement::Explain(_) => {
676 let (prepare_local_instructions, prepared) = prepare_statement();
677 let prepared = prepared?;
678 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
679 lower_sql_command_from_prepared_statement(prepared, authority.model())
680 .map_err(QueryError::from_sql_lowering_error)
681 });
682 let lowered = lowered?;
683
684 Ok((
685 CompiledSqlCommand::Explain(Box::new(lowered)),
686 0,
687 prepare_local_instructions,
688 lower_local_instructions,
689 0,
690 ))
691 }
692 SqlStatement::Describe(_) => {
693 let (prepare_local_instructions, validated) = measure_sql_stage(|| {
694 let SqlStatement::Describe(statement) = statement else {
695 return Err(QueryError::invariant(
696 "compiled SQL DESCRIBE lane must preserve DESCRIBE statement ownership",
697 ));
698 };
699
700 validate_metadata_entity(statement.entity.as_str())
701 });
702 validated?;
703
704 Ok((
705 CompiledSqlCommand::DescribeEntity,
706 0,
707 prepare_local_instructions,
708 0,
709 0,
710 ))
711 }
712 SqlStatement::ShowIndexes(entity) => {
713 let (prepare_local_instructions, validated) =
714 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
715 validated?;
716
717 Ok((
718 CompiledSqlCommand::ShowIndexesEntity,
719 0,
720 prepare_local_instructions,
721 0,
722 0,
723 ))
724 }
725 SqlStatement::ShowColumns(entity) => {
726 let (prepare_local_instructions, validated) =
727 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
728 validated?;
729
730 Ok((
731 CompiledSqlCommand::ShowColumnsEntity,
732 0,
733 prepare_local_instructions,
734 0,
735 0,
736 ))
737 }
738 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
739 }
740 }
741
742 fn sql_select_prepared_plan(
745 &self,
746 query: &StructuralQuery,
747 authority: EntityAuthority,
748 cache_schema_fingerprint: CommitSchemaFingerprint,
749 ) -> Result<
750 (
751 SharedPreparedExecutionPlan,
752 SqlProjectionContract,
753 SqlCacheAttribution,
754 ),
755 QueryError,
756 > {
757 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
758 authority,
759 cache_schema_fingerprint,
760 query,
761 )?;
762 let projection_spec = prepared_plan
763 .logical_plan()
764 .projection_spec(authority.model());
765 let projection = SqlProjectionContract::new(
766 projection_labels_from_projection_spec(&projection_spec),
767 projection_fixed_scales_from_projection_spec(&projection_spec),
768 );
769
770 Ok((
771 prepared_plan,
772 projection,
773 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
774 ))
775 }
776
777 fn ensure_sql_statement_supported_for_surface(
781 statement: &SqlStatement,
782 surface: SqlCompiledCommandSurface,
783 ) -> Result<(), QueryError> {
784 match (surface, statement) {
785 (
786 SqlCompiledCommandSurface::Query,
787 SqlStatement::Select(_)
788 | SqlStatement::Explain(_)
789 | SqlStatement::Describe(_)
790 | SqlStatement::ShowIndexes(_)
791 | SqlStatement::ShowColumns(_)
792 | SqlStatement::ShowEntities(_),
793 )
794 | (
795 SqlCompiledCommandSurface::Update,
796 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
797 ) => Ok(()),
798 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
799 Err(QueryError::unsupported_query(
800 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
801 ))
802 }
803 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
804 Err(QueryError::unsupported_query(
805 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
806 ))
807 }
808 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
809 Err(QueryError::unsupported_query(
810 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
811 ))
812 }
813 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
814 Err(QueryError::unsupported_query(
815 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
816 ))
817 }
818 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
819 Err(QueryError::unsupported_query(
820 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
821 ))
822 }
823 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
824 Err(QueryError::unsupported_query(
825 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
826 ))
827 }
828 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
829 Err(QueryError::unsupported_query(
830 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
831 ))
832 }
833 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
834 Err(QueryError::unsupported_query(
835 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
836 ))
837 }
838 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
839 Err(QueryError::unsupported_query(
840 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
841 ))
842 }
843 }
844 }
845
846 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
851 where
852 E: PersistedRow<Canister = C> + EntityValue,
853 {
854 let compiled = self.compile_sql_query::<E>(sql)?;
855
856 self.execute_compiled_sql::<E>(&compiled)
857 }
858
859 #[cfg(feature = "diagnostics")]
862 #[doc(hidden)]
863 pub fn execute_sql_query_with_attribution<E>(
864 &self,
865 sql: &str,
866 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
867 where
868 E: PersistedRow<Canister = C> + EntityValue,
869 {
870 let (compile_local_instructions, compiled) =
873 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
874 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
875
876 let store_get_calls_before = DataStore::current_get_call_count();
879 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
880 let pure_covering_row_assembly_before =
881 current_pure_covering_row_assembly_local_instructions();
882 let (result, execute_cache_attribution, execute_phase_attribution) =
883 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
884 let store_get_calls =
885 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
886 let pure_covering_decode_local_instructions =
887 current_pure_covering_decode_local_instructions()
888 .saturating_sub(pure_covering_decode_before);
889 let pure_covering_row_assembly_local_instructions =
890 current_pure_covering_row_assembly_local_instructions()
891 .saturating_sub(pure_covering_row_assembly_before);
892 let execute_local_instructions = execute_phase_attribution
893 .planner_local_instructions
894 .saturating_add(execute_phase_attribution.store_local_instructions)
895 .saturating_add(execute_phase_attribution.executor_local_instructions);
896 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
897 let total_local_instructions =
898 compile_local_instructions.saturating_add(execute_local_instructions);
899
900 Ok((
901 result,
902 SqlQueryExecutionAttribution {
903 compile_local_instructions,
904 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
905 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
906 compile_parse_local_instructions: compile_phase_attribution.parse,
907 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
908 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
909 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
910 compile_parse_predicate_local_instructions: compile_phase_attribution
911 .parse_predicate,
912 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
913 .aggregate_lane_check,
914 compile_prepare_local_instructions: compile_phase_attribution.prepare,
915 compile_lower_local_instructions: compile_phase_attribution.lower,
916 compile_bind_local_instructions: compile_phase_attribution.bind,
917 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
918 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
919 store_local_instructions: execute_phase_attribution.store_local_instructions,
920 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
921 grouped_stream_local_instructions: execute_phase_attribution
922 .grouped_stream_local_instructions,
923 grouped_fold_local_instructions: execute_phase_attribution
924 .grouped_fold_local_instructions,
925 grouped_finalize_local_instructions: execute_phase_attribution
926 .grouped_finalize_local_instructions,
927 grouped_count_borrowed_hash_computations: execute_phase_attribution
928 .grouped_count
929 .borrowed_hash_computations,
930 grouped_count_bucket_candidate_checks: execute_phase_attribution
931 .grouped_count
932 .bucket_candidate_checks,
933 grouped_count_existing_group_hits: execute_phase_attribution
934 .grouped_count
935 .existing_group_hits,
936 grouped_count_new_group_inserts: execute_phase_attribution
937 .grouped_count
938 .new_group_inserts,
939 grouped_count_row_materialization_local_instructions: execute_phase_attribution
940 .grouped_count
941 .row_materialization_local_instructions,
942 grouped_count_group_lookup_local_instructions: execute_phase_attribution
943 .grouped_count
944 .group_lookup_local_instructions,
945 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
946 .grouped_count
947 .existing_group_update_local_instructions,
948 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
949 .grouped_count
950 .new_group_insert_local_instructions,
951 pure_covering_decode_local_instructions,
952 pure_covering_row_assembly_local_instructions,
953 store_get_calls,
954 response_decode_local_instructions: 0,
955 execute_local_instructions,
956 total_local_instructions,
957 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
958 sql_compiled_command_cache_misses: cache_attribution
959 .sql_compiled_command_cache_misses,
960 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
961 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
962 },
963 ))
964 }
965
966 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
971 where
972 E: PersistedRow<Canister = C> + EntityValue,
973 {
974 let compiled = self.compile_sql_update::<E>(sql)?;
975
976 self.execute_compiled_sql::<E>(&compiled)
977 }
978
979 pub(in crate::db) fn compile_sql_query<E>(
982 &self,
983 sql: &str,
984 ) -> Result<CompiledSqlCommand, QueryError>
985 where
986 E: PersistedRow<Canister = C> + EntityValue,
987 {
988 self.compile_sql_query_with_cache_attribution::<E>(sql)
989 .map(|(compiled, _, _)| compiled)
990 }
991
992 fn compile_sql_query_with_cache_attribution<E>(
993 &self,
994 sql: &str,
995 ) -> Result<
996 (
997 CompiledSqlCommand,
998 SqlCacheAttribution,
999 SqlCompilePhaseAttribution,
1000 ),
1001 QueryError,
1002 >
1003 where
1004 E: PersistedRow<Canister = C> + EntityValue,
1005 {
1006 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
1007 }
1008
1009 pub(in crate::db) fn compile_sql_update<E>(
1012 &self,
1013 sql: &str,
1014 ) -> Result<CompiledSqlCommand, QueryError>
1015 where
1016 E: PersistedRow<Canister = C> + EntityValue,
1017 {
1018 self.compile_sql_update_with_cache_attribution::<E>(sql)
1019 .map(|(compiled, _, _)| compiled)
1020 }
1021
1022 fn compile_sql_update_with_cache_attribution<E>(
1023 &self,
1024 sql: &str,
1025 ) -> Result<
1026 (
1027 CompiledSqlCommand,
1028 SqlCacheAttribution,
1029 SqlCompilePhaseAttribution,
1030 ),
1031 QueryError,
1032 >
1033 where
1034 E: PersistedRow<Canister = C> + EntityValue,
1035 {
1036 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1037 }
1038
1039 fn compile_sql_surface_with_cache_attribution<E>(
1043 &self,
1044 sql: &str,
1045 surface: SqlCompiledCommandSurface,
1046 ) -> Result<
1047 (
1048 CompiledSqlCommand,
1049 SqlCacheAttribution,
1050 SqlCompilePhaseAttribution,
1051 ),
1052 QueryError,
1053 >
1054 where
1055 E: PersistedRow<Canister = C> + EntityValue,
1056 {
1057 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
1058 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1059 });
1060 let cache_key = cache_key?;
1061
1062 self.compile_sql_statement_with_cache::<E, _>(
1063 cache_key,
1064 cache_key_local_instructions,
1065 sql,
1066 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
1067 )
1068 }
1069
1070 fn compile_sql_statement_with_cache<E, F>(
1073 &self,
1074 cache_key: SqlCompiledCommandCacheKey,
1075 cache_key_local_instructions: u64,
1076 sql: &str,
1077 ensure_surface_supported: F,
1078 ) -> Result<
1079 (
1080 CompiledSqlCommand,
1081 SqlCacheAttribution,
1082 SqlCompilePhaseAttribution,
1083 ),
1084 QueryError,
1085 >
1086 where
1087 E: PersistedRow<Canister = C> + EntityValue,
1088 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
1089 {
1090 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
1091 let cached =
1092 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1093 Ok::<_, QueryError>(cached)
1094 });
1095 let cached = cached?;
1096 if let Some(compiled) = cached {
1097 return Ok((
1098 compiled,
1099 SqlCacheAttribution::sql_compiled_command_cache_hit(),
1100 SqlCompilePhaseAttribution::cache_hit(
1101 cache_key_local_instructions,
1102 cache_lookup_local_instructions,
1103 ),
1104 ));
1105 }
1106
1107 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
1108 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
1109 });
1110 let (parsed, parse_attribution) = parsed?;
1111 let parse_select_local_instructions = parse_local_instructions
1112 .saturating_sub(parse_attribution.tokenize)
1113 .saturating_sub(parse_attribution.expr)
1114 .saturating_sub(parse_attribution.predicate);
1115 ensure_surface_supported(&parsed)?;
1116 let authority = EntityAuthority::for_type::<E>();
1117 let (
1118 compiled,
1119 aggregate_lane_check_local_instructions,
1120 prepare_local_instructions,
1121 lower_local_instructions,
1122 bind_local_instructions,
1123 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
1124
1125 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
1126 self.with_sql_compiled_command_cache(|cache| {
1127 cache.insert(cache_key, compiled.clone());
1128 });
1129 Ok::<_, QueryError>(())
1130 });
1131 cache_insert?;
1132
1133 Ok((
1134 compiled,
1135 SqlCacheAttribution::sql_compiled_command_cache_miss(),
1136 SqlCompilePhaseAttribution {
1137 cache_key: cache_key_local_instructions,
1138 cache_lookup: cache_lookup_local_instructions,
1139 parse: parse_local_instructions,
1140 parse_tokenize: parse_attribution.tokenize,
1141 parse_select: parse_select_local_instructions,
1142 parse_expr: parse_attribution.expr,
1143 parse_predicate: parse_attribution.predicate,
1144 aggregate_lane_check: aggregate_lane_check_local_instructions,
1145 prepare: prepare_local_instructions,
1146 lower: lower_local_instructions,
1147 bind: bind_local_instructions,
1148 cache_insert: cache_insert_local_instructions,
1149 },
1150 ))
1151 }
1152}