1mod execute;
8mod projection;
9
10#[cfg(feature = "diagnostics")]
11use candid::CandidType;
12#[cfg(feature = "diagnostics")]
13use serde::Deserialize;
14use std::{cell::RefCell, collections::HashMap, sync::Arc};
15
/// Version tag stored in every `SqlCompiledCommandCacheKey` built by
/// `SqlCompiledCommandCacheKey::for_entity`. Because the key derives
/// `Eq`/`Hash` over this field, keys built under a different version never
/// match keys built under this one.
const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
22
23#[cfg(feature = "diagnostics")]
24use crate::db::DataStore;
25#[cfg(feature = "diagnostics")]
26use crate::db::executor::GroupedCountAttribution;
27#[cfg(feature = "diagnostics")]
28use crate::db::session::sql::projection::{
29 current_pure_covering_decode_local_instructions,
30 current_pure_covering_row_assembly_local_instructions,
31};
32#[cfg(test)]
33use crate::db::sql::parser::parse_sql;
34use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
35use crate::{
36 db::{
37 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
38 commit::CommitSchemaFingerprint,
39 executor::{EntityAuthority, SharedPreparedExecutionPlan},
40 query::intent::StructuralQuery,
41 schema::commit_schema_fingerprint_for_entity,
42 session::query::QueryPlanCacheAttribution,
43 session::sql::projection::{
44 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
45 },
46 sql::identifier::identifiers_tail_match,
47 sql::lowering::{
48 LoweredSqlCommand, SqlGlobalAggregateCommandCore, SqlLoweringError,
49 bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
50 compile_sql_global_aggregate_command_core_from_prepared,
51 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
52 lower_prepared_sql_delete_statement_with_source, lower_prepared_sql_select_statement,
53 lower_sql_command_from_prepared_statement, prepare_sql_statement,
54 },
55 sql::parser::{SqlStatement, parse_sql_with_attribution},
56 },
57 traits::{CanisterKind, EntityValue},
58 value::OutputValue,
59};
60
61#[cfg(all(test, not(feature = "diagnostics")))]
62pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
63#[cfg(feature = "diagnostics")]
64pub use crate::db::session::sql::projection::{
65 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
66};
67
/// Result payload returned by the SQL execution entry points
/// (`execute_sql_query` / `execute_sql_update`).
///
/// Each variant carries one result shape. Which statement kinds produce
/// which variant is decided by the executor, which is not part of this file.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// A bare row count with no row payload.
    Count {
        row_count: u32,
    },
    /// Projected rows as typed output values.
    Projection {
        /// Column labels, one per projected column.
        columns: Vec<String>,
        /// Optional fixed scale per column.
        // NOTE(review): presumably parallel to `columns` — confirm against the projection code.
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<Vec<OutputValue>>,
        row_count: u32,
    },
    /// Projected rows already rendered as text.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows plus an optional continuation cursor.
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        /// `Some` when a cursor for a following page is available.
        // NOTE(review): paging semantics are defined by the grouped executor — confirm there.
        next_cursor: Option<String>,
    },
    /// Textual explain output.
    Explain(String),
    /// Schema description of one entity.
    Describe(crate::db::EntitySchemaDescription),
    /// Index listing, one string per entry.
    ShowIndexes(Vec<String>),
    /// Field descriptions of one entity.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// Entity listing, one string per entry.
    ShowEntities(Vec<String>),
}
98
/// Flat, Candid-serializable attribution record produced by
/// `DbSession::execute_sql_query_with_attribution` (diagnostics builds only).
///
/// All `*_local_instructions` fields are instruction-counter deltas; the
/// remaining fields are event counts.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    /// Delta measured around the whole compile step (cache lookup included).
    pub compile_local_instructions: u64,
    // Per-stage breakdown of the compile step, copied from `SqlCompilePhaseAttribution`.
    pub compile_cache_key_local_instructions: u64,
    pub compile_cache_lookup_local_instructions: u64,
    pub compile_parse_local_instructions: u64,
    pub compile_parse_tokenize_local_instructions: u64,
    pub compile_parse_select_local_instructions: u64,
    pub compile_parse_expr_local_instructions: u64,
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    pub compile_cache_insert_local_instructions: u64,
    // Execute-phase breakdown, copied from `SqlExecutePhaseAttribution`.
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    // Grouped-count counters, copied from `GroupedCountAttribution`.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,
    // Deltas of the projection module's pure-covering counters across execution.
    pub pure_covering_decode_local_instructions: u64,
    pub pure_covering_row_assembly_local_instructions: u64,
    /// Delta of `DataStore::current_get_call_count()` across execution.
    pub store_get_calls: u64,
    /// Always written as `0` by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,
    /// Saturating sum of the planner, store, and executor fields.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and `execute_local_instructions`.
    pub total_local_instructions: u64,
    // Cache hit/miss counters, copied from the merged `SqlCacheAttribution`.
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
150
/// Execute-phase instruction attribution for one compiled SQL command
/// (diagnostics builds only).
///
/// `execute_sql_query_with_attribution` copies these fields into the public
/// `SqlQueryExecutionAttribution` record.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    /// Counters reported by the grouped-count executor path.
    pub grouped_count: GroupedCountAttribution,
}
165
/// Per-stage instruction attribution for one SQL compile call.
///
/// Filled by `DbSession::compile_sql_statement_with_cache`. On a cache hit
/// only `cache_key` and `cache_lookup` are non-zero (see `cache_hit`).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCompilePhaseAttribution {
    /// Cost of building the cache key.
    pub cache_key: u64,
    /// Cost of the compiled-command cache lookup.
    pub cache_lookup: u64,
    /// Total cost of parsing the SQL text.
    pub parse: u64,
    /// Tokenizer share of `parse`, as reported by the parser.
    pub parse_tokenize: u64,
    /// Remainder of `parse` after subtracting tokenize, expr, and predicate.
    pub parse_select: u64,
    /// Expression share of `parse`, as reported by the parser.
    pub parse_expr: u64,
    /// Predicate share of `parse`, as reported by the parser.
    pub parse_predicate: u64,
    /// Cost of the global-aggregate lane check (SELECT only; `0` otherwise).
    pub aggregate_lane_check: u64,
    /// Cost of statement preparation or metadata entity validation.
    pub prepare: u64,
    /// Cost of lowering the prepared statement.
    pub lower: u64,
    /// Cost of binding the lowered statement into a structural query.
    pub bind: u64,
    /// Cost of inserting the compiled command into the cache.
    pub cache_insert: u64,
}
192
193impl SqlCompilePhaseAttribution {
194 #[must_use]
195 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
196 Self {
197 cache_key,
198 cache_lookup,
199 parse: 0,
200 parse_tokenize: 0,
201 parse_select: 0,
202 parse_expr: 0,
203 parse_predicate: 0,
204 aggregate_lane_check: 0,
205 prepare: 0,
206 lower: 0,
207 bind: 0,
208 cache_insert: 0,
209 }
210 }
211}
212
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Build an attribution from a total execute cost and the store share of
    /// that total.
    ///
    /// The executor share is `execute - store` (saturating at zero). Planner
    /// and grouped counters are zero, and the grouped-count attribution is
    /// `GroupedCountAttribution::none()`.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        // Whatever the store did not consume is charged to the executor.
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_local_instructions,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
}
232
/// Hit/miss counters for the two caches touched by SQL execution: the
/// compiled-command cache and the shared query-plan cache.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCacheAttribution {
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
245
246impl SqlCacheAttribution {
247 #[must_use]
248 const fn none() -> Self {
249 Self {
250 sql_compiled_command_cache_hits: 0,
251 sql_compiled_command_cache_misses: 0,
252 shared_query_plan_cache_hits: 0,
253 shared_query_plan_cache_misses: 0,
254 }
255 }
256
257 #[must_use]
258 const fn sql_compiled_command_cache_hit() -> Self {
259 Self {
260 sql_compiled_command_cache_hits: 1,
261 ..Self::none()
262 }
263 }
264
265 #[must_use]
266 const fn sql_compiled_command_cache_miss() -> Self {
267 Self {
268 sql_compiled_command_cache_misses: 1,
269 ..Self::none()
270 }
271 }
272
273 #[must_use]
274 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
275 Self {
276 shared_query_plan_cache_hits: attribution.hits,
277 shared_query_plan_cache_misses: attribution.misses,
278 ..Self::none()
279 }
280 }
281
282 #[must_use]
283 const fn merge(self, other: Self) -> Self {
284 Self {
285 sql_compiled_command_cache_hits: self
286 .sql_compiled_command_cache_hits
287 .saturating_add(other.sql_compiled_command_cache_hits),
288 sql_compiled_command_cache_misses: self
289 .sql_compiled_command_cache_misses
290 .saturating_add(other.sql_compiled_command_cache_misses),
291 shared_query_plan_cache_hits: self
292 .shared_query_plan_cache_hits
293 .saturating_add(other.shared_query_plan_cache_hits),
294 shared_query_plan_cache_misses: self
295 .shared_query_plan_cache_misses
296 .saturating_add(other.shared_query_plan_cache_misses),
297 }
298 }
299}
300
/// API surface a SQL statement is compiled for. It is part of the
/// compiled-command cache key, so the same SQL text compiled for different
/// surfaces occupies distinct cache entries.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum SqlCompiledCommandSurface {
    /// Read surface (`execute_sql_query`): SELECT, EXPLAIN, DESCRIBE, SHOW ….
    Query,
    /// Write surface (`execute_sql_update`): INSERT, UPDATE, DELETE.
    Update,
}
306
/// Key identifying one compiled SQL command in a `SqlCompiledCommandCache`.
///
/// Two keys are equal only when every field matches, including the exact
/// SQL text.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct SqlCompiledCommandCacheKey {
    /// Cache method version the key was built under.
    cache_method_version: u8,
    /// Surface (query or update) the statement was compiled for.
    surface: SqlCompiledCommandSurface,
    /// `E::PATH` of the entity type the statement was compiled against.
    entity_path: &'static str,
    /// Commit schema fingerprint of that entity at key-construction time.
    schema_fingerprint: CommitSchemaFingerprint,
    /// SQL text exactly as supplied by the caller.
    sql: String,
}
325
/// Column labels and fixed scales describing the outward shape of a SQL
/// projection.
#[derive(Clone, Debug)]
pub(in crate::db) struct SqlProjectionContract {
    /// Column labels of the projection.
    columns: Vec<String>,
    /// Optional fixed scale per column.
    // NOTE(review): presumably parallel to `columns` — confirm against the projection helpers.
    fixed_scales: Vec<Option<u32>>,
}
340
341impl SqlProjectionContract {
342 #[must_use]
343 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
344 Self {
345 columns,
346 fixed_scales,
347 }
348 }
349
350 #[must_use]
351 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
352 (self.columns, self.fixed_scales)
353 }
354}
355
356impl SqlCompiledCommandCacheKey {
357 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
358 where
359 E: PersistedRow + EntityValue,
360 {
361 Self {
362 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
363 surface,
364 entity_path: E::PATH,
365 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
366 sql: sql.to_string(),
367 }
368 }
369
370 #[must_use]
371 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
372 self.schema_fingerprint
373 }
374}
375
#[cfg(test)]
impl SqlCompiledCommandCacheKey {
    /// Test helper: query-surface key for `E` with an explicit cache method
    /// version instead of the current one.
    pub(in crate::db) fn query_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Query;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test helper: update-surface key for `E` with an explicit cache method
    /// version instead of the current one.
    pub(in crate::db) fn update_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Update;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Shared constructor behind the two test helpers above; identical to
    /// `for_entity` except that the method version is caller-supplied.
    fn for_entity_with_method_version<E>(
        surface: SqlCompiledCommandSurface,
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();

        Self {
            cache_method_version,
            surface,
            entity_path: E::PATH,
            schema_fingerprint,
            sql: sql.to_owned(),
        }
    }
}
423
/// Map from cache key to compiled SQL command.
pub(in crate::db) type SqlCompiledCommandCache =
    HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;

