1mod execute;
8mod projection;
9
10#[cfg(feature = "diagnostics")]
11use candid::CandidType;
12#[cfg(feature = "diagnostics")]
13use serde::Deserialize;
14use std::{cell::RefCell, collections::HashMap, sync::Arc};
15
16const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
22
23#[cfg(feature = "diagnostics")]
24use crate::db::DataStore;
25#[cfg(feature = "diagnostics")]
26use crate::db::executor::GroupedCountAttribution;
27#[cfg(feature = "diagnostics")]
28use crate::db::session::sql::projection::{
29 current_pure_covering_decode_local_instructions,
30 current_pure_covering_row_assembly_local_instructions,
31};
32#[cfg(test)]
33use crate::db::sql::parser::parse_sql;
34use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
35use crate::{
36 db::{
37 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
38 commit::CommitSchemaFingerprint,
39 executor::{EntityAuthority, SharedPreparedExecutionPlan},
40 query::intent::StructuralQuery,
41 schema::commit_schema_fingerprint_for_entity,
42 session::query::QueryPlanCacheAttribution,
43 session::sql::projection::{
44 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
45 },
46 sql::identifier::identifiers_tail_match,
47 sql::lowering::{
48 LoweredBaseQueryShape, LoweredSqlCommand, LoweredSqlQuery,
49 SqlGlobalAggregateCommandCore, SqlLoweringError,
50 bind_lowered_sql_select_query_structural,
51 compile_sql_global_aggregate_command_core_from_prepared,
52 lower_sql_command_from_prepared_statement, prepare_sql_statement,
53 },
54 sql::parser::{SqlStatement, parse_sql_with_attribution},
55 },
56 traits::{CanisterKind, EntityValue},
57 value::OutputValue,
58};
59
60#[cfg(all(test, not(feature = "diagnostics")))]
61pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
62#[cfg(feature = "diagnostics")]
63pub use crate::db::session::sql::projection::{
64 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
65};
66
67#[derive(Debug)]
69pub enum SqlStatementResult {
70 Count {
71 row_count: u32,
72 },
73 Projection {
74 columns: Vec<String>,
75 fixed_scales: Vec<Option<u32>>,
76 rows: Vec<Vec<OutputValue>>,
77 row_count: u32,
78 },
79 ProjectionText {
80 columns: Vec<String>,
81 rows: Vec<Vec<String>>,
82 row_count: u32,
83 },
84 Grouped {
85 columns: Vec<String>,
86 fixed_scales: Vec<Option<u32>>,
87 rows: Vec<GroupedRow>,
88 row_count: u32,
89 next_cursor: Option<String>,
90 },
91 Explain(String),
92 Describe(crate::db::EntitySchemaDescription),
93 ShowIndexes(Vec<String>),
94 ShowColumns(Vec<crate::db::EntityFieldDescription>),
95 ShowEntities(Vec<String>),
96}
97
98#[cfg(feature = "diagnostics")]
109#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
110pub struct SqlQueryExecutionAttribution {
111 pub compile_local_instructions: u64,
112 pub compile_cache_key_local_instructions: u64,
113 pub compile_cache_lookup_local_instructions: u64,
114 pub compile_parse_local_instructions: u64,
115 pub compile_parse_tokenize_local_instructions: u64,
116 pub compile_parse_select_local_instructions: u64,
117 pub compile_parse_expr_local_instructions: u64,
118 pub compile_parse_predicate_local_instructions: u64,
119 pub compile_aggregate_lane_check_local_instructions: u64,
120 pub compile_prepare_local_instructions: u64,
121 pub compile_lower_local_instructions: u64,
122 pub compile_bind_local_instructions: u64,
123 pub compile_cache_insert_local_instructions: u64,
124 pub planner_local_instructions: u64,
125 pub store_local_instructions: u64,
126 pub executor_local_instructions: u64,
127 pub grouped_stream_local_instructions: u64,
128 pub grouped_fold_local_instructions: u64,
129 pub grouped_finalize_local_instructions: u64,
130 pub grouped_count_borrowed_hash_computations: u64,
131 pub grouped_count_bucket_candidate_checks: u64,
132 pub grouped_count_existing_group_hits: u64,
133 pub grouped_count_new_group_inserts: u64,
134 pub grouped_count_row_materialization_local_instructions: u64,
135 pub grouped_count_group_lookup_local_instructions: u64,
136 pub grouped_count_existing_group_update_local_instructions: u64,
137 pub grouped_count_new_group_insert_local_instructions: u64,
138 pub pure_covering_decode_local_instructions: u64,
139 pub pure_covering_row_assembly_local_instructions: u64,
140 pub store_get_calls: u64,
141 pub response_decode_local_instructions: u64,
142 pub execute_local_instructions: u64,
143 pub total_local_instructions: u64,
144 pub sql_compiled_command_cache_hits: u64,
145 pub sql_compiled_command_cache_misses: u64,
146 pub shared_query_plan_cache_hits: u64,
147 pub shared_query_plan_cache_misses: u64,
148}
149
150#[cfg(feature = "diagnostics")]
154#[derive(Clone, Copy, Debug, Eq, PartialEq)]
155pub(in crate::db) struct SqlExecutePhaseAttribution {
156 pub planner_local_instructions: u64,
157 pub store_local_instructions: u64,
158 pub executor_local_instructions: u64,
159 pub grouped_stream_local_instructions: u64,
160 pub grouped_fold_local_instructions: u64,
161 pub grouped_finalize_local_instructions: u64,
162 pub grouped_count: GroupedCountAttribution,
163}
164
165#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
177pub(in crate::db) struct SqlCompilePhaseAttribution {
178 pub cache_key: u64,
179 pub cache_lookup: u64,
180 pub parse: u64,
181 pub parse_tokenize: u64,
182 pub parse_select: u64,
183 pub parse_expr: u64,
184 pub parse_predicate: u64,
185 pub aggregate_lane_check: u64,
186 pub prepare: u64,
187 pub lower: u64,
188 pub bind: u64,
189 pub cache_insert: u64,
190}
191
192impl SqlCompilePhaseAttribution {
193 #[must_use]
194 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
195 Self {
196 cache_key,
197 cache_lookup,
198 parse: 0,
199 parse_tokenize: 0,
200 parse_select: 0,
201 parse_expr: 0,
202 parse_predicate: 0,
203 aggregate_lane_check: 0,
204 prepare: 0,
205 lower: 0,
206 bind: 0,
207 cache_insert: 0,
208 }
209 }
210}
211
212#[cfg(feature = "diagnostics")]
213impl SqlExecutePhaseAttribution {
214 #[must_use]
215 pub(in crate::db) const fn from_execute_total_and_store_total(
216 execute_local_instructions: u64,
217 store_local_instructions: u64,
218 ) -> Self {
219 Self {
220 planner_local_instructions: 0,
221 store_local_instructions,
222 executor_local_instructions: execute_local_instructions
223 .saturating_sub(store_local_instructions),
224 grouped_stream_local_instructions: 0,
225 grouped_fold_local_instructions: 0,
226 grouped_finalize_local_instructions: 0,
227 grouped_count: GroupedCountAttribution::none(),
228 }
229 }
230}
231
232#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
238pub(in crate::db) struct SqlCacheAttribution {
239 pub sql_compiled_command_cache_hits: u64,
240 pub sql_compiled_command_cache_misses: u64,
241 pub shared_query_plan_cache_hits: u64,
242 pub shared_query_plan_cache_misses: u64,
243}
244
245impl SqlCacheAttribution {
246 #[must_use]
247 const fn none() -> Self {
248 Self {
249 sql_compiled_command_cache_hits: 0,
250 sql_compiled_command_cache_misses: 0,
251 shared_query_plan_cache_hits: 0,
252 shared_query_plan_cache_misses: 0,
253 }
254 }
255
256 #[must_use]
257 const fn sql_compiled_command_cache_hit() -> Self {
258 Self {
259 sql_compiled_command_cache_hits: 1,
260 ..Self::none()
261 }
262 }
263
264 #[must_use]
265 const fn sql_compiled_command_cache_miss() -> Self {
266 Self {
267 sql_compiled_command_cache_misses: 1,
268 ..Self::none()
269 }
270 }
271
272 #[must_use]
273 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
274 Self {
275 shared_query_plan_cache_hits: attribution.hits,
276 shared_query_plan_cache_misses: attribution.misses,
277 ..Self::none()
278 }
279 }
280
281 #[must_use]
282 const fn merge(self, other: Self) -> Self {
283 Self {
284 sql_compiled_command_cache_hits: self
285 .sql_compiled_command_cache_hits
286 .saturating_add(other.sql_compiled_command_cache_hits),
287 sql_compiled_command_cache_misses: self
288 .sql_compiled_command_cache_misses
289 .saturating_add(other.sql_compiled_command_cache_misses),
290 shared_query_plan_cache_hits: self
291 .shared_query_plan_cache_hits
292 .saturating_add(other.shared_query_plan_cache_hits),
293 shared_query_plan_cache_misses: self
294 .shared_query_plan_cache_misses
295 .saturating_add(other.shared_query_plan_cache_misses),
296 }
297 }
298}
299
300#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
301enum SqlCompiledCommandSurface {
302 Query,
303 Update,
304}
305
306#[derive(Clone, Debug, Eq, Hash, PartialEq)]
317pub(in crate::db) struct SqlCompiledCommandCacheKey {
318 cache_method_version: u8,
319 surface: SqlCompiledCommandSurface,
320 entity_path: &'static str,
321 schema_fingerprint: CommitSchemaFingerprint,
322 sql: String,
323}
324
325#[derive(Clone, Debug)]
335pub(in crate::db) struct SqlProjectionContract {
336 columns: Vec<String>,
337 fixed_scales: Vec<Option<u32>>,
338}
339
340impl SqlProjectionContract {
341 #[must_use]
342 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
343 Self {
344 columns,
345 fixed_scales,
346 }
347 }
348
349 #[must_use]
350 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
351 (self.columns, self.fixed_scales)
352 }
353}
354
355impl SqlCompiledCommandCacheKey {
356 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
357 where
358 E: PersistedRow + EntityValue,
359 {
360 Self {
361 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
362 surface,
363 entity_path: E::PATH,
364 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
365 sql: sql.to_string(),
366 }
367 }
368
369 #[must_use]
370 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
371 self.schema_fingerprint
372 }
373}
374
375#[cfg(test)]
376impl SqlCompiledCommandCacheKey {
377 pub(in crate::db) fn query_for_entity_with_method_version<E>(
378 sql: &str,
379 cache_method_version: u8,
380 ) -> Self
381 where
382 E: PersistedRow + EntityValue,
383 {
384 Self::for_entity_with_method_version::<E>(
385 SqlCompiledCommandSurface::Query,
386 sql,
387 cache_method_version,
388 )
389 }
390
391 pub(in crate::db) fn update_for_entity_with_method_version<E>(
392 sql: &str,
393 cache_method_version: u8,
394 ) -> Self
395 where
396 E: PersistedRow + EntityValue,
397 {
398 Self::for_entity_with_method_version::<E>(
399 SqlCompiledCommandSurface::Update,
400 sql,
401 cache_method_version,
402 )
403 }
404
405 fn for_entity_with_method_version<E>(
406 surface: SqlCompiledCommandSurface,
407 sql: &str,
408 cache_method_version: u8,
409 ) -> Self
410 where
411 E: PersistedRow + EntityValue,
412 {
413 Self {
414 cache_method_version,
415 surface,
416 entity_path: E::PATH,
417 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
418 sql: sql.to_string(),
419 }
420 }
421}
422
423pub(in crate::db) type SqlCompiledCommandCache =
424 HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;
425
426thread_local! {
427 static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
431 RefCell::new(HashMap::default());
432}
433
434#[derive(Clone, Debug)]
438pub(in crate::db) enum CompiledSqlCommand {
439 Select {
440 query: Arc<StructuralQuery>,
441 compiled_cache_key: SqlCompiledCommandCacheKey,
442 },
443 Delete {
444 query: LoweredBaseQueryShape,
445 statement: SqlDeleteStatement,
446 },
447 GlobalAggregate {
448 command: Box<SqlGlobalAggregateCommandCore>,
449 },
450 Explain(LoweredSqlCommand),
451 Insert(SqlInsertStatement),
452 Update(SqlUpdateStatement),
453 DescribeEntity,
454 ShowIndexesEntity,
455 ShowColumnsEntity,
456 ShowEntities,
457}
458
459#[cfg(test)]
462pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
463 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
464}
465
466#[cfg(feature = "diagnostics")]
467#[expect(
468 clippy::missing_const_for_fn,
469 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
470)]
471fn read_sql_local_instruction_counter() -> u64 {
472 #[cfg(all(feature = "diagnostics", target_arch = "wasm32"))]
473 {
474 canic_cdk::api::performance_counter(1)
475 }
476
477 #[cfg(not(all(feature = "diagnostics", target_arch = "wasm32")))]
478 {
479 0
480 }
481}
482
483pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
484 run: impl FnOnce() -> Result<T, E>,
485) -> (u64, Result<T, E>) {
486 #[cfg(feature = "diagnostics")]
487 let start = read_sql_local_instruction_counter();
488
489 let result = run();
490
491 #[cfg(feature = "diagnostics")]
492 let delta = read_sql_local_instruction_counter().saturating_sub(start);
493
494 #[cfg(not(feature = "diagnostics"))]
495 let delta = 0;
496
497 (delta, result)
498}
499
500impl<C: CanisterKind> DbSession<C> {
501 fn with_sql_compiled_command_cache<R>(
502 &self,
503 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
504 ) -> R {
505 let scope_id = self.db.cache_scope_id();
506
507 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
508 let mut caches = caches.borrow_mut();
509 let cache = caches.entry(scope_id).or_default();
510
511 f(cache)
512 })
513 }
514
515 #[cfg(test)]
516 pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
517 self.with_sql_compiled_command_cache(|cache| cache.len())
518 }
519
520 #[cfg(test)]
521 pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
522 self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
523 }
524
525 #[expect(clippy::too_many_lines)]
528 fn compile_sql_statement_for_authority(
529 statement: &SqlStatement,
530 authority: EntityAuthority,
531 compiled_cache_key: SqlCompiledCommandCacheKey,
532 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
533 let prepare_statement = || {
537 measure_sql_stage(|| {
538 prepare_sql_statement(statement.clone(), authority.model().name())
539 .map_err(QueryError::from_sql_lowering_error)
540 })
541 };
542
543 let validate_metadata_entity = |sql_entity: &str| {
546 if identifiers_tail_match(sql_entity, authority.model().name()) {
547 return Ok(());
548 }
549
550 Err(QueryError::from_sql_lowering_error(
551 SqlLoweringError::EntityMismatch {
552 sql_entity: sql_entity.to_string(),
553 expected_entity: authority.model().name(),
554 },
555 ))
556 };
557
558 match statement {
559 SqlStatement::Select(_) => {
560 let (prepare_local_instructions, prepared) = prepare_statement();
561 let prepared = prepared?;
562 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
563 measure_sql_stage(|| {
564 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
565 });
566 let requires_aggregate_lane = requires_aggregate_lane?;
567
568 if requires_aggregate_lane {
569 let (lower_local_instructions, command) = measure_sql_stage(|| {
570 compile_sql_global_aggregate_command_core_from_prepared(
571 prepared,
572 authority.model(),
573 MissingRowPolicy::Ignore,
574 )
575 .map_err(QueryError::from_sql_lowering_error)
576 });
577 let command = command?;
578
579 Ok((
580 CompiledSqlCommand::GlobalAggregate {
581 command: Box::new(command),
582 },
583 aggregate_lane_check_local_instructions,
584 prepare_local_instructions,
585 lower_local_instructions,
586 0,
587 ))
588 } else {
589 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
590 lower_sql_command_from_prepared_statement(prepared, authority.model()).map_err(
591 |err| match err {
592 SqlLoweringError::UnexpectedQueryLaneStatement => {
593 QueryError::invariant(
594 "query-lane SQL lowering reached a non query-compatible statement",
595 )
596 }
597 other => QueryError::from_sql_lowering_error(other),
598 },
599 )
600 });
601 let lowered = lowered?;
602 let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
603 return Err(QueryError::invariant(
604 "compiled SQL SELECT lane must lower to lowered SQL SELECT",
605 ));
606 };
607 let (bind_local_instructions, query) = measure_sql_stage(|| {
608 bind_lowered_sql_select_query_structural(
609 authority.model(),
610 select,
611 MissingRowPolicy::Ignore,
612 )
613 .map_err(QueryError::from_sql_lowering_error)
614 });
615 let query = query?;
616
617 Ok((
618 CompiledSqlCommand::Select {
619 query: Arc::new(query),
620 compiled_cache_key,
621 },
622 aggregate_lane_check_local_instructions,
623 prepare_local_instructions,
624 lower_local_instructions,
625 bind_local_instructions,
626 ))
627 }
628 }
629 SqlStatement::Delete(_) => {
630 let (prepare_local_instructions, prepared) = prepare_statement();
631 let prepared = prepared?;
632 let normalized_statement = prepared.clone().into_statement();
633 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
634 lower_sql_command_from_prepared_statement(prepared, authority.model())
635 .map_err(QueryError::from_sql_lowering_error)
636 });
637 let lowered = lowered?;
638 let Some(LoweredSqlQuery::Delete(query)) = lowered.into_query() else {
639 return Err(QueryError::invariant(
640 "compiled SQL DELETE lane must lower to lowered SQL DELETE",
641 ));
642 };
643 let SqlStatement::Delete(statement) = normalized_statement else {
644 return Err(QueryError::invariant(
645 "prepared SQL DELETE compilation must preserve DELETE statement ownership",
646 ));
647 };
648
649 Ok((
650 CompiledSqlCommand::Delete { query, statement },
651 0,
652 prepare_local_instructions,
653 lower_local_instructions,
654 0,
655 ))
656 }
657 SqlStatement::Insert(_) => {
658 let (prepare_local_instructions, prepared) = prepare_statement();
659 let prepared = prepared?;
660 let SqlStatement::Insert(statement) = prepared.into_statement() else {
661 return Err(QueryError::invariant(
662 "prepared SQL INSERT compilation must preserve INSERT statement ownership",
663 ));
664 };
665
666 Ok((
667 CompiledSqlCommand::Insert(statement),
668 0,
669 prepare_local_instructions,
670 0,
671 0,
672 ))
673 }
674 SqlStatement::Update(_) => {
675 let (prepare_local_instructions, prepared) = prepare_statement();
676 let prepared = prepared?;
677 let SqlStatement::Update(statement) = prepared.into_statement() else {
678 return Err(QueryError::invariant(
679 "prepared SQL UPDATE compilation must preserve UPDATE statement ownership",
680 ));
681 };
682
683 Ok((
684 CompiledSqlCommand::Update(statement),
685 0,
686 prepare_local_instructions,
687 0,
688 0,
689 ))
690 }
691 SqlStatement::Explain(_) => {
692 let (prepare_local_instructions, prepared) = prepare_statement();
693 let prepared = prepared?;
694 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
695 lower_sql_command_from_prepared_statement(prepared, authority.model())
696 .map_err(QueryError::from_sql_lowering_error)
697 });
698 let lowered = lowered?;
699
700 Ok((
701 CompiledSqlCommand::Explain(lowered),
702 0,
703 prepare_local_instructions,
704 lower_local_instructions,
705 0,
706 ))
707 }
708 SqlStatement::Describe(_) => {
709 let (prepare_local_instructions, validated) = measure_sql_stage(|| {
710 let SqlStatement::Describe(statement) = statement else {
711 return Err(QueryError::invariant(
712 "compiled SQL DESCRIBE lane must preserve DESCRIBE statement ownership",
713 ));
714 };
715
716 validate_metadata_entity(statement.entity.as_str())
717 });
718 validated?;
719
720 Ok((
721 CompiledSqlCommand::DescribeEntity,
722 0,
723 prepare_local_instructions,
724 0,
725 0,
726 ))
727 }
728 SqlStatement::ShowIndexes(entity) => {
729 let (prepare_local_instructions, validated) =
730 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
731 validated?;
732
733 Ok((
734 CompiledSqlCommand::ShowIndexesEntity,
735 0,
736 prepare_local_instructions,
737 0,
738 0,
739 ))
740 }
741 SqlStatement::ShowColumns(entity) => {
742 let (prepare_local_instructions, validated) =
743 measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
744 validated?;
745
746 Ok((
747 CompiledSqlCommand::ShowColumnsEntity,
748 0,
749 prepare_local_instructions,
750 0,
751 0,
752 ))
753 }
754 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
755 }
756 }
757
758 fn sql_select_prepared_plan(
761 &self,
762 query: &StructuralQuery,
763 authority: EntityAuthority,
764 cache_schema_fingerprint: CommitSchemaFingerprint,
765 ) -> Result<
766 (
767 SharedPreparedExecutionPlan,
768 SqlProjectionContract,
769 SqlCacheAttribution,
770 ),
771 QueryError,
772 > {
773 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
774 authority,
775 cache_schema_fingerprint,
776 query,
777 )?;
778 let projection_spec = prepared_plan
779 .logical_plan()
780 .projection_spec(authority.model());
781 let projection = SqlProjectionContract::new(
782 projection_labels_from_projection_spec(&projection_spec),
783 projection_fixed_scales_from_projection_spec(&projection_spec),
784 );
785
786 Ok((
787 prepared_plan,
788 projection,
789 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
790 ))
791 }
792
793 fn ensure_sql_statement_supported_for_surface(
797 statement: &SqlStatement,
798 surface: SqlCompiledCommandSurface,
799 ) -> Result<(), QueryError> {
800 match (surface, statement) {
801 (
802 SqlCompiledCommandSurface::Query,
803 SqlStatement::Select(_)
804 | SqlStatement::Explain(_)
805 | SqlStatement::Describe(_)
806 | SqlStatement::ShowIndexes(_)
807 | SqlStatement::ShowColumns(_)
808 | SqlStatement::ShowEntities(_),
809 )
810 | (
811 SqlCompiledCommandSurface::Update,
812 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
813 ) => Ok(()),
814 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
815 Err(QueryError::unsupported_query(
816 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
817 ))
818 }
819 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
820 Err(QueryError::unsupported_query(
821 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
822 ))
823 }
824 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
825 Err(QueryError::unsupported_query(
826 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
827 ))
828 }
829 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
830 Err(QueryError::unsupported_query(
831 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
832 ))
833 }
834 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
835 Err(QueryError::unsupported_query(
836 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
837 ))
838 }
839 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
840 Err(QueryError::unsupported_query(
841 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
842 ))
843 }
844 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
845 Err(QueryError::unsupported_query(
846 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
847 ))
848 }
849 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
850 Err(QueryError::unsupported_query(
851 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
852 ))
853 }
854 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
855 Err(QueryError::unsupported_query(
856 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
857 ))
858 }
859 }
860 }
861
862 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
867 where
868 E: PersistedRow<Canister = C> + EntityValue,
869 {
870 let compiled = self.compile_sql_query::<E>(sql)?;
871
872 self.execute_compiled_sql::<E>(&compiled)
873 }
874
875 #[cfg(feature = "diagnostics")]
878 #[doc(hidden)]
879 pub fn execute_sql_query_with_attribution<E>(
880 &self,
881 sql: &str,
882 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
883 where
884 E: PersistedRow<Canister = C> + EntityValue,
885 {
886 let (compile_local_instructions, compiled) =
889 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
890 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
891
892 let store_get_calls_before = DataStore::current_get_call_count();
895 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
896 let pure_covering_row_assembly_before =
897 current_pure_covering_row_assembly_local_instructions();
898 let (result, execute_cache_attribution, execute_phase_attribution) =
899 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
900 let store_get_calls =
901 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
902 let pure_covering_decode_local_instructions =
903 current_pure_covering_decode_local_instructions()
904 .saturating_sub(pure_covering_decode_before);
905 let pure_covering_row_assembly_local_instructions =
906 current_pure_covering_row_assembly_local_instructions()
907 .saturating_sub(pure_covering_row_assembly_before);
908 let execute_local_instructions = execute_phase_attribution
909 .planner_local_instructions
910 .saturating_add(execute_phase_attribution.store_local_instructions)
911 .saturating_add(execute_phase_attribution.executor_local_instructions);
912 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
913 let total_local_instructions =
914 compile_local_instructions.saturating_add(execute_local_instructions);
915
916 Ok((
917 result,
918 SqlQueryExecutionAttribution {
919 compile_local_instructions,
920 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
921 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
922 compile_parse_local_instructions: compile_phase_attribution.parse,
923 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
924 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
925 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
926 compile_parse_predicate_local_instructions: compile_phase_attribution
927 .parse_predicate,
928 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
929 .aggregate_lane_check,
930 compile_prepare_local_instructions: compile_phase_attribution.prepare,
931 compile_lower_local_instructions: compile_phase_attribution.lower,
932 compile_bind_local_instructions: compile_phase_attribution.bind,
933 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
934 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
935 store_local_instructions: execute_phase_attribution.store_local_instructions,
936 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
937 grouped_stream_local_instructions: execute_phase_attribution
938 .grouped_stream_local_instructions,
939 grouped_fold_local_instructions: execute_phase_attribution
940 .grouped_fold_local_instructions,
941 grouped_finalize_local_instructions: execute_phase_attribution
942 .grouped_finalize_local_instructions,
943 grouped_count_borrowed_hash_computations: execute_phase_attribution
944 .grouped_count
945 .borrowed_hash_computations,
946 grouped_count_bucket_candidate_checks: execute_phase_attribution
947 .grouped_count
948 .bucket_candidate_checks,
949 grouped_count_existing_group_hits: execute_phase_attribution
950 .grouped_count
951 .existing_group_hits,
952 grouped_count_new_group_inserts: execute_phase_attribution
953 .grouped_count
954 .new_group_inserts,
955 grouped_count_row_materialization_local_instructions: execute_phase_attribution
956 .grouped_count
957 .row_materialization_local_instructions,
958 grouped_count_group_lookup_local_instructions: execute_phase_attribution
959 .grouped_count
960 .group_lookup_local_instructions,
961 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
962 .grouped_count
963 .existing_group_update_local_instructions,
964 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
965 .grouped_count
966 .new_group_insert_local_instructions,
967 pure_covering_decode_local_instructions,
968 pure_covering_row_assembly_local_instructions,
969 store_get_calls,
970 response_decode_local_instructions: 0,
971 execute_local_instructions,
972 total_local_instructions,
973 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
974 sql_compiled_command_cache_misses: cache_attribution
975 .sql_compiled_command_cache_misses,
976 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
977 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
978 },
979 ))
980 }
981
982 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
987 where
988 E: PersistedRow<Canister = C> + EntityValue,
989 {
990 let compiled = self.compile_sql_update::<E>(sql)?;
991
992 self.execute_compiled_sql::<E>(&compiled)
993 }
994
995 pub(in crate::db) fn compile_sql_query<E>(
998 &self,
999 sql: &str,
1000 ) -> Result<CompiledSqlCommand, QueryError>
1001 where
1002 E: PersistedRow<Canister = C> + EntityValue,
1003 {
1004 self.compile_sql_query_with_cache_attribution::<E>(sql)
1005 .map(|(compiled, _, _)| compiled)
1006 }
1007
1008 fn compile_sql_query_with_cache_attribution<E>(
1009 &self,
1010 sql: &str,
1011 ) -> Result<
1012 (
1013 CompiledSqlCommand,
1014 SqlCacheAttribution,
1015 SqlCompilePhaseAttribution,
1016 ),
1017 QueryError,
1018 >
1019 where
1020 E: PersistedRow<Canister = C> + EntityValue,
1021 {
1022 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
1023 }
1024
1025 pub(in crate::db) fn compile_sql_update<E>(
1028 &self,
1029 sql: &str,
1030 ) -> Result<CompiledSqlCommand, QueryError>
1031 where
1032 E: PersistedRow<Canister = C> + EntityValue,
1033 {
1034 self.compile_sql_update_with_cache_attribution::<E>(sql)
1035 .map(|(compiled, _, _)| compiled)
1036 }
1037
1038 fn compile_sql_update_with_cache_attribution<E>(
1039 &self,
1040 sql: &str,
1041 ) -> Result<
1042 (
1043 CompiledSqlCommand,
1044 SqlCacheAttribution,
1045 SqlCompilePhaseAttribution,
1046 ),
1047 QueryError,
1048 >
1049 where
1050 E: PersistedRow<Canister = C> + EntityValue,
1051 {
1052 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1053 }
1054
1055 fn compile_sql_surface_with_cache_attribution<E>(
1059 &self,
1060 sql: &str,
1061 surface: SqlCompiledCommandSurface,
1062 ) -> Result<
1063 (
1064 CompiledSqlCommand,
1065 SqlCacheAttribution,
1066 SqlCompilePhaseAttribution,
1067 ),
1068 QueryError,
1069 >
1070 where
1071 E: PersistedRow<Canister = C> + EntityValue,
1072 {
1073 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
1074 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1075 });
1076 let cache_key = cache_key?;
1077
1078 self.compile_sql_statement_with_cache::<E, _>(
1079 cache_key,
1080 cache_key_local_instructions,
1081 sql,
1082 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
1083 )
1084 }
1085
1086 fn compile_sql_statement_with_cache<E, F>(
1089 &self,
1090 cache_key: SqlCompiledCommandCacheKey,
1091 cache_key_local_instructions: u64,
1092 sql: &str,
1093 ensure_surface_supported: F,
1094 ) -> Result<
1095 (
1096 CompiledSqlCommand,
1097 SqlCacheAttribution,
1098 SqlCompilePhaseAttribution,
1099 ),
1100 QueryError,
1101 >
1102 where
1103 E: PersistedRow<Canister = C> + EntityValue,
1104 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
1105 {
1106 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
1107 let cached =
1108 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1109 Ok::<_, QueryError>(cached)
1110 });
1111 let cached = cached?;
1112 if let Some(compiled) = cached {
1113 return Ok((
1114 compiled,
1115 SqlCacheAttribution::sql_compiled_command_cache_hit(),
1116 SqlCompilePhaseAttribution::cache_hit(
1117 cache_key_local_instructions,
1118 cache_lookup_local_instructions,
1119 ),
1120 ));
1121 }
1122
1123 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
1124 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
1125 });
1126 let (parsed, parse_attribution) = parsed?;
1127 let parse_select_local_instructions = parse_local_instructions
1128 .saturating_sub(parse_attribution.tokenize)
1129 .saturating_sub(parse_attribution.expr)
1130 .saturating_sub(parse_attribution.predicate);
1131 ensure_surface_supported(&parsed)?;
1132 let authority = EntityAuthority::for_type::<E>();
1133 let (
1134 compiled,
1135 aggregate_lane_check_local_instructions,
1136 prepare_local_instructions,
1137 lower_local_instructions,
1138 bind_local_instructions,
1139 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
1140
1141 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
1142 self.with_sql_compiled_command_cache(|cache| {
1143 cache.insert(cache_key, compiled.clone());
1144 });
1145 Ok::<_, QueryError>(())
1146 });
1147 cache_insert?;
1148
1149 Ok((
1150 compiled,
1151 SqlCacheAttribution::sql_compiled_command_cache_miss(),
1152 SqlCompilePhaseAttribution {
1153 cache_key: cache_key_local_instructions,
1154 cache_lookup: cache_lookup_local_instructions,
1155 parse: parse_local_instructions,
1156 parse_tokenize: parse_attribution.tokenize,
1157 parse_select: parse_select_local_instructions,
1158 parse_expr: parse_attribution.expr,
1159 parse_predicate: parse_attribution.predicate,
1160 aggregate_lane_check: aggregate_lane_check_local_instructions,
1161 prepare: prepare_local_instructions,
1162 lower: lower_local_instructions,
1163 bind: bind_local_instructions,
1164 cache_insert: cache_insert_local_instructions,
1165 },
1166 ))
1167 }
1168}