1mod execute;
8mod projection;
9
10#[cfg(feature = "diagnostics")]
11use candid::CandidType;
12#[cfg(feature = "diagnostics")]
13use serde::Deserialize;
14use std::{cell::RefCell, collections::HashMap, sync::Arc};
15
16const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
22
23#[cfg(feature = "diagnostics")]
24use crate::db::DataStore;
25#[cfg(feature = "diagnostics")]
26use crate::db::executor::GroupedCountAttribution;
27#[cfg(feature = "diagnostics")]
28use crate::db::session::sql::projection::{
29 current_pure_covering_decode_local_instructions,
30 current_pure_covering_row_assembly_local_instructions,
31};
32#[cfg(test)]
33use crate::db::sql::parser::parse_sql;
34use crate::db::sql::parser::{SqlInsertStatement, SqlReturningProjection, SqlUpdateStatement};
35use crate::{
36 db::{
37 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
38 commit::CommitSchemaFingerprint,
39 executor::{EntityAuthority, SharedPreparedExecutionPlan},
40 query::intent::StructuralQuery,
41 schema::commit_schema_fingerprint_for_entity,
42 session::query::QueryPlanCacheAttribution,
43 session::sql::projection::{
44 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
45 },
46 sql::lowering::{
47 LoweredSqlCommand, SqlGlobalAggregateCommandCore,
48 bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
49 compile_sql_global_aggregate_command_core_from_prepared,
50 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
51 lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
52 lower_sql_command_from_prepared_statement, prepare_sql_statement,
53 },
54 sql::parser::{SqlStatement, parse_sql_with_attribution},
55 },
56 traits::{CanisterKind, EntityValue},
57 value::OutputValue,
58};
59
60#[cfg(all(test, not(feature = "diagnostics")))]
61pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
62#[cfg(feature = "diagnostics")]
63pub use crate::db::session::sql::projection::{
64 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
65};
66
67#[derive(Debug)]
69pub enum SqlStatementResult {
70 Count {
71 row_count: u32,
72 },
73 Projection {
74 columns: Vec<String>,
75 fixed_scales: Vec<Option<u32>>,
76 rows: Vec<Vec<OutputValue>>,
77 row_count: u32,
78 },
79 ProjectionText {
80 columns: Vec<String>,
81 rows: Vec<Vec<String>>,
82 row_count: u32,
83 },
84 Grouped {
85 columns: Vec<String>,
86 fixed_scales: Vec<Option<u32>>,
87 rows: Vec<GroupedRow>,
88 row_count: u32,
89 next_cursor: Option<String>,
90 },
91 Explain(String),
92 Describe(crate::db::EntitySchemaDescription),
93 ShowIndexes(Vec<String>),
94 ShowColumns(Vec<crate::db::EntityFieldDescription>),
95 ShowEntities(Vec<String>),
96}
97
98#[cfg(feature = "diagnostics")]
109#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
110pub struct SqlQueryExecutionAttribution {
111 pub compile_local_instructions: u64,
112 pub compile_cache_key_local_instructions: u64,
113 pub compile_cache_lookup_local_instructions: u64,
114 pub compile_parse_local_instructions: u64,
115 pub compile_parse_tokenize_local_instructions: u64,
116 pub compile_parse_select_local_instructions: u64,
117 pub compile_parse_expr_local_instructions: u64,
118 pub compile_parse_predicate_local_instructions: u64,
119 pub compile_aggregate_lane_check_local_instructions: u64,
120 pub compile_prepare_local_instructions: u64,
121 pub compile_lower_local_instructions: u64,
122 pub compile_bind_local_instructions: u64,
123 pub compile_cache_insert_local_instructions: u64,
124 pub planner_local_instructions: u64,
125 pub store_local_instructions: u64,
126 pub executor_local_instructions: u64,
127 pub grouped_stream_local_instructions: u64,
128 pub grouped_fold_local_instructions: u64,
129 pub grouped_finalize_local_instructions: u64,
130 pub grouped_count_borrowed_hash_computations: u64,
131 pub grouped_count_bucket_candidate_checks: u64,
132 pub grouped_count_existing_group_hits: u64,
133 pub grouped_count_new_group_inserts: u64,
134 pub grouped_count_row_materialization_local_instructions: u64,
135 pub grouped_count_group_lookup_local_instructions: u64,
136 pub grouped_count_existing_group_update_local_instructions: u64,
137 pub grouped_count_new_group_insert_local_instructions: u64,
138 pub pure_covering_decode_local_instructions: u64,
139 pub pure_covering_row_assembly_local_instructions: u64,
140 pub store_get_calls: u64,
141 pub response_decode_local_instructions: u64,
142 pub execute_local_instructions: u64,
143 pub total_local_instructions: u64,
144 pub sql_compiled_command_cache_hits: u64,
145 pub sql_compiled_command_cache_misses: u64,
146 pub shared_query_plan_cache_hits: u64,
147 pub shared_query_plan_cache_misses: u64,
148}
149
150#[cfg(feature = "diagnostics")]
154#[derive(Clone, Copy, Debug, Eq, PartialEq)]
155pub(in crate::db) struct SqlExecutePhaseAttribution {
156 pub planner_local_instructions: u64,
157 pub store_local_instructions: u64,
158 pub executor_local_instructions: u64,
159 pub grouped_stream_local_instructions: u64,
160 pub grouped_fold_local_instructions: u64,
161 pub grouped_finalize_local_instructions: u64,
162 pub grouped_count: GroupedCountAttribution,
163}
164
165#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
177pub(in crate::db) struct SqlCompilePhaseAttribution {
178 pub cache_key: u64,
179 pub cache_lookup: u64,
180 pub parse: u64,
181 pub parse_tokenize: u64,
182 pub parse_select: u64,
183 pub parse_expr: u64,
184 pub parse_predicate: u64,
185 pub aggregate_lane_check: u64,
186 pub prepare: u64,
187 pub lower: u64,
188 pub bind: u64,
189 pub cache_insert: u64,
190}
191
192impl SqlCompilePhaseAttribution {
193 #[must_use]
194 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
195 Self {
196 cache_key,
197 cache_lookup,
198 parse: 0,
199 parse_tokenize: 0,
200 parse_select: 0,
201 parse_expr: 0,
202 parse_predicate: 0,
203 aggregate_lane_check: 0,
204 prepare: 0,
205 lower: 0,
206 bind: 0,
207 cache_insert: 0,
208 }
209 }
210}
211
212#[cfg(feature = "diagnostics")]
213impl SqlExecutePhaseAttribution {
214 #[must_use]
215 pub(in crate::db) const fn from_execute_total_and_store_total(
216 execute_local_instructions: u64,
217 store_local_instructions: u64,
218 ) -> Self {
219 Self {
220 planner_local_instructions: 0,
221 store_local_instructions,
222 executor_local_instructions: execute_local_instructions
223 .saturating_sub(store_local_instructions),
224 grouped_stream_local_instructions: 0,
225 grouped_fold_local_instructions: 0,
226 grouped_finalize_local_instructions: 0,
227 grouped_count: GroupedCountAttribution::none(),
228 }
229 }
230}
231
232#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
238pub(in crate::db) struct SqlCacheAttribution {
239 pub sql_compiled_command_cache_hits: u64,
240 pub sql_compiled_command_cache_misses: u64,
241 pub shared_query_plan_cache_hits: u64,
242 pub shared_query_plan_cache_misses: u64,
243}
244
245impl SqlCacheAttribution {
246 #[must_use]
247 const fn none() -> Self {
248 Self {
249 sql_compiled_command_cache_hits: 0,
250 sql_compiled_command_cache_misses: 0,
251 shared_query_plan_cache_hits: 0,
252 shared_query_plan_cache_misses: 0,
253 }
254 }
255
256 #[must_use]
257 const fn sql_compiled_command_cache_hit() -> Self {
258 Self {
259 sql_compiled_command_cache_hits: 1,
260 ..Self::none()
261 }
262 }
263
264 #[must_use]
265 const fn sql_compiled_command_cache_miss() -> Self {
266 Self {
267 sql_compiled_command_cache_misses: 1,
268 ..Self::none()
269 }
270 }
271
272 #[must_use]
273 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
274 Self {
275 shared_query_plan_cache_hits: attribution.hits,
276 shared_query_plan_cache_misses: attribution.misses,
277 ..Self::none()
278 }
279 }
280
281 #[must_use]
282 const fn merge(self, other: Self) -> Self {
283 Self {
284 sql_compiled_command_cache_hits: self
285 .sql_compiled_command_cache_hits
286 .saturating_add(other.sql_compiled_command_cache_hits),
287 sql_compiled_command_cache_misses: self
288 .sql_compiled_command_cache_misses
289 .saturating_add(other.sql_compiled_command_cache_misses),
290 shared_query_plan_cache_hits: self
291 .shared_query_plan_cache_hits
292 .saturating_add(other.shared_query_plan_cache_hits),
293 shared_query_plan_cache_misses: self
294 .shared_query_plan_cache_misses
295 .saturating_add(other.shared_query_plan_cache_misses),
296 }
297 }
298}
299
300#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
301enum SqlCompiledCommandSurface {
302 Query,
303 Update,
304}
305
306#[derive(Clone, Debug, Eq, Hash, PartialEq)]
317pub(in crate::db) struct SqlCompiledCommandCacheKey {
318 cache_method_version: u8,
319 surface: SqlCompiledCommandSurface,
320 entity_path: &'static str,
321 schema_fingerprint: CommitSchemaFingerprint,
322 sql: String,
323}
324
325#[derive(Clone, Debug)]
335pub(in crate::db) struct SqlProjectionContract {
336 columns: Vec<String>,
337 fixed_scales: Vec<Option<u32>>,
338}
339
340impl SqlProjectionContract {
341 #[must_use]
342 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
343 Self {
344 columns,
345 fixed_scales,
346 }
347 }
348
349 #[must_use]
350 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
351 (self.columns, self.fixed_scales)
352 }
353}
354
355impl SqlCompiledCommandCacheKey {
356 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
357 where
358 E: PersistedRow + EntityValue,
359 {
360 Self {
361 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
362 surface,
363 entity_path: E::PATH,
364 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
365 sql: sql.to_string(),
366 }
367 }
368
369 #[must_use]
370 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
371 self.schema_fingerprint
372 }
373}
374
375#[cfg(test)]
376impl SqlCompiledCommandCacheKey {
377 pub(in crate::db) fn query_for_entity_with_method_version<E>(
378 sql: &str,
379 cache_method_version: u8,
380 ) -> Self
381 where
382 E: PersistedRow + EntityValue,
383 {
384 Self::for_entity_with_method_version::<E>(
385 SqlCompiledCommandSurface::Query,
386 sql,
387 cache_method_version,
388 )
389 }
390
391 pub(in crate::db) fn update_for_entity_with_method_version<E>(
392 sql: &str,
393 cache_method_version: u8,
394 ) -> Self
395 where
396 E: PersistedRow + EntityValue,
397 {
398 Self::for_entity_with_method_version::<E>(
399 SqlCompiledCommandSurface::Update,
400 sql,
401 cache_method_version,
402 )
403 }
404
405 fn for_entity_with_method_version<E>(
406 surface: SqlCompiledCommandSurface,
407 sql: &str,
408 cache_method_version: u8,
409 ) -> Self
410 where
411 E: PersistedRow + EntityValue,
412 {
413 Self {
414 cache_method_version,
415 surface,
416 entity_path: E::PATH,
417 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
418 sql: sql.to_string(),
419 }
420 }
421}
422
423pub(in crate::db) type SqlCompiledCommandCache =
424 HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;
425
426thread_local! {
427 static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
431 RefCell::new(HashMap::default());
432}
433
434#[derive(Clone, Debug)]
438pub(in crate::db) enum CompiledSqlCommand {
439 Select {
440 query: Arc<StructuralQuery>,
441 compiled_cache_key: SqlCompiledCommandCacheKey,
442 },
443 Delete {
444 query: Arc<StructuralQuery>,
445 returning: Option<SqlReturningProjection>,
446 },
447 GlobalAggregate {
448 command: Box<SqlGlobalAggregateCommandCore>,
449 },
450 Explain(Box<LoweredSqlCommand>),
451 Insert(SqlInsertStatement),
452 Update(SqlUpdateStatement),
453 DescribeEntity,
454 ShowIndexesEntity,
455 ShowColumnsEntity,
456 ShowEntities,
457}
458
459#[cfg(test)]
462pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
463 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
464}
465
466#[cfg(feature = "diagnostics")]
467#[expect(
468 clippy::missing_const_for_fn,
469 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
470)]
471fn read_sql_local_instruction_counter() -> u64 {
472 #[cfg(all(feature = "diagnostics", target_arch = "wasm32"))]
473 {
474 canic_cdk::api::performance_counter(1)
475 }
476
477 #[cfg(not(all(feature = "diagnostics", target_arch = "wasm32")))]
478 {
479 0
480 }
481}
482
483pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
484 run: impl FnOnce() -> Result<T, E>,
485) -> (u64, Result<T, E>) {
486 #[cfg(feature = "diagnostics")]
487 let start = read_sql_local_instruction_counter();
488
489 let result = run();
490
491 #[cfg(feature = "diagnostics")]
492 let delta = read_sql_local_instruction_counter().saturating_sub(start);
493
494 #[cfg(not(feature = "diagnostics"))]
495 let delta = 0;
496
497 (delta, result)
498}
499
500impl<C: CanisterKind> DbSession<C> {
501 fn with_sql_compiled_command_cache<R>(
502 &self,
503 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
504 ) -> R {
505 let scope_id = self.db.cache_scope_id();
506
507 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
508 let mut caches = caches.borrow_mut();
509 let cache = caches.entry(scope_id).or_default();
510
511 f(cache)
512 })
513 }
514
515 #[cfg(test)]
516 pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
517 self.with_sql_compiled_command_cache(|cache| cache.len())
518 }
519
520 #[cfg(test)]
521 pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
522 self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
523 }
524
525 #[expect(clippy::too_many_lines)]
528 fn compile_sql_statement_for_authority(
529 statement: &SqlStatement,
530 authority: EntityAuthority,
531 compiled_cache_key: SqlCompiledCommandCacheKey,
532 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
533 let prepare_statement = || {
537 measure_sql_stage(|| {
538 prepare_sql_statement(statement.clone(), authority.model().name())
539 .map_err(QueryError::from_sql_lowering_error)
540 })
541 };
542
543 match statement {
544 SqlStatement::Select(_) => {
545 let (prepare_local_instructions, prepared) = prepare_statement();
546 let prepared = prepared?;
547 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
548 measure_sql_stage(|| {
549 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
550 });
551 let requires_aggregate_lane = requires_aggregate_lane?;
552
553 if requires_aggregate_lane {
554 let (lower_local_instructions, command) = measure_sql_stage(|| {
555 compile_sql_global_aggregate_command_core_from_prepared(
556 prepared,
557 authority.model(),
558 MissingRowPolicy::Ignore,
559 )
560 .map_err(QueryError::from_sql_lowering_error)
561 });
562 let command = command?;
563
564 Ok((
565 CompiledSqlCommand::GlobalAggregate {
566 command: Box::new(command),
567 },
568 aggregate_lane_check_local_instructions,
569 prepare_local_instructions,
570 lower_local_instructions,
571 0,
572 ))
573 } else {
574 let (lower_local_instructions, select) = measure_sql_stage(|| {
575 lower_prepared_sql_select_statement(prepared, authority.model())
576 .map_err(QueryError::from_sql_lowering_error)
577 });
578 let select = select?;
579 let (bind_local_instructions, query) = measure_sql_stage(|| {
580 bind_lowered_sql_select_query_structural(
581 authority.model(),
582 select,
583 MissingRowPolicy::Ignore,
584 )
585 .map_err(QueryError::from_sql_lowering_error)
586 });
587 let query = query?;
588
589 Ok((
590 CompiledSqlCommand::Select {
591 query: Arc::new(query),
592 compiled_cache_key,
593 },
594 aggregate_lane_check_local_instructions,
595 prepare_local_instructions,
596 lower_local_instructions,
597 bind_local_instructions,
598 ))
599 }
600 }
601 SqlStatement::Delete(_) => {
602 let (prepare_local_instructions, prepared) = prepare_statement();
603 let prepared = prepared?;
604 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
605 lower_prepared_sql_delete_statement(prepared)
606 .map_err(QueryError::from_sql_lowering_error)
607 });
608 let delete = lowered?;
609 let returning = delete.returning().cloned();
610 let query = delete.into_base_query();
611 let (bind_local_instructions, query) = measure_sql_stage(|| {
612 Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
613 authority.model(),
614 query,
615 MissingRowPolicy::Ignore,
616 ))
617 });
618 let query = query?;
619
620 Ok((
621 CompiledSqlCommand::Delete {
622 query: Arc::new(query),
623 returning,
624 },
625 0,
626 prepare_local_instructions,
627 lower_local_instructions,
628 bind_local_instructions,
629 ))
630 }
631 SqlStatement::Insert(_) => {
632 let (prepare_local_instructions, prepared) = prepare_statement();
633 let prepared = prepared?;
634 let statement = extract_prepared_sql_insert_statement(prepared)
635 .map_err(QueryError::from_sql_lowering_error)?;
636
637 Ok((
638 CompiledSqlCommand::Insert(statement),
639 0,
640 prepare_local_instructions,
641 0,
642 0,
643 ))
644 }
645 SqlStatement::Update(_) => {
646 let (prepare_local_instructions, prepared) = prepare_statement();
647 let prepared = prepared?;
648 let statement = extract_prepared_sql_update_statement(prepared)
649 .map_err(QueryError::from_sql_lowering_error)?;
650
651 Ok((
652 CompiledSqlCommand::Update(statement),
653 0,
654 prepare_local_instructions,
655 0,
656 0,
657 ))
658 }
659 SqlStatement::Explain(_) => {
660 let (prepare_local_instructions, prepared) = prepare_statement();
661 let prepared = prepared?;
662 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
663 lower_sql_command_from_prepared_statement(prepared, authority.model())
664 .map_err(QueryError::from_sql_lowering_error)
665 });
666 let lowered = lowered?;
667
668 Ok((
669 CompiledSqlCommand::Explain(Box::new(lowered)),
670 0,
671 prepare_local_instructions,
672 lower_local_instructions,
673 0,
674 ))
675 }
676 SqlStatement::Describe(_) => {
677 let (prepare_local_instructions, prepared) = prepare_statement();
678 let _prepared = prepared?;
679
680 Ok((
681 CompiledSqlCommand::DescribeEntity,
682 0,
683 prepare_local_instructions,
684 0,
685 0,
686 ))
687 }
688 SqlStatement::ShowIndexes(_) => {
689 let (prepare_local_instructions, prepared) = prepare_statement();
690 let _prepared = prepared?;
691
692 Ok((
693 CompiledSqlCommand::ShowIndexesEntity,
694 0,
695 prepare_local_instructions,
696 0,
697 0,
698 ))
699 }
700 SqlStatement::ShowColumns(_) => {
701 let (prepare_local_instructions, prepared) = prepare_statement();
702 let _prepared = prepared?;
703
704 Ok((
705 CompiledSqlCommand::ShowColumnsEntity,
706 0,
707 prepare_local_instructions,
708 0,
709 0,
710 ))
711 }
712 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
713 }
714 }
715
716 fn sql_select_prepared_plan(
719 &self,
720 query: &StructuralQuery,
721 authority: EntityAuthority,
722 cache_schema_fingerprint: CommitSchemaFingerprint,
723 ) -> Result<
724 (
725 SharedPreparedExecutionPlan,
726 SqlProjectionContract,
727 SqlCacheAttribution,
728 ),
729 QueryError,
730 > {
731 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
732 authority,
733 cache_schema_fingerprint,
734 query,
735 )?;
736 let projection_spec = prepared_plan
737 .logical_plan()
738 .projection_spec(authority.model());
739 let projection = SqlProjectionContract::new(
740 projection_labels_from_projection_spec(&projection_spec),
741 projection_fixed_scales_from_projection_spec(&projection_spec),
742 );
743
744 Ok((
745 prepared_plan,
746 projection,
747 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
748 ))
749 }
750
751 fn ensure_sql_statement_supported_for_surface(
755 statement: &SqlStatement,
756 surface: SqlCompiledCommandSurface,
757 ) -> Result<(), QueryError> {
758 match (surface, statement) {
759 (
760 SqlCompiledCommandSurface::Query,
761 SqlStatement::Select(_)
762 | SqlStatement::Explain(_)
763 | SqlStatement::Describe(_)
764 | SqlStatement::ShowIndexes(_)
765 | SqlStatement::ShowColumns(_)
766 | SqlStatement::ShowEntities(_),
767 )
768 | (
769 SqlCompiledCommandSurface::Update,
770 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
771 ) => Ok(()),
772 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
773 Err(QueryError::unsupported_query(
774 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
775 ))
776 }
777 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
778 Err(QueryError::unsupported_query(
779 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
780 ))
781 }
782 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
783 Err(QueryError::unsupported_query(
784 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
785 ))
786 }
787 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
788 Err(QueryError::unsupported_query(
789 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
790 ))
791 }
792 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
793 Err(QueryError::unsupported_query(
794 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
795 ))
796 }
797 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
798 Err(QueryError::unsupported_query(
799 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
800 ))
801 }
802 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
803 Err(QueryError::unsupported_query(
804 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
805 ))
806 }
807 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
808 Err(QueryError::unsupported_query(
809 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
810 ))
811 }
812 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
813 Err(QueryError::unsupported_query(
814 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
815 ))
816 }
817 }
818 }
819
820 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
825 where
826 E: PersistedRow<Canister = C> + EntityValue,
827 {
828 let compiled = self.compile_sql_query::<E>(sql)?;
829
830 self.execute_compiled_sql::<E>(&compiled)
831 }
832
833 #[cfg(feature = "diagnostics")]
836 #[doc(hidden)]
837 pub fn execute_sql_query_with_attribution<E>(
838 &self,
839 sql: &str,
840 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
841 where
842 E: PersistedRow<Canister = C> + EntityValue,
843 {
844 let (compile_local_instructions, compiled) =
847 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
848 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
849
850 let store_get_calls_before = DataStore::current_get_call_count();
853 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
854 let pure_covering_row_assembly_before =
855 current_pure_covering_row_assembly_local_instructions();
856 let (result, execute_cache_attribution, execute_phase_attribution) =
857 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
858 let store_get_calls =
859 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
860 let pure_covering_decode_local_instructions =
861 current_pure_covering_decode_local_instructions()
862 .saturating_sub(pure_covering_decode_before);
863 let pure_covering_row_assembly_local_instructions =
864 current_pure_covering_row_assembly_local_instructions()
865 .saturating_sub(pure_covering_row_assembly_before);
866 let execute_local_instructions = execute_phase_attribution
867 .planner_local_instructions
868 .saturating_add(execute_phase_attribution.store_local_instructions)
869 .saturating_add(execute_phase_attribution.executor_local_instructions);
870 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
871 let total_local_instructions =
872 compile_local_instructions.saturating_add(execute_local_instructions);
873
874 Ok((
875 result,
876 SqlQueryExecutionAttribution {
877 compile_local_instructions,
878 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
879 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
880 compile_parse_local_instructions: compile_phase_attribution.parse,
881 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
882 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
883 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
884 compile_parse_predicate_local_instructions: compile_phase_attribution
885 .parse_predicate,
886 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
887 .aggregate_lane_check,
888 compile_prepare_local_instructions: compile_phase_attribution.prepare,
889 compile_lower_local_instructions: compile_phase_attribution.lower,
890 compile_bind_local_instructions: compile_phase_attribution.bind,
891 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
892 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
893 store_local_instructions: execute_phase_attribution.store_local_instructions,
894 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
895 grouped_stream_local_instructions: execute_phase_attribution
896 .grouped_stream_local_instructions,
897 grouped_fold_local_instructions: execute_phase_attribution
898 .grouped_fold_local_instructions,
899 grouped_finalize_local_instructions: execute_phase_attribution
900 .grouped_finalize_local_instructions,
901 grouped_count_borrowed_hash_computations: execute_phase_attribution
902 .grouped_count
903 .borrowed_hash_computations,
904 grouped_count_bucket_candidate_checks: execute_phase_attribution
905 .grouped_count
906 .bucket_candidate_checks,
907 grouped_count_existing_group_hits: execute_phase_attribution
908 .grouped_count
909 .existing_group_hits,
910 grouped_count_new_group_inserts: execute_phase_attribution
911 .grouped_count
912 .new_group_inserts,
913 grouped_count_row_materialization_local_instructions: execute_phase_attribution
914 .grouped_count
915 .row_materialization_local_instructions,
916 grouped_count_group_lookup_local_instructions: execute_phase_attribution
917 .grouped_count
918 .group_lookup_local_instructions,
919 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
920 .grouped_count
921 .existing_group_update_local_instructions,
922 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
923 .grouped_count
924 .new_group_insert_local_instructions,
925 pure_covering_decode_local_instructions,
926 pure_covering_row_assembly_local_instructions,
927 store_get_calls,
928 response_decode_local_instructions: 0,
929 execute_local_instructions,
930 total_local_instructions,
931 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
932 sql_compiled_command_cache_misses: cache_attribution
933 .sql_compiled_command_cache_misses,
934 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
935 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
936 },
937 ))
938 }
939
940 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
945 where
946 E: PersistedRow<Canister = C> + EntityValue,
947 {
948 let compiled = self.compile_sql_update::<E>(sql)?;
949
950 self.execute_compiled_sql::<E>(&compiled)
951 }
952
953 pub(in crate::db) fn compile_sql_query<E>(
956 &self,
957 sql: &str,
958 ) -> Result<CompiledSqlCommand, QueryError>
959 where
960 E: PersistedRow<Canister = C> + EntityValue,
961 {
962 self.compile_sql_query_with_cache_attribution::<E>(sql)
963 .map(|(compiled, _, _)| compiled)
964 }
965
966 fn compile_sql_query_with_cache_attribution<E>(
967 &self,
968 sql: &str,
969 ) -> Result<
970 (
971 CompiledSqlCommand,
972 SqlCacheAttribution,
973 SqlCompilePhaseAttribution,
974 ),
975 QueryError,
976 >
977 where
978 E: PersistedRow<Canister = C> + EntityValue,
979 {
980 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
981 }
982
983 pub(in crate::db) fn compile_sql_update<E>(
986 &self,
987 sql: &str,
988 ) -> Result<CompiledSqlCommand, QueryError>
989 where
990 E: PersistedRow<Canister = C> + EntityValue,
991 {
992 self.compile_sql_update_with_cache_attribution::<E>(sql)
993 .map(|(compiled, _, _)| compiled)
994 }
995
996 fn compile_sql_update_with_cache_attribution<E>(
997 &self,
998 sql: &str,
999 ) -> Result<
1000 (
1001 CompiledSqlCommand,
1002 SqlCacheAttribution,
1003 SqlCompilePhaseAttribution,
1004 ),
1005 QueryError,
1006 >
1007 where
1008 E: PersistedRow<Canister = C> + EntityValue,
1009 {
1010 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1011 }
1012
1013 fn compile_sql_surface_with_cache_attribution<E>(
1017 &self,
1018 sql: &str,
1019 surface: SqlCompiledCommandSurface,
1020 ) -> Result<
1021 (
1022 CompiledSqlCommand,
1023 SqlCacheAttribution,
1024 SqlCompilePhaseAttribution,
1025 ),
1026 QueryError,
1027 >
1028 where
1029 E: PersistedRow<Canister = C> + EntityValue,
1030 {
1031 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
1032 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1033 });
1034 let cache_key = cache_key?;
1035
1036 self.compile_sql_statement_with_cache::<E, _>(
1037 cache_key,
1038 cache_key_local_instructions,
1039 sql,
1040 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
1041 )
1042 }
1043
1044 fn compile_sql_statement_with_cache<E, F>(
1047 &self,
1048 cache_key: SqlCompiledCommandCacheKey,
1049 cache_key_local_instructions: u64,
1050 sql: &str,
1051 ensure_surface_supported: F,
1052 ) -> Result<
1053 (
1054 CompiledSqlCommand,
1055 SqlCacheAttribution,
1056 SqlCompilePhaseAttribution,
1057 ),
1058 QueryError,
1059 >
1060 where
1061 E: PersistedRow<Canister = C> + EntityValue,
1062 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
1063 {
1064 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
1065 let cached =
1066 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1067 Ok::<_, QueryError>(cached)
1068 });
1069 let cached = cached?;
1070 if let Some(compiled) = cached {
1071 return Ok((
1072 compiled,
1073 SqlCacheAttribution::sql_compiled_command_cache_hit(),
1074 SqlCompilePhaseAttribution::cache_hit(
1075 cache_key_local_instructions,
1076 cache_lookup_local_instructions,
1077 ),
1078 ));
1079 }
1080
1081 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
1082 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
1083 });
1084 let (parsed, parse_attribution) = parsed?;
1085 let parse_select_local_instructions = parse_local_instructions
1086 .saturating_sub(parse_attribution.tokenize)
1087 .saturating_sub(parse_attribution.expr)
1088 .saturating_sub(parse_attribution.predicate);
1089 ensure_surface_supported(&parsed)?;
1090 let authority = EntityAuthority::for_type::<E>();
1091 let (
1092 compiled,
1093 aggregate_lane_check_local_instructions,
1094 prepare_local_instructions,
1095 lower_local_instructions,
1096 bind_local_instructions,
1097 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
1098
1099 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
1100 self.with_sql_compiled_command_cache(|cache| {
1101 cache.insert(cache_key, compiled.clone());
1102 });
1103 Ok::<_, QueryError>(())
1104 });
1105 cache_insert?;
1106
1107 Ok((
1108 compiled,
1109 SqlCacheAttribution::sql_compiled_command_cache_miss(),
1110 SqlCompilePhaseAttribution {
1111 cache_key: cache_key_local_instructions,
1112 cache_lookup: cache_lookup_local_instructions,
1113 parse: parse_local_instructions,
1114 parse_tokenize: parse_attribution.tokenize,
1115 parse_select: parse_select_local_instructions,
1116 parse_expr: parse_attribution.expr,
1117 parse_predicate: parse_attribution.predicate,
1118 aggregate_lane_check: aggregate_lane_check_local_instructions,
1119 prepare: prepare_local_instructions,
1120 lower: lower_local_instructions,
1121 bind: bind_local_instructions,
1122 cache_insert: cache_insert_local_instructions,
1123 },
1124 ))
1125 }
1126}