1mod execute;
8mod projection;
9
10#[cfg(feature = "diagnostics")]
11use candid::CandidType;
12#[cfg(feature = "diagnostics")]
13use serde::Deserialize;
14use std::{cell::RefCell, collections::HashMap, sync::Arc};
15
/// Version tag stamped into every [`SqlCompiledCommandCacheKey`] built by
/// `SqlCompiledCommandCacheKey::for_entity`.
///
/// The tag is part of the key's `Eq`/`Hash` identity, so keys carrying a
/// different version value never match each other.
// NOTE(review): presumably bumped whenever the compile pipeline changes in a way
// that makes previously cached commands stale — confirm the policy with the owners.
const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
22
23#[cfg(feature = "diagnostics")]
24use crate::db::DataStore;
25#[cfg(feature = "diagnostics")]
26use crate::db::executor::GroupedCountAttribution;
27#[cfg(feature = "diagnostics")]
28use crate::db::session::sql::projection::{
29 current_pure_covering_decode_local_instructions,
30 current_pure_covering_row_assembly_local_instructions,
31};
32#[cfg(test)]
33use crate::db::sql::parser::parse_sql;
34use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
35use crate::{
36 db::{
37 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
38 commit::CommitSchemaFingerprint,
39 executor::{EntityAuthority, SharedPreparedExecutionPlan},
40 query::intent::StructuralQuery,
41 schema::commit_schema_fingerprint_for_entity,
42 session::query::QueryPlanCacheAttribution,
43 session::sql::projection::{
44 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
45 },
46 sql::identifier::identifiers_tail_match,
47 sql::lowering::{
48 LoweredBaseQueryShape, LoweredSqlCommand, LoweredSqlQuery,
49 SqlGlobalAggregateCommandCore, SqlLoweringError,
50 bind_lowered_sql_select_query_structural,
51 compile_sql_global_aggregate_command_core_from_prepared,
52 lower_sql_command_from_prepared_statement, prepare_sql_statement,
53 },
54 sql::parser::{SqlStatement, parse_sql_with_attribution},
55 },
56 traits::{CanisterKind, EntityValue},
57};
58
59#[cfg(all(test, not(feature = "diagnostics")))]
60pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
61#[cfg(feature = "diagnostics")]
62pub use crate::db::session::sql::projection::{
63 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
64};
65
#[derive(Debug)]
/// Result payload returned by `DbSession::execute_sql_query` and
/// `DbSession::execute_sql_update`.
///
/// Each variant carries one result shape. Which statement produces which
/// variant is decided by the execution code, which lives outside this part of
/// the file.
pub enum SqlStatementResult {
    /// A bare row count with no row payload.
    Count {
        row_count: u32,
    },
    /// Projected rows as typed values.
    ///
    /// `columns` and `fixed_scales` are per-column metadata; `rows` holds one
    /// `Vec<Value>` per row.
    // NOTE(review): `fixed_scales` presumably carries an optional decimal scale
    // per projected column — confirm against the projection module.
    Projection {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<Vec<crate::value::Value>>,
        row_count: u32,
    },
    /// Projected rows where every cell is already a `String`.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows plus an optional cursor string.
    // NOTE(review): `next_cursor` presumably lets a caller continue a paged
    // grouped read — confirm against the grouped executor.
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        next_cursor: Option<String>,
    },
    /// Textual output of an `EXPLAIN` statement.
    Explain(String),
    /// Schema description of one entity.
    Describe(crate::db::EntitySchemaDescription),
    /// Index listing, one string per entry.
    ShowIndexes(Vec<String>),
    /// Field descriptions of one entity.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// Entity listing, one string per entry.
    ShowEntities(Vec<String>),
}
96
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
/// Flat per-call attribution record returned by
/// `DbSession::execute_sql_query_with_attribution`.
///
/// All `*_local_instructions` values are deltas of the counter read by
/// `read_sql_local_instruction_counter`, which returns `0` on every target
/// other than wasm32.
pub struct SqlQueryExecutionAttribution {
    // --- Compile phase ---------------------------------------------------
    /// Counter delta measured around the whole compile call, cache handling included.
    pub compile_local_instructions: u64,
    /// Cost of building the compiled-command cache key.
    pub compile_cache_key_local_instructions: u64,
    /// Cost of the compiled-command cache lookup.
    pub compile_cache_lookup_local_instructions: u64,
    /// Cost of the whole parse call (zero on a compiled-command cache hit).
    pub compile_parse_local_instructions: u64,
    /// Tokenize share of the parse cost, as reported by the parser.
    pub compile_parse_tokenize_local_instructions: u64,
    /// Parse cost left after subtracting the tokenize, expr and predicate shares.
    pub compile_parse_select_local_instructions: u64,
    /// Expression share of the parse cost, as reported by the parser.
    pub compile_parse_expr_local_instructions: u64,
    /// Predicate share of the parse cost, as reported by the parser.
    pub compile_parse_predicate_local_instructions: u64,
    /// Cost of checking whether a SELECT takes the global aggregate lane.
    pub compile_aggregate_lane_check_local_instructions: u64,
    /// Cost of the prepare stage (or of entity validation for metadata statements).
    pub compile_prepare_local_instructions: u64,
    /// Cost of the lowering stage.
    pub compile_lower_local_instructions: u64,
    /// Cost of binding a lowered SELECT into a structural query.
    pub compile_bind_local_instructions: u64,
    /// Cost of inserting the compiled command into the cache.
    pub compile_cache_insert_local_instructions: u64,