thread_local! {
    // One compiled-command cache per cache scope, keyed by the `usize`
    // returned from `self.db.cache_scope_id()` in
    // `DbSession::with_sql_compiled_command_cache`.
    static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
        RefCell::new(HashMap::default());
}
434
/// A SQL statement after compilation, ready for execution and cheap enough
/// to clone out of the compiled-command cache.
///
/// Produced by `DbSession::compile_sql_statement_for_authority`.
#[derive(Clone, Debug)]
pub(in crate::db) enum CompiledSqlCommand {
    /// SELECT that does not use the global-aggregate lane.
    Select {
        /// Bound structural query, shared behind an `Arc`.
        query: Arc<StructuralQuery>,
        /// Cache key the command was compiled under.
        compiled_cache_key: SqlCompiledCommandCacheKey,
    },
    /// DELETE: the bound structural query plus the statement returned by lowering.
    Delete {
        query: Arc<StructuralQuery>,
        statement: SqlDeleteStatement,
    },
    /// SELECT whose prepared statement reported the global-aggregate lane shape.
    GlobalAggregate {
        command: Box<SqlGlobalAggregateCommandCore>,
    },
    /// EXPLAIN: the lowered inner command.
    Explain(Box<LoweredSqlCommand>),
    /// INSERT: the extracted prepared statement.
    Insert(SqlInsertStatement),
    /// UPDATE: the extracted prepared statement.
    Update(SqlUpdateStatement),
    /// DESCRIBE whose entity name matched the compiling entity.
    DescribeEntity,
    /// SHOW INDEXES whose entity name matched the compiling entity.
    ShowIndexesEntity,
    /// SHOW COLUMNS whose entity name matched the compiling entity.
    ShowColumnsEntity,
    /// SHOW ENTITIES (compiled without any entity validation).
    ShowEntities,
}
459
/// Test helper: parse `sql` into a `SqlStatement`, converting parser
/// failures into `QueryError` via `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let parsed = parse_sql(sql);

    parsed.map_err(QueryError::from_sql_parse_error)
}
466
/// Read the current local instruction counter (diagnostics builds only).
///
/// On `wasm32` this calls `canic_cdk::api::performance_counter(1)`; on every
/// other target it returns `0`, so measured deltas are always zero there.
// NOTE(review): counter type `1` is assumed to be the call-context instruction
// counter — confirm against the CDK documentation.
#[cfg(feature = "diagnostics")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_sql_local_instruction_counter() -> u64 {
    // Exactly one of the two cfg-gated blocks survives compilation and
    // becomes the function's tail expression.
    #[cfg(all(feature = "diagnostics", target_arch = "wasm32"))]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(all(feature = "diagnostics", target_arch = "wasm32")))]
    {
        0
    }
}
483
484pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
485 run: impl FnOnce() -> Result<T, E>,
486) -> (u64, Result<T, E>) {
487 #[cfg(feature = "diagnostics")]
488 let start = read_sql_local_instruction_counter();
489
490 let result = run();
491
492 #[cfg(feature = "diagnostics")]
493 let delta = read_sql_local_instruction_counter().saturating_sub(start);
494
495 #[cfg(not(feature = "diagnostics"))]
496 let delta = 0;
497
498 (delta, result)
499}
500
501impl<C: CanisterKind> DbSession<C> {
502 fn with_sql_compiled_command_cache<R>(
503 &self,
504 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
505 ) -> R {
506 let scope_id = self.db.cache_scope_id();
507
508 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
509 let mut caches = caches.borrow_mut();
510 let cache = caches.entry(scope_id).or_default();
511
512 f(cache)
513 })
514 }
515
516 #[cfg(test)]
517 pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
518 self.with_sql_compiled_command_cache(|cache| cache.len())
519 }
520
    /// Test helper: remove every entry from this session's compiled-command
    /// cache. Only the cache for this session's scope id is touched.
    #[cfg(test)]
    pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
        self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
    }