    // --- Execute phase (copied from `SqlExecutePhaseAttribution`) --------
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // --- Grouped-count detail (copied from `GroupedCountAttribution`) ----
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // --- Deltas sampled around the execute call --------------------------
    /// Growth of the pure-covering decode counter across execution.
    pub pure_covering_decode_local_instructions: u64,
    /// Growth of the pure-covering row-assembly counter across execution.
    pub pure_covering_row_assembly_local_instructions: u64,
    /// Growth of `DataStore::current_get_call_count()` across execution.
    pub store_get_calls: u64,
    /// Always written as `0` by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,

    // --- Totals ----------------------------------------------------------
    /// Saturating sum of the planner, store and executor counters.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and `execute_local_instructions`.
    pub total_local_instructions: u64,

    // --- Cache counters (compile and execute attribution merged) ---------
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
148
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
/// Execute-phase counters handed back to `execute_sql_query_with_attribution`,
/// which copies them into [`SqlQueryExecutionAttribution`].
///
/// The planner, store and executor counters are summed (saturating) to form the
/// reported execute total; the grouped counters are reported separately.
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    /// Grouped-count detail counters, defined by the executor module.
    pub grouped_count: GroupedCountAttribution,
}
163
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
/// Per-stage cost breakdown of one SQL compile call.
///
/// Every field is a delta produced by `measure_sql_stage` (or a value derived
/// from such deltas), so all fields are `0` when the `diagnostics` feature is
/// disabled.
pub(in crate::db) struct SqlCompilePhaseAttribution {
    /// Building the compiled-command cache key.
    pub cache_key: u64,
    /// Looking the key up in the compiled-command cache.
    pub cache_lookup: u64,
    /// The whole parse call.
    pub parse: u64,
    /// Tokenize share reported by the parser.
    pub parse_tokenize: u64,
    /// `parse` minus the tokenize, expr and predicate shares (saturating).
    pub parse_select: u64,
    /// Expression share reported by the parser.
    pub parse_expr: u64,
    /// Predicate share reported by the parser.
    pub parse_predicate: u64,
    /// Checking whether a SELECT takes the global aggregate lane.
    pub aggregate_lane_check: u64,
    /// Preparing the statement (or validating the entity for metadata statements).
    pub prepare: u64,
    /// Lowering the prepared statement.
    pub lower: u64,
    /// Binding a lowered SELECT into a structural query.
    pub bind: u64,
    /// Inserting the compiled command into the cache.
    pub cache_insert: u64,
}
190
191impl SqlCompilePhaseAttribution {
192 #[must_use]
193 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
194 Self {
195 cache_key,
196 cache_lookup,
197 parse: 0,
198 parse_tokenize: 0,
199 parse_select: 0,
200 parse_expr: 0,
201 parse_predicate: 0,
202 aggregate_lane_check: 0,
203 prepare: 0,
204 lower: 0,
205 bind: 0,
206 cache_insert: 0,
207 }
208 }
209}
210
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Splits an execute total into a store share and an executor remainder.
    ///
    /// The executor counter is `execute_local_instructions` minus
    /// `store_local_instructions` (saturating at zero). The planner counter,
    /// the grouped counters and the grouped-count detail are left empty.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        // Whatever the store did not consume is charged to the executor.
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_local_instructions,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
}
230
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
/// Hit/miss counters for the two caches touched by the SQL surface: the
/// compiled-command cache owned by this module and the shared query-plan cache.
pub(in crate::db) struct SqlCacheAttribution {
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
243
244impl SqlCacheAttribution {
245 #[must_use]
246 const fn none() -> Self {
247 Self {
248 sql_compiled_command_cache_hits: 0,
249 sql_compiled_command_cache_misses: 0,
250 shared_query_plan_cache_hits: 0,
251 shared_query_plan_cache_misses: 0,
252 }
253 }
254
255 #[must_use]
256 const fn sql_compiled_command_cache_hit() -> Self {
257 Self {
258 sql_compiled_command_cache_hits: 1,
259 ..Self::none()
260 }
261 }
262
263 #[must_use]
264 const fn sql_compiled_command_cache_miss() -> Self {
265 Self {
266 sql_compiled_command_cache_misses: 1,
267 ..Self::none()
268 }
269 }
270
271 #[must_use]
272 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
273 Self {
274 shared_query_plan_cache_hits: attribution.hits,
275 shared_query_plan_cache_misses: attribution.misses,
276 ..Self::none()
277 }
278 }
279
280 #[must_use]
281 const fn merge(self, other: Self) -> Self {
282 Self {
283 sql_compiled_command_cache_hits: self
284 .sql_compiled_command_cache_hits
285 .saturating_add(other.sql_compiled_command_cache_hits),
286 sql_compiled_command_cache_misses: self
287 .sql_compiled_command_cache_misses
288 .saturating_add(other.sql_compiled_command_cache_misses),
289 shared_query_plan_cache_hits: self
290 .shared_query_plan_cache_hits
291 .saturating_add(other.shared_query_plan_cache_hits),
292 shared_query_plan_cache_misses: self
293 .shared_query_plan_cache_misses
294 .saturating_add(other.shared_query_plan_cache_misses),
295 }
296 }
297}
298
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
/// Which public SQL entry point a statement is being compiled for.
///
/// The surface is part of the compiled-command cache key and drives the
/// statement filter in `ensure_sql_statement_supported_for_surface`.
enum SqlCompiledCommandSurface {
    /// `execute_sql_query`: SELECT, EXPLAIN, DESCRIBE and the SHOW statements.
    Query,
    /// `execute_sql_update`: INSERT, UPDATE and DELETE.
    Update,
}
304
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
/// Identity of one entry in the compiled-command cache.
///
/// Two keys are equal only when all five components match.
pub(in crate::db) struct SqlCompiledCommandCacheKey {
    /// Cache-method version the key was built under.
    cache_method_version: u8,
    /// Entry point (query or update) the statement was compiled for.
    surface: SqlCompiledCommandSurface,
    /// `E::PATH` of the entity type the statement was compiled against.
    entity_path: &'static str,
    /// Commit schema fingerprint of that entity type at key-construction time.
    schema_fingerprint: CommitSchemaFingerprint,
    /// The SQL text exactly as passed by the caller (no normalization is applied here).
    sql: String,
}
323
#[derive(Clone, Debug)]
/// Column labels and per-column fixed scales describing a SELECT projection.
///
/// `sql_select_prepared_plan` builds both vectors from the same projection spec.
// NOTE(review): the two vectors are presumably index-aligned (one entry per
// projected column) — confirm against the projection helpers.
pub(in crate::db) struct SqlProjectionContract {
    columns: Vec<String>,
    fixed_scales: Vec<Option<u32>>,
}
338
339impl SqlProjectionContract {
340 #[must_use]
341 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
342 Self {
343 columns,
344 fixed_scales,
345 }
346 }
347
348 #[must_use]
349 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
350 (self.columns, self.fixed_scales)
351 }
352}
353
354impl SqlCompiledCommandCacheKey {
355 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
356 where
357 E: PersistedRow + EntityValue,
358 {
359 Self {
360 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
361 surface,
362 entity_path: E::PATH,
363 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
364 sql: sql.to_string(),
365 }
366 }
367
368 #[must_use]
369 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
370 self.schema_fingerprint
371 }
372}
373
#[cfg(test)]
impl SqlCompiledCommandCacheKey {
    /// Test-only: builds a query-surface key with an explicit cache-method version.
    pub(in crate::db) fn query_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Query;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test-only: builds an update-surface key with an explicit cache-method version.
    pub(in crate::db) fn update_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Update;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Shared constructor behind the two test helpers above; identical to
    /// `for_entity` except that the version comes from the caller.
    fn for_entity_with_method_version<E>(
        surface: SqlCompiledCommandSurface,
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
        let sql = sql.to_owned();

        Self {
            cache_method_version,
            surface,
            entity_path: E::PATH,
            schema_fingerprint,
            sql,
        }
    }
}
421
/// Compiled-command cache for one scope: maps a full cache key to the command
/// that was compiled for it.
pub(in crate::db) type SqlCompiledCommandCache =
    HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;
424
thread_local! {
    /// Thread-local registry of compiled-command caches, keyed by the value of
    /// `self.db.cache_scope_id()`. An entry is created lazily the first time a
    /// scope is accessed through `with_sql_compiled_command_cache`.
    // NOTE(review): nothing in this file evicts entries, so each scope's cache
    // grows with the number of distinct SQL strings compiled — confirm that is
    // acceptable for the expected workloads.
    static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
        RefCell::new(HashMap::default());
}
432
#[derive(Clone, Debug)]
/// Cached, ready-to-execute form of one SQL statement, produced by
/// `compile_sql_statement_for_authority`.
pub(in crate::db) enum CompiledSqlCommand {
    /// SELECT that is not on the global aggregate lane: the bound structural
    /// query plus the cache key it was compiled under.
    Select {
        query: Arc<StructuralQuery>,
        compiled_cache_key: SqlCompiledCommandCacheKey,
    },
    /// DELETE: the lowered base query together with the prepared statement.
    Delete {
        query: LoweredBaseQueryShape,
        statement: SqlDeleteStatement,
    },
    /// SELECT whose prepared statement reports the global aggregate lane shape.
    GlobalAggregate {
        command: Box<SqlGlobalAggregateCommandCore>,
    },
    /// EXPLAIN: the lowered command.
    Explain(LoweredSqlCommand),
    /// INSERT: the prepared statement.
    Insert(SqlInsertStatement),
    /// UPDATE: the prepared statement.
    Update(SqlUpdateStatement),
    /// DESCRIBE, after the named entity was checked against the target entity.
    DescribeEntity,
    /// SHOW INDEXES, after the named entity was checked against the target entity.
    ShowIndexesEntity,
    /// SHOW COLUMNS, after the named entity was checked against the target entity.
    ShowColumnsEntity,
    /// SHOW ENTITIES; compiled without any entity check.
    ShowEntities,
}
457
#[cfg(test)]
/// Test-only: parses `sql` into a statement, converting parser failures into
/// `QueryError` via `QueryError::from_sql_parse_error`.
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;

    Ok(statement)
}
464
#[cfg(feature = "diagnostics")]
/// Reads the instruction counter used for SQL stage attribution.
///
/// On wasm32 this returns `canic_cdk::api::performance_counter(1)`; on every
/// other target it returns `0`, so all measured deltas are zero there.
// NOTE(review): counter id `1` is presumably the call-context instruction
// counter — confirm against the canic_cdk documentation.
//
// The lint expectation is limited to non-wasm32 targets: only there is the body
// a constant that clippy would want to be `const`. An unconditional `expect`
// would be reported as unfulfilled when linting the wasm32 build.
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_sql_local_instruction_counter() -> u64 {
    // The enclosing function is already gated on `diagnostics`, so only the
    // target architecture needs to be checked here.
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
481
482pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
483 run: impl FnOnce() -> Result<T, E>,
484) -> (u64, Result<T, E>) {
485 #[cfg(feature = "diagnostics")]
486 let start = read_sql_local_instruction_counter();
487
488 let result = run();
489
490 #[cfg(feature = "diagnostics")]
491 let delta = read_sql_local_instruction_counter().saturating_sub(start);
492
493 #[cfg(not(feature = "diagnostics"))]
494 let delta = 0;
495
496 (delta, result)
497}
498
499impl<C: CanisterKind> DbSession<C> {
500 fn with_sql_compiled_command_cache<R>(
501 &self,
502 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
503 ) -> R {
504 let scope_id = self.db.cache_scope_id();
505
506 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
507 let mut caches = caches.borrow_mut();
508 let cache = caches.entry(scope_id).or_default();
509
510 f(cache)
511 })
512 }
513
514 #[cfg(test)]
515 pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
516 self.with_sql_compiled_command_cache(|cache| cache.len())
517 }
518
519 #[cfg(test)]
520 pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
521 self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
522 }
523
524 #[expect(clippy::too_many_lines)]
527 fn compile_sql_statement_for_authority(
528 statement: &SqlStatement,
529 authority: EntityAuthority,
530 compiled_cache_key: SqlCompiledCommandCacheKey,
531 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
532 let prepare_statement = || {
536 measure_sql_stage(|| {
537 prepare_sql_statement(statement.clone(), authority.model().name())
538 .map_err(QueryError::from_sql_lowering_error)
539 })
540 };
541
542 let validate_metadata_entity = |sql_entity: &str| {
545 if identifiers_tail_match(sql_entity, authority.model().name()) {
546 return Ok(());
547 }
548
549 Err(QueryError::from_sql_lowering_error(
550 SqlLoweringError::EntityMismatch {
551 sql_entity: sql_entity.to_string(),
552 expected_entity: authority.model().name(),
553 },
554 ))
555 };
556
557 match statement {
558 SqlStatement::Select(_) => {
559 let (prepare_local_instructions, prepared) = prepare_statement();
560 let prepared = prepared?;
561 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
562 measure_sql_stage(|| {
563 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
564 });
565 let requires_aggregate_lane = requires_aggregate_lane?;
566
567 if requires_aggregate_lane {
568 let (lower_local_instructions, command) = measure_sql_stage(|| {
569 compile_sql_global_aggregate_command_core_from_prepared(
570 prepared,
571 authority.model(),
572 MissingRowPolicy::Ignore,
573 )
574 .map_err(QueryError::from_sql_lowering_error)
575 });
576 let command = command?;
577
578 Ok((
579 CompiledSqlCommand::GlobalAggregate {
580 command: Box::new(command),
581 },
582 aggregate_lane_check_local_instructions,
583 prepare_local_instructions,
584 lower_local_instructions,
585 0,
586 ))
587 } else {
588 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
589 lower_sql_command_from_prepared_statement(prepared, authority.model()).map_err(
590 |err| match err {
591 SqlLoweringError::UnexpectedQueryLaneStatement => {
592 QueryError::invariant(
593 "query-lane SQL lowering reached a non query-compatible statement",
594 )
595 }
596 other => QueryError::from_sql_lowering_error(other),
597 },
598 )
599 });
600 let lowered = lowered?;
601 let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
602 return Err(QueryError::invariant(
603 "compiled SQL SELECT lane must lower to lowered SQL SELECT",
604 ));
605 };
606 let (bind_local_instructions, query) = measure_sql_stage(|| {
607 bind_lowered_sql_select_query_structural(
608 authority.model(),
609 select,
610 MissingRowPolicy::Ignore,
611 )
612 .map_err(QueryError::from_sql_lowering_error)
613 });
614 let query = query?;
615
616 Ok((
617 CompiledSqlCommand::Select {
618 query: Arc::new(query),
619 compiled_cache_key,
620 },
621 aggregate_lane_check_local_instructions,
622 prepare_local_instructions,
623 lower_local_instructions,
624 bind_local_instructions,
625 ))
626 }
627 }
628 SqlStatement::Delete(_) => {
629 let (prepare_local_instructions, prepared) = prepare_statement();
630 let prepared = prepared?;
631 let normalized_statement = prepared.clone().into_statement();
632 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
633 lower_sql_command_from_prepared_statement(prepared, authority.model())
634 .map_err(QueryError::from_sql_lowering_error)
635 });
636 let lowered = lowered?;
637 let Some(LoweredSqlQuery::Delete(query)) = lowered.into_query() else {
638 return Err(QueryError::invariant(
639 "compiled SQL DELETE lane must lower to lowered SQL DELETE",
640 ));
641 };
642 let SqlStatement::Delete(statement) = normalized_statement else {
643 return Err(QueryError::invariant(
644 "prepared SQL DELETE compilation must preserve DELETE statement ownership",
645 ));
646 };
647
648 Ok((
649 CompiledSqlCommand::Delete { query, statement },
650 0,
651 prepare_local_instructions,
652 lower_local_instructions,
653 0,
654 ))
655 }
656 SqlStatement::Insert(_) => {
657 let (prepare_local_instructions, prepared) = prepare_statement();
658 let prepared = prepared?;
659 let SqlStatement::Insert(statement) = prepared.into_statement() else {
660 return Err(QueryError::invariant(
661 "prepared SQL INSERT compilation must preserve INSERT statement ownership",
662 ));
663 };
664
665 Ok((
666 CompiledSqlCommand::Insert(statement),
667 0,
668 prepare_local_instructions,
669 0,
670 0,
671 ))
672 }
673 SqlStatement::Update(_) => {
674 let (prepare_local_instructions, prepared) = prepare_statement();
675 let prepared = prepared?;
676 let SqlStatement::Update(statement) = prepared.into_statement() else {
677 return Err(QueryError::invariant(
678 "prepared SQL UPDATE compilation must preserve UPDATE statement ownership",
679 ));
680 };
681
682 Ok((
683 CompiledSqlCommand::Update(statement),
684 0,
685 prepare_local_instructions,
686 0,
687 0,
688 ))
689 }
690 SqlStatement::Explain(_) => {
691 let (prepare_local_instructions, prepared) = prepare_statement();
692 let prepared = prepared?;
693 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
694 lower_sql_command_from_prepared_statement(prepared, authority.model())
695 .map_err(QueryError::from_sql_lowering_error)
696 });
697 let lowered = lowered?;
698
699 Ok((
700 CompiledSqlCommand::Explain(lowered),
701 0,
702 prepare_local_instructions,
703 lower_local_instructions,
704 0,
705 ))
706 }
707 SqlStatement::Describe(_) => {
708 let (prepare_local_instructions, validated) = measure_sql_stage(|| {
709 let SqlStatement::Describe(statement) = statement else {
710 return Err(QueryError::invariant(
711 "compiled SQL DESCRIBE lane must preserve DESCRIBE statement ownership",
712 ));
713 };
714
715 validate_metadata_entity(statement.entity.as_str())
716 });
717 validated?;
718
719 Ok((
720 CompiledSqlCommand::DescribeEntity,
721 0,
722 prepare_local_instructions,
723 0,
724 0,
725 ))
726 }
727 SqlStatement::ShowIndexes(entity) => {
728 let (prepare_local_instructions, validated) =
729 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
730 validated?;
731
732 Ok((
733 CompiledSqlCommand::ShowIndexesEntity,
734 0,
735 prepare_local_instructions,
736 0,
737 0,
738 ))
739 }
740 SqlStatement::ShowColumns(entity) => {
741 let (prepare_local_instructions, validated) =
742 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
743 validated?;
744
745 Ok((
746 CompiledSqlCommand::ShowColumnsEntity,
747 0,
748 prepare_local_instructions,
749 0,
750 0,
751 ))
752 }
753 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
754 }
755 }
756
757 fn sql_select_prepared_plan(
760 &self,
761 query: &StructuralQuery,
762 authority: EntityAuthority,
763 cache_schema_fingerprint: CommitSchemaFingerprint,
764 ) -> Result<
765 (
766 SharedPreparedExecutionPlan,
767 SqlProjectionContract,
768 SqlCacheAttribution,
769 ),
770 QueryError,
771 > {
772 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
773 authority,
774 cache_schema_fingerprint,
775 query,
776 )?;
777 let projection_spec = prepared_plan
778 .logical_plan()
779 .projection_spec(authority.model());
780 let projection = SqlProjectionContract::new(
781 projection_labels_from_projection_spec(&projection_spec),
782 projection_fixed_scales_from_projection_spec(&projection_spec),
783 );
784
785 Ok((
786 prepared_plan,
787 projection,
788 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
789 ))
790 }
791
792 fn ensure_sql_statement_supported_for_surface(
796 statement: &SqlStatement,
797 surface: SqlCompiledCommandSurface,
798 ) -> Result<(), QueryError> {
799 match (surface, statement) {
800 (
801 SqlCompiledCommandSurface::Query,
802 SqlStatement::Select(_)
803 | SqlStatement::Explain(_)
804 | SqlStatement::Describe(_)
805 | SqlStatement::ShowIndexes(_)
806 | SqlStatement::ShowColumns(_)
807 | SqlStatement::ShowEntities(_),
808 )
809 | (
810 SqlCompiledCommandSurface::Update,
811 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
812 ) => Ok(()),
813 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
814 Err(QueryError::unsupported_query(
815 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
816 ))
817 }
818 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
819 Err(QueryError::unsupported_query(
820 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
821 ))
822 }
823 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
824 Err(QueryError::unsupported_query(
825 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
826 ))
827 }
828 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
829 Err(QueryError::unsupported_query(
830 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
831 ))
832 }
833 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
834 Err(QueryError::unsupported_query(
835 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
836 ))
837 }
838 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
839 Err(QueryError::unsupported_query(
840 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
841 ))
842 }
843 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
844 Err(QueryError::unsupported_query(
845 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
846 ))
847 }
848 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
849 Err(QueryError::unsupported_query(
850 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
851 ))
852 }
853 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
854 Err(QueryError::unsupported_query(
855 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
856 ))
857 }
858 }
859 }
860
861 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
866 where
867 E: PersistedRow<Canister = C> + EntityValue,
868 {
869 let compiled = self.compile_sql_query::<E>(sql)?;
870
871 self.execute_compiled_sql::<E>(&compiled)
872 }
873
874 #[cfg(feature = "diagnostics")]
877 #[doc(hidden)]
878 pub fn execute_sql_query_with_attribution<E>(
879 &self,
880 sql: &str,
881 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
882 where
883 E: PersistedRow<Canister = C> + EntityValue,
884 {
885 let (compile_local_instructions, compiled) =
888 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
889 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
890
891 let store_get_calls_before = DataStore::current_get_call_count();
894 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
895 let pure_covering_row_assembly_before =
896 current_pure_covering_row_assembly_local_instructions();
897 let (result, execute_cache_attribution, execute_phase_attribution) =
898 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
899 let store_get_calls =
900 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
901 let pure_covering_decode_local_instructions =
902 current_pure_covering_decode_local_instructions()
903 .saturating_sub(pure_covering_decode_before);
904 let pure_covering_row_assembly_local_instructions =
905 current_pure_covering_row_assembly_local_instructions()
906 .saturating_sub(pure_covering_row_assembly_before);
907 let execute_local_instructions = execute_phase_attribution
908 .planner_local_instructions
909 .saturating_add(execute_phase_attribution.store_local_instructions)
910 .saturating_add(execute_phase_attribution.executor_local_instructions);
911 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
912 let total_local_instructions =
913 compile_local_instructions.saturating_add(execute_local_instructions);
914
915 Ok((
916 result,
917 SqlQueryExecutionAttribution {
918 compile_local_instructions,
919 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
920 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
921 compile_parse_local_instructions: compile_phase_attribution.parse,
922 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
923 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
924 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
925 compile_parse_predicate_local_instructions: compile_phase_attribution
926 .parse_predicate,
927 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
928 .aggregate_lane_check,
929 compile_prepare_local_instructions: compile_phase_attribution.prepare,
930 compile_lower_local_instructions: compile_phase_attribution.lower,
931 compile_bind_local_instructions: compile_phase_attribution.bind,
932 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
933 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
934 store_local_instructions: execute_phase_attribution.store_local_instructions,
935 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
936 grouped_stream_local_instructions: execute_phase_attribution
937 .grouped_stream_local_instructions,
938 grouped_fold_local_instructions: execute_phase_attribution
939 .grouped_fold_local_instructions,
940 grouped_finalize_local_instructions: execute_phase_attribution
941 .grouped_finalize_local_instructions,
942 grouped_count_borrowed_hash_computations: execute_phase_attribution
943 .grouped_count
944 .borrowed_hash_computations,
945 grouped_count_bucket_candidate_checks: execute_phase_attribution
946 .grouped_count
947 .bucket_candidate_checks,
948 grouped_count_existing_group_hits: execute_phase_attribution
949 .grouped_count
950 .existing_group_hits,
951 grouped_count_new_group_inserts: execute_phase_attribution
952 .grouped_count
953 .new_group_inserts,
954 grouped_count_row_materialization_local_instructions: execute_phase_attribution
955 .grouped_count
956 .row_materialization_local_instructions,
957 grouped_count_group_lookup_local_instructions: execute_phase_attribution
958 .grouped_count
959 .group_lookup_local_instructions,
960 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
961 .grouped_count
962 .existing_group_update_local_instructions,
963 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
964 .grouped_count
965 .new_group_insert_local_instructions,
966 pure_covering_decode_local_instructions,
967 pure_covering_row_assembly_local_instructions,
968 store_get_calls,
969 response_decode_local_instructions: 0,
970 execute_local_instructions,
971 total_local_instructions,
972 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
973 sql_compiled_command_cache_misses: cache_attribution
974 .sql_compiled_command_cache_misses,
975 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
976 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
977 },
978 ))
979 }
980
981 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
986 where
987 E: PersistedRow<Canister = C> + EntityValue,
988 {
989 let compiled = self.compile_sql_update::<E>(sql)?;
990
991 self.execute_compiled_sql::<E>(&compiled)
992 }
993
994 pub(in crate::db) fn compile_sql_query<E>(
997 &self,
998 sql: &str,
999 ) -> Result<CompiledSqlCommand, QueryError>
1000 where
1001 E: PersistedRow<Canister = C> + EntityValue,
1002 {
1003 self.compile_sql_query_with_cache_attribution::<E>(sql)
1004 .map(|(compiled, _, _)| compiled)
1005 }
1006
1007 fn compile_sql_query_with_cache_attribution<E>(
1008 &self,
1009 sql: &str,
1010 ) -> Result<
1011 (
1012 CompiledSqlCommand,
1013 SqlCacheAttribution,
1014 SqlCompilePhaseAttribution,
1015 ),
1016 QueryError,
1017 >
1018 where
1019 E: PersistedRow<Canister = C> + EntityValue,
1020 {
1021 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
1022 }
1023
1024 pub(in crate::db) fn compile_sql_update<E>(
1027 &self,
1028 sql: &str,
1029 ) -> Result<CompiledSqlCommand, QueryError>
1030 where
1031 E: PersistedRow<Canister = C> + EntityValue,
1032 {
1033 self.compile_sql_update_with_cache_attribution::<E>(sql)
1034 .map(|(compiled, _, _)| compiled)
1035 }
1036
1037 fn compile_sql_update_with_cache_attribution<E>(
1038 &self,
1039 sql: &str,
1040 ) -> Result<
1041 (
1042 CompiledSqlCommand,
1043 SqlCacheAttribution,
1044 SqlCompilePhaseAttribution,
1045 ),
1046 QueryError,
1047 >
1048 where
1049 E: PersistedRow<Canister = C> + EntityValue,
1050 {
1051 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1052 }
1053
1054 fn compile_sql_surface_with_cache_attribution<E>(
1058 &self,
1059 sql: &str,
1060 surface: SqlCompiledCommandSurface,
1061 ) -> Result<
1062 (
1063 CompiledSqlCommand,
1064 SqlCacheAttribution,
1065 SqlCompilePhaseAttribution,
1066 ),
1067 QueryError,
1068 >
1069 where
1070 E: PersistedRow<Canister = C> + EntityValue,
1071 {
1072 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
1073 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1074 });
1075 let cache_key = cache_key?;
1076
1077 self.compile_sql_statement_with_cache::<E, _>(
1078 cache_key,
1079 cache_key_local_instructions,
1080 sql,
1081 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
1082 )
1083 }
1084
1085 fn compile_sql_statement_with_cache<E, F>(
1088 &self,
1089 cache_key: SqlCompiledCommandCacheKey,
1090 cache_key_local_instructions: u64,
1091 sql: &str,
1092 ensure_surface_supported: F,
1093 ) -> Result<
1094 (
1095 CompiledSqlCommand,
1096 SqlCacheAttribution,
1097 SqlCompilePhaseAttribution,
1098 ),
1099 QueryError,
1100 >
1101 where
1102 E: PersistedRow<Canister = C> + EntityValue,
1103 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
1104 {
1105 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
1106 let cached =
1107 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1108 Ok::<_, QueryError>(cached)
1109 });
1110 let cached = cached?;
1111 if let Some(compiled) = cached {
1112 return Ok((
1113 compiled,
1114 SqlCacheAttribution::sql_compiled_command_cache_hit(),
1115 SqlCompilePhaseAttribution::cache_hit(
1116 cache_key_local_instructions,
1117 cache_lookup_local_instructions,
1118 ),
1119 ));
1120 }
1121
1122 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
1123 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
1124 });
1125 let (parsed, parse_attribution) = parsed?;
1126 let parse_select_local_instructions = parse_local_instructions
1127 .saturating_sub(parse_attribution.tokenize)
1128 .saturating_sub(parse_attribution.expr)
1129 .saturating_sub(parse_attribution.predicate);
1130 ensure_surface_supported(&parsed)?;
1131 let authority = EntityAuthority::for_type::<E>();
1132 let (
1133 compiled,
1134 aggregate_lane_check_local_instructions,
1135 prepare_local_instructions,
1136 lower_local_instructions,
1137 bind_local_instructions,
1138 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
1139
1140 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
1141 self.with_sql_compiled_command_cache(|cache| {
1142 cache.insert(cache_key, compiled.clone());
1143 });
1144 Ok::<_, QueryError>(())
1145 });
1146 cache_insert?;
1147
1148 Ok((
1149 compiled,
1150 SqlCacheAttribution::sql_compiled_command_cache_miss(),
1151 SqlCompilePhaseAttribution {
1152 cache_key: cache_key_local_instructions,
1153 cache_lookup: cache_lookup_local_instructions,
1154 parse: parse_local_instructions,
1155 parse_tokenize: parse_attribution.tokenize,
1156 parse_select: parse_select_local_instructions,
1157 parse_expr: parse_attribution.expr,
1158 parse_predicate: parse_attribution.predicate,
1159 aggregate_lane_check: aggregate_lane_check_local_instructions,
1160 prepare: prepare_local_instructions,
1161 lower: lower_local_instructions,
1162 bind: bind_local_instructions,
1163 cache_insert: cache_insert_local_instructions,
1164 },
1165 ))
1166 }
1167}