525
526 #[expect(clippy::too_many_lines)]
529 fn compile_sql_statement_for_authority(
530 statement: &SqlStatement,
531 authority: EntityAuthority,
532 compiled_cache_key: SqlCompiledCommandCacheKey,
533 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
534 let prepare_statement = || {
538 measure_sql_stage(|| {
539 prepare_sql_statement(statement.clone(), authority.model().name())
540 .map_err(QueryError::from_sql_lowering_error)
541 })
542 };
543
544 let validate_metadata_entity = |sql_entity: &str| {
547 if identifiers_tail_match(sql_entity, authority.model().name()) {
548 return Ok(());
549 }
550
551 Err(QueryError::from_sql_lowering_error(
552 SqlLoweringError::EntityMismatch {
553 sql_entity: sql_entity.to_string(),
554 expected_entity: authority.model().name(),
555 },
556 ))
557 };
558
559 match statement {
560 SqlStatement::Select(_) => {
561 let (prepare_local_instructions, prepared) = prepare_statement();
562 let prepared = prepared?;
563 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
564 measure_sql_stage(|| {
565 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
566 });
567 let requires_aggregate_lane = requires_aggregate_lane?;
568
569 if requires_aggregate_lane {
570 let (lower_local_instructions, command) = measure_sql_stage(|| {
571 compile_sql_global_aggregate_command_core_from_prepared(
572 prepared,
573 authority.model(),
574 MissingRowPolicy::Ignore,
575 )
576 .map_err(QueryError::from_sql_lowering_error)
577 });
578 let command = command?;
579
580 Ok((
581 CompiledSqlCommand::GlobalAggregate {
582 command: Box::new(command),
583 },
584 aggregate_lane_check_local_instructions,
585 prepare_local_instructions,
586 lower_local_instructions,
587 0,
588 ))
589 } else {
590 let (lower_local_instructions, select) = measure_sql_stage(|| {
591 lower_prepared_sql_select_statement(prepared, authority.model())
592 .map_err(QueryError::from_sql_lowering_error)
593 });
594 let select = select?;
595 let (bind_local_instructions, query) = measure_sql_stage(|| {
596 bind_lowered_sql_select_query_structural(
597 authority.model(),
598 select,
599 MissingRowPolicy::Ignore,
600 )
601 .map_err(QueryError::from_sql_lowering_error)
602 });
603 let query = query?;
604
605 Ok((
606 CompiledSqlCommand::Select {
607 query: Arc::new(query),
608 compiled_cache_key,
609 },
610 aggregate_lane_check_local_instructions,
611 prepare_local_instructions,
612 lower_local_instructions,
613 bind_local_instructions,
614 ))
615 }
616 }
617 SqlStatement::Delete(_) => {
618 let (prepare_local_instructions, prepared) = prepare_statement();
619 let prepared = prepared?;
620 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
621 lower_prepared_sql_delete_statement_with_source(prepared)
622 .map_err(QueryError::from_sql_lowering_error)
623 });
624 let (query, statement) = lowered?;
625 let (bind_local_instructions, query) = measure_sql_stage(|| {
626 Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
627 authority.model(),
628 query,
629 MissingRowPolicy::Ignore,
630 ))
631 });
632 let query = query?;
633
634 Ok((
635 CompiledSqlCommand::Delete {
636 query: Arc::new(query),
637 statement,
638 },
639 0,
640 prepare_local_instructions,
641 lower_local_instructions,
642 bind_local_instructions,
643 ))
644 }
645 SqlStatement::Insert(_) => {
646 let (prepare_local_instructions, prepared) = prepare_statement();
647 let prepared = prepared?;
648 let statement = extract_prepared_sql_insert_statement(prepared)
649 .map_err(QueryError::from_sql_lowering_error)?;
650
651 Ok((
652 CompiledSqlCommand::Insert(statement),
653 0,
654 prepare_local_instructions,
655 0,
656 0,
657 ))
658 }
659 SqlStatement::Update(_) => {
660 let (prepare_local_instructions, prepared) = prepare_statement();
661 let prepared = prepared?;
662 let statement = extract_prepared_sql_update_statement(prepared)
663 .map_err(QueryError::from_sql_lowering_error)?;
664
665 Ok((
666 CompiledSqlCommand::Update(statement),
667 0,
668 prepare_local_instructions,
669 0,
670 0,
671 ))
672 }
673 SqlStatement::Explain(_) => {
674 let (prepare_local_instructions, prepared) = prepare_statement();
675 let prepared = prepared?;
676 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
677 lower_sql_command_from_prepared_statement(prepared, authority.model())
678 .map_err(QueryError::from_sql_lowering_error)
679 });
680 let lowered = lowered?;
681
682 Ok((
683 CompiledSqlCommand::Explain(Box::new(lowered)),
684 0,
685 prepare_local_instructions,
686 lower_local_instructions,
687 0,
688 ))
689 }
690 SqlStatement::Describe(_) => {
691 let (prepare_local_instructions, validated) = measure_sql_stage(|| {
692 let SqlStatement::Describe(statement) = statement else {
693 return Err(QueryError::invariant(
694 "compiled SQL DESCRIBE lane must preserve DESCRIBE statement ownership",
695 ));
696 };
697
698 validate_metadata_entity(statement.entity.as_str())
699 });
700 validated?;
701
702 Ok((
703 CompiledSqlCommand::DescribeEntity,
704 0,
705 prepare_local_instructions,
706 0,
707 0,
708 ))
709 }
710 SqlStatement::ShowIndexes(entity) => {
711 let (prepare_local_instructions, validated) =
712 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
713 validated?;
714
715 Ok((
716 CompiledSqlCommand::ShowIndexesEntity,
717 0,
718 prepare_local_instructions,
719 0,
720 0,
721 ))
722 }
723 SqlStatement::ShowColumns(entity) => {
724 let (prepare_local_instructions, validated) =
725 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
726 validated?;
727
728 Ok((
729 CompiledSqlCommand::ShowColumnsEntity,
730 0,
731 prepare_local_instructions,
732 0,
733 0,
734 ))
735 }
736 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
737 }
738 }
739
740 fn sql_select_prepared_plan(
743 &self,
744 query: &StructuralQuery,
745 authority: EntityAuthority,
746 cache_schema_fingerprint: CommitSchemaFingerprint,
747 ) -> Result<
748 (
749 SharedPreparedExecutionPlan,
750 SqlProjectionContract,
751 SqlCacheAttribution,
752 ),
753 QueryError,
754 > {
755 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
756 authority,
757 cache_schema_fingerprint,
758 query,
759 )?;
760 let projection_spec = prepared_plan
761 .logical_plan()
762 .projection_spec(authority.model());
763 let projection = SqlProjectionContract::new(
764 projection_labels_from_projection_spec(&projection_spec),
765 projection_fixed_scales_from_projection_spec(&projection_spec),
766 );
767
768 Ok((
769 prepared_plan,
770 projection,
771 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
772 ))
773 }
774
775 fn ensure_sql_statement_supported_for_surface(
779 statement: &SqlStatement,
780 surface: SqlCompiledCommandSurface,
781 ) -> Result<(), QueryError> {
782 match (surface, statement) {
783 (
784 SqlCompiledCommandSurface::Query,
785 SqlStatement::Select(_)
786 | SqlStatement::Explain(_)
787 | SqlStatement::Describe(_)
788 | SqlStatement::ShowIndexes(_)
789 | SqlStatement::ShowColumns(_)
790 | SqlStatement::ShowEntities(_),
791 )
792 | (
793 SqlCompiledCommandSurface::Update,
794 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
795 ) => Ok(()),
796 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
797 Err(QueryError::unsupported_query(
798 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
799 ))
800 }
801 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
802 Err(QueryError::unsupported_query(
803 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
804 ))
805 }
806 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
807 Err(QueryError::unsupported_query(
808 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
809 ))
810 }
811 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
812 Err(QueryError::unsupported_query(
813 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
814 ))
815 }
816 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
817 Err(QueryError::unsupported_query(
818 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
819 ))
820 }
821 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
822 Err(QueryError::unsupported_query(
823 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
824 ))
825 }
826 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
827 Err(QueryError::unsupported_query(
828 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
829 ))
830 }
831 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
832 Err(QueryError::unsupported_query(
833 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
834 ))
835 }
836 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
837 Err(QueryError::unsupported_query(
838 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
839 ))
840 }
841 }
842 }
843
844 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
849 where
850 E: PersistedRow<Canister = C> + EntityValue,
851 {
852 let compiled = self.compile_sql_query::<E>(sql)?;
853
854 self.execute_compiled_sql::<E>(&compiled)
855 }
856
857 #[cfg(feature = "diagnostics")]
860 #[doc(hidden)]
861 pub fn execute_sql_query_with_attribution<E>(
862 &self,
863 sql: &str,
864 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
865 where
866 E: PersistedRow<Canister = C> + EntityValue,
867 {
868 let (compile_local_instructions, compiled) =
871 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
872 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
873
874 let store_get_calls_before = DataStore::current_get_call_count();
877 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
878 let pure_covering_row_assembly_before =
879 current_pure_covering_row_assembly_local_instructions();
880 let (result, execute_cache_attribution, execute_phase_attribution) =
881 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
882 let store_get_calls =
883 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
884 let pure_covering_decode_local_instructions =
885 current_pure_covering_decode_local_instructions()
886 .saturating_sub(pure_covering_decode_before);
887 let pure_covering_row_assembly_local_instructions =
888 current_pure_covering_row_assembly_local_instructions()
889 .saturating_sub(pure_covering_row_assembly_before);
890 let execute_local_instructions = execute_phase_attribution
891 .planner_local_instructions
892 .saturating_add(execute_phase_attribution.store_local_instructions)
893 .saturating_add(execute_phase_attribution.executor_local_instructions);
894 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
895 let total_local_instructions =
896 compile_local_instructions.saturating_add(execute_local_instructions);
897
898 Ok((
899 result,
900 SqlQueryExecutionAttribution {
901 compile_local_instructions,
902 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
903 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
904 compile_parse_local_instructions: compile_phase_attribution.parse,
905 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
906 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
907 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
908 compile_parse_predicate_local_instructions: compile_phase_attribution
909 .parse_predicate,
910 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
911 .aggregate_lane_check,
912 compile_prepare_local_instructions: compile_phase_attribution.prepare,
913 compile_lower_local_instructions: compile_phase_attribution.lower,
914 compile_bind_local_instructions: compile_phase_attribution.bind,
915 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
916 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
917 store_local_instructions: execute_phase_attribution.store_local_instructions,
918 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
919 grouped_stream_local_instructions: execute_phase_attribution
920 .grouped_stream_local_instructions,
921 grouped_fold_local_instructions: execute_phase_attribution
922 .grouped_fold_local_instructions,
923 grouped_finalize_local_instructions: execute_phase_attribution
924 .grouped_finalize_local_instructions,
925 grouped_count_borrowed_hash_computations: execute_phase_attribution
926 .grouped_count
927 .borrowed_hash_computations,
928 grouped_count_bucket_candidate_checks: execute_phase_attribution
929 .grouped_count
930 .bucket_candidate_checks,
931 grouped_count_existing_group_hits: execute_phase_attribution
932 .grouped_count
933 .existing_group_hits,
934 grouped_count_new_group_inserts: execute_phase_attribution
935 .grouped_count
936 .new_group_inserts,
937 grouped_count_row_materialization_local_instructions: execute_phase_attribution
938 .grouped_count
939 .row_materialization_local_instructions,
940 grouped_count_group_lookup_local_instructions: execute_phase_attribution
941 .grouped_count
942 .group_lookup_local_instructions,
943 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
944 .grouped_count
945 .existing_group_update_local_instructions,
946 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
947 .grouped_count
948 .new_group_insert_local_instructions,
949 pure_covering_decode_local_instructions,
950 pure_covering_row_assembly_local_instructions,
951 store_get_calls,
952 response_decode_local_instructions: 0,
953 execute_local_instructions,
954 total_local_instructions,
955 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
956 sql_compiled_command_cache_misses: cache_attribution
957 .sql_compiled_command_cache_misses,
958 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
959 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
960 },
961 ))
962 }
963
964 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
969 where
970 E: PersistedRow<Canister = C> + EntityValue,
971 {
972 let compiled = self.compile_sql_update::<E>(sql)?;
973
974 self.execute_compiled_sql::<E>(&compiled)
975 }
976
977 pub(in crate::db) fn compile_sql_query<E>(
980 &self,
981 sql: &str,
982 ) -> Result<CompiledSqlCommand, QueryError>
983 where
984 E: PersistedRow<Canister = C> + EntityValue,
985 {
986 self.compile_sql_query_with_cache_attribution::<E>(sql)
987 .map(|(compiled, _, _)| compiled)
988 }
989
990 fn compile_sql_query_with_cache_attribution<E>(
991 &self,
992 sql: &str,
993 ) -> Result<
994 (
995 CompiledSqlCommand,
996 SqlCacheAttribution,
997 SqlCompilePhaseAttribution,
998 ),
999 QueryError,
1000 >
1001 where
1002 E: PersistedRow<Canister = C> + EntityValue,
1003 {
1004 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
1005 }
1006
1007 pub(in crate::db) fn compile_sql_update<E>(
1010 &self,
1011 sql: &str,
1012 ) -> Result<CompiledSqlCommand, QueryError>
1013 where
1014 E: PersistedRow<Canister = C> + EntityValue,
1015 {
1016 self.compile_sql_update_with_cache_attribution::<E>(sql)
1017 .map(|(compiled, _, _)| compiled)
1018 }
1019
1020 fn compile_sql_update_with_cache_attribution<E>(
1021 &self,
1022 sql: &str,
1023 ) -> Result<
1024 (
1025 CompiledSqlCommand,
1026 SqlCacheAttribution,
1027 SqlCompilePhaseAttribution,
1028 ),
1029 QueryError,
1030 >
1031 where
1032 E: PersistedRow<Canister = C> + EntityValue,
1033 {
1034 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1035 }
1036
1037 fn compile_sql_surface_with_cache_attribution<E>(
1041 &self,
1042 sql: &str,
1043 surface: SqlCompiledCommandSurface,
1044 ) -> Result<
1045 (
1046 CompiledSqlCommand,
1047 SqlCacheAttribution,
1048 SqlCompilePhaseAttribution,
1049 ),
1050 QueryError,
1051 >
1052 where
1053 E: PersistedRow<Canister = C> + EntityValue,
1054 {
1055 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
1056 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1057 });
1058 let cache_key = cache_key?;
1059
1060 self.compile_sql_statement_with_cache::<E, _>(
1061 cache_key,
1062 cache_key_local_instructions,
1063 sql,
1064 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
1065 )
1066 }
1067
1068 fn compile_sql_statement_with_cache<E, F>(
1071 &self,
1072 cache_key: SqlCompiledCommandCacheKey,
1073 cache_key_local_instructions: u64,
1074 sql: &str,
1075 ensure_surface_supported: F,
1076 ) -> Result<
1077 (
1078 CompiledSqlCommand,
1079 SqlCacheAttribution,
1080 SqlCompilePhaseAttribution,
1081 ),
1082 QueryError,
1083 >
1084 where
1085 E: PersistedRow<Canister = C> + EntityValue,
1086 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
1087 {
1088 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
1089 let cached =
1090 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1091 Ok::<_, QueryError>(cached)
1092 });
1093 let cached = cached?;
1094 if let Some(compiled) = cached {
1095 return Ok((
1096 compiled,
1097 SqlCacheAttribution::sql_compiled_command_cache_hit(),
1098 SqlCompilePhaseAttribution::cache_hit(
1099 cache_key_local_instructions,
1100 cache_lookup_local_instructions,
1101 ),
1102 ));
1103 }
1104
1105 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
1106 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
1107 });
1108 let (parsed, parse_attribution) = parsed?;
1109 let parse_select_local_instructions = parse_local_instructions
1110 .saturating_sub(parse_attribution.tokenize)
1111 .saturating_sub(parse_attribution.expr)
1112 .saturating_sub(parse_attribution.predicate);
1113 ensure_surface_supported(&parsed)?;
1114 let authority = EntityAuthority::for_type::<E>();
1115 let (
1116 compiled,
1117 aggregate_lane_check_local_instructions,
1118 prepare_local_instructions,
1119 lower_local_instructions,
1120 bind_local_instructions,
1121 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
1122
1123 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
1124 self.with_sql_compiled_command_cache(|cache| {
1125 cache.insert(cache_key, compiled.clone());
1126 });
1127 Ok::<_, QueryError>(())
1128 });
1129 cache_insert?;
1130
1131 Ok((
1132 compiled,
1133 SqlCacheAttribution::sql_compiled_command_cache_miss(),
1134 SqlCompilePhaseAttribution {
1135 cache_key: cache_key_local_instructions,
1136 cache_lookup: cache_lookup_local_instructions,
1137 parse: parse_local_instructions,
1138 parse_tokenize: parse_attribution.tokenize,
1139 parse_select: parse_select_local_instructions,
1140 parse_expr: parse_attribution.expr,
1141 parse_predicate: parse_attribution.predicate,
1142 aggregate_lane_check: aggregate_lane_check_local_instructions,
1143 prepare: prepare_local_instructions,
1144 lower: lower_local_instructions,
1145 bind: bind_local_instructions,
1146 cache_insert: cache_insert_local_instructions,
1147 },
1148 ))
1149 }
1150}