1mod cache;
8mod compiled;
9mod execute;
10mod projection;
11
12#[cfg(feature = "diagnostics")]
13use candid::CandidType;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::sync::Arc;
17
18#[cfg(feature = "diagnostics")]
19use crate::db::DataStore;
20#[cfg(feature = "diagnostics")]
21use crate::db::executor::{
22 GroupedCountAttribution as ExecutorGroupedCountAttribution, ScalarAggregateTerminalAttribution,
23 current_pure_covering_decode_local_instructions,
24 current_pure_covering_row_assembly_local_instructions,
25};
26#[cfg(test)]
27use crate::db::sql::parser::parse_sql;
28#[cfg(feature = "diagnostics")]
29use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
30use crate::{
31 db::{
32 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
33 commit::CommitSchemaFingerprint,
34 executor::{EntityAuthority, SharedPreparedExecutionPlan},
35 query::intent::StructuralQuery,
36 session::sql::projection::{
37 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
38 },
39 sql::lowering::{
40 PreparedSqlStatement, bind_lowered_sql_delete_query_structural,
41 bind_lowered_sql_select_query_structural,
42 compile_sql_global_aggregate_command_core_from_prepared,
43 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
44 lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
45 lower_sql_command_from_prepared_statement, prepare_sql_statement,
46 },
47 sql::parser::{SqlParsePhaseAttribution, SqlStatement, parse_sql_with_attribution},
48 },
49 model::entity::EntityModel,
50 traits::{CanisterKind, EntityValue},
51 value::OutputValue,
52};
53
54pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
55pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
56pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
57pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
58
59#[cfg(all(test, not(feature = "diagnostics")))]
60pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
61#[cfg(feature = "diagnostics")]
62pub use crate::db::session::sql::projection::{
63 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
64};
65
/// Result payload returned by the SQL entry points on `DbSession`
/// (`execute_sql_query` and `execute_sql_update` in this file).
///
/// One variant exists per result family; the code that constructs these
/// values is outside this section of the file.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// A bare row count with no row payload.
    Count {
        row_count: u32,
    },
    /// Projected rows carried as typed `OutputValue` cells.
    Projection {
        /// Column labels of the projection.
        columns: Vec<String>,
        /// Optional fixed scale entries.
        // NOTE(review): presumably parallel to `columns` — confirm against the producer.
        fixed_scales: Vec<Option<u32>>,
        /// Row payload; each inner vector is one row.
        rows: Vec<Vec<OutputValue>>,
        row_count: u32,
    },
    /// Projected rows carried as pre-rendered text cells.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows, with an optional cursor string for a following page.
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        /// Continuation cursor; `None` when no cursor was produced.
        next_cursor: Option<String>,
    },
    /// Text payload for an `EXPLAIN` statement.
    Explain(String),
    /// Schema description payload for a `DESCRIBE` statement.
    Describe(crate::db::EntitySchemaDescription),
    /// String payload for a `SHOW INDEXES` statement.
    ShowIndexes(Vec<String>),
    /// Field descriptions for a `SHOW COLUMNS` statement.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// String payload for a `SHOW ENTITIES` statement.
    ShowEntities(Vec<String>),
}
96
/// Public, Candid-serializable breakdown of the SQL compile phase in local
/// instructions. Only compiled with the `diagnostics` feature.
///
/// `execute_sql_query_with_attribution` fills every field one-to-one from the
/// internal `SqlCompilePhaseAttribution` value.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlCompileAttribution {
    /// Cost recorded for building the compiled-command cache key.
    pub cache_key_local_instructions: u64,
    /// Cost recorded for the compiled-command cache lookup.
    pub cache_lookup_local_instructions: u64,
    /// Total cost recorded for parsing.
    pub parse_local_instructions: u64,
    /// Tokenize share of the parse cost.
    pub parse_tokenize_local_instructions: u64,
    /// Parse cost left after subtracting the tokenize, expression, and
    /// predicate shares (see `SqlCompileAttributionBuilder::record_parse`).
    pub parse_select_local_instructions: u64,
    /// Expression share of the parse cost.
    pub parse_expr_local_instructions: u64,
    /// Predicate share of the parse cost.
    pub parse_predicate_local_instructions: u64,
    /// Cost of the global-aggregate lane check performed for SELECT.
    pub aggregate_lane_check_local_instructions: u64,
    /// Cost of statement preparation.
    pub prepare_local_instructions: u64,
    /// Cost of lowering.
    pub lower_local_instructions: u64,
    /// Cost of structural binding.
    pub bind_local_instructions: u64,
    /// Cost recorded for inserting into the compiled-command cache.
    pub cache_insert_local_instructions: u64,
}
118
/// Public, Candid-serializable breakdown of the SQL execute phase in local
/// instructions. Only compiled with the `diagnostics` feature.
///
/// `execute_sql_query_with_attribution` copies each field from the
/// same-named field of `SqlExecutePhaseAttribution`.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlExecutionAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    /// Cost of the whole executor invocation.
    // NOTE(review): appears to include store work, given that
    // `from_execute_total_and_store_total` derives the executor share by
    // subtracting the store share from it — confirm for other producers.
    pub executor_invocation_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub response_finalization_local_instructions: u64,
}
133
/// Public, Candid-serializable counters for scalar aggregate terminal work.
/// Only compiled with the `diagnostics` feature.
///
/// Built from the executor's `ScalarAggregateTerminalAttribution` by
/// `SqlScalarAggregateAttribution::from_executor`, which copies every counter
/// unchanged.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlScalarAggregateAttribution {
    pub base_row_local_instructions: u64,
    pub reducer_fold_local_instructions: u64,
    pub expression_evaluations: u64,
    pub filter_evaluations: u64,
    pub rows_ingested: u64,
    pub terminal_count: u64,
    pub unique_input_expr_count: u64,
    pub unique_filter_expr_count: u64,
    /// Owned copy of the executor sink-mode label; `None` when the sink mode
    /// reports no label.
    pub sink_mode: Option<String>,
}
152
#[cfg(feature = "diagnostics")]
impl SqlScalarAggregateAttribution {
    /// Converts the executor-side terminal attribution into the public shape.
    ///
    /// Returns `None` when every counter is zero and the sink mode has no
    /// label; otherwise returns `Some` with all counters copied unchanged and
    /// the sink-mode label converted to an owned `String`.
    fn from_executor(terminal: ScalarAggregateTerminalAttribution) -> Option<Self> {
        // Gather the numeric counters once so the "any work recorded" check
        // and the field copy below read from the same values.
        let counters = [
            terminal.base_row_local_instructions,
            terminal.reducer_fold_local_instructions,
            terminal.expression_evaluations,
            terminal.filter_evaluations,
            terminal.rows_ingested,
            terminal.terminal_count,
            terminal.unique_input_expr_count,
            terminal.unique_filter_expr_count,
        ];

        // The label is consulted only after every counter turned out to be zero.
        let all_counters_zero = counters.iter().all(|&counter| counter == 0);
        if all_counters_zero && terminal.sink_mode.label().is_none() {
            return None;
        }

        let [
            base_row_local_instructions,
            reducer_fold_local_instructions,
            expression_evaluations,
            filter_evaluations,
            rows_ingested,
            terminal_count,
            unique_input_expr_count,
            unique_filter_expr_count,
        ] = counters;

        Some(Self {
            base_row_local_instructions,
            reducer_fold_local_instructions,
            expression_evaluations,
            filter_evaluations,
            rows_ingested,
            terminal_count,
            unique_input_expr_count,
            unique_filter_expr_count,
            sink_mode: terminal.sink_mode.label().map(str::to_string),
        })
    }
}
185
/// Public, Candid-serializable cost split for the pure-covering path, in
/// local instructions. Only compiled with the `diagnostics` feature.
///
/// `execute_sql_query_with_attribution` fills both fields with the change in
/// the corresponding executor counters across one compiled-command execution.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlPureCoveringAttribution {
    /// Delta of `current_pure_covering_decode_local_instructions()`.
    pub decode_local_instructions: u64,
    /// Delta of `current_pure_covering_row_assembly_local_instructions()`.
    pub row_assembly_local_instructions: u64,
}
197
/// Public, Candid-serializable cache hit/miss counters for one SQL query.
/// Only compiled with the `diagnostics` feature.
///
/// `execute_sql_query_with_attribution` fills these from the merged compile
/// and execute `SqlCacheAttribution` values.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlQueryCacheAttribution {
    /// Hits on the compiled SQL command cache.
    pub sql_compiled_command_hits: u64,
    /// Misses on the compiled SQL command cache.
    pub sql_compiled_command_misses: u64,
    /// Hits on the shared query plan cache.
    pub shared_query_plan_hits: u64,
    /// Misses on the shared query plan cache.
    pub shared_query_plan_misses: u64,
}
210
/// Full attribution report returned by `execute_sql_query_with_attribution`.
/// Only compiled with the `diagnostics` feature.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    /// Local instructions measured around the whole compile call.
    pub compile_local_instructions: u64,
    /// Per-stage breakdown of the compile phase.
    pub compile: SqlCompileAttribution,
    /// Set to the same value as `execution.planner_local_instructions`.
    pub plan_lookup_local_instructions: u64,
    /// Per-stage breakdown of the execute phase.
    pub execution: SqlExecutionAttribution,
    /// Present only when the statement result is `SqlStatementResult::Grouped`.
    pub grouped: Option<GroupedExecutionAttribution>,
    /// Present only when the executor reported scalar aggregate work
    /// (see `SqlScalarAggregateAttribution::from_executor`).
    pub scalar_aggregate: Option<SqlScalarAggregateAttribution>,
    /// Present only when at least one pure-covering counter advanced.
    pub pure_covering: Option<SqlPureCoveringAttribution>,
    /// Change in `DataStore::current_get_call_count()` across execution.
    pub store_get_calls: u64,
    /// Always written as `0` by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,
    /// Saturating sum of the planner, store, executor, and response
    /// finalization shares of the execute phase.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and
    /// `execute_local_instructions`.
    pub total_local_instructions: u64,
    /// Cache hit/miss counters merged across compile and execute.
    pub cache: SqlQueryCacheAttribution,
}
235
/// Crate-internal execute-phase attribution, in local instructions plus the
/// executor's grouped-count and scalar-aggregate payloads. Only compiled with
/// the `diagnostics` feature.
///
/// `execute_sql_query_with_attribution` splits this value into the public
/// `SqlExecutionAttribution`, `GroupedExecutionAttribution`, and
/// `SqlScalarAggregateAttribution` shapes.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_invocation_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub response_finalization_local_instructions: u64,
    /// Grouped stream stage cost; reported only for grouped results.
    pub grouped_stream_local_instructions: u64,
    /// Grouped fold stage cost; reported only for grouped results.
    pub grouped_fold_local_instructions: u64,
    /// Grouped finalize stage cost; reported only for grouped results.
    pub grouped_finalize_local_instructions: u64,
    /// Executor-side grouped count attribution.
    pub grouped_count: ExecutorGroupedCountAttribution,
    /// Executor-side scalar aggregate terminal attribution.
    pub scalar_aggregate_terminal: ScalarAggregateTerminalAttribution,
}
253
/// Crate-internal compile-phase attribution; every field is a local
/// instruction count for one compile stage.
///
/// Available regardless of the `diagnostics` feature. The public
/// `SqlCompileAttribution` mirrors these fields with a
/// `_local_instructions` suffix.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCompilePhaseAttribution {
    pub cache_key: u64,
    pub cache_lookup: u64,
    /// Total parse cost.
    pub parse: u64,
    pub parse_tokenize: u64,
    /// Parse cost left after subtracting the tokenize, expression, and
    /// predicate shares (see `SqlCompileAttributionBuilder::record_parse`).
    pub parse_select: u64,
    pub parse_expr: u64,
    pub parse_predicate: u64,
    pub aggregate_lane_check: u64,
    pub prepare: u64,
    pub lower: u64,
    pub bind: u64,
    pub cache_insert: u64,
}
280
/// Output of compiling one SQL statement: the compiled command, its shape
/// flags, and the local instruction cost of each core compile stage.
///
/// Constructed through `SqlCompileArtifacts::new`, which debug-asserts that
/// `shape` agrees with the `command` variant.
#[derive(Debug)]
pub(in crate::db) struct SqlCompileArtifacts {
    /// The compiled command to execute.
    pub command: CompiledSqlCommand,
    /// Shape flags describing `command`.
    pub shape: SqlQueryShape,
    /// Cost of the SELECT global-aggregate lane check; `0` when not performed.
    pub aggregate_lane_check: u64,
    /// Cost of statement preparation.
    pub prepare: u64,
    /// Cost of lowering; `0` when no measured lowering step ran.
    pub lower: u64,
    /// Cost of structural binding; `0` when no measured bind step ran.
    pub bind: u64,
}
299
/// Coarse classification flags for one compiled SQL command.
///
/// `SqlCompileArtifacts::new` debug-asserts each flag against the compiled
/// command variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlQueryShape {
    /// True for the global aggregate command variant.
    pub is_aggregate: bool,
    /// True for SELECT and global aggregate commands, and for mutations that
    /// carry a `returning` clause.
    pub returns_rows: bool,
    /// True for DELETE, INSERT, and UPDATE commands.
    pub is_mutation: bool,
}
314
315impl SqlQueryShape {
316 #[must_use]
317 const fn read_rows(is_aggregate: bool) -> Self {
318 Self {
319 is_aggregate,
320 returns_rows: true,
321 is_mutation: false,
322 }
323 }
324
325 #[must_use]
326 const fn metadata() -> Self {
327 Self {
328 is_aggregate: false,
329 returns_rows: false,
330 is_mutation: false,
331 }
332 }
333
334 #[must_use]
335 const fn mutation(returns_rows: bool) -> Self {
336 Self {
337 is_aggregate: false,
338 returns_rows,
339 is_mutation: true,
340 }
341 }
342}
343
/// Accumulator for a `SqlCompilePhaseAttribution` value.
///
/// Starts from the all-zero default; each `record_*` method overwrites the
/// fields of one compile stage and `finish` returns the accumulated value.
#[derive(Clone, Copy, Debug, Default)]
struct SqlCompileAttributionBuilder {
    /// Attribution accumulated so far.
    phase: SqlCompilePhaseAttribution,
}
357
358impl SqlCompileAttributionBuilder {
359 const fn record_cache_key(&mut self, local_instructions: u64) {
362 self.phase.cache_key = local_instructions;
363 }
364
365 const fn record_cache_lookup(&mut self, local_instructions: u64) {
367 self.phase.cache_lookup = local_instructions;
368 }
369
370 const fn record_parse(
373 &mut self,
374 local_instructions: u64,
375 attribution: SqlParsePhaseAttribution,
376 ) {
377 let statement_shell = local_instructions
378 .saturating_sub(attribution.tokenize)
379 .saturating_sub(attribution.expr)
380 .saturating_sub(attribution.predicate);
381
382 self.phase.parse = local_instructions;
383 self.phase.parse_tokenize = attribution.tokenize;
384 self.phase.parse_select = statement_shell;
388 self.phase.parse_expr = attribution.expr;
389 self.phase.parse_predicate = attribution.predicate;
390 }
391
392 const fn record_core_compile(&mut self, attribution: SqlCompilePhaseAttribution) {
395 self.phase.aggregate_lane_check = attribution.aggregate_lane_check;
396 self.phase.prepare = attribution.prepare;
397 self.phase.lower = attribution.lower;
398 self.phase.bind = attribution.bind;
399 }
400
401 const fn record_cache_insert(&mut self, local_instructions: u64) {
403 self.phase.cache_insert = local_instructions;
404 }
405
406 #[must_use]
407 const fn finish(self) -> SqlCompilePhaseAttribution {
408 self.phase
409 }
410}
411
412impl SqlCompileArtifacts {
413 fn new(
418 command: CompiledSqlCommand,
419 shape: SqlQueryShape,
420 aggregate_lane_check: u64,
421 prepare: u64,
422 lower: u64,
423 bind: u64,
424 ) -> Self {
425 debug_assert_eq!(
426 shape.is_aggregate,
427 matches!(command, CompiledSqlCommand::GlobalAggregate { .. }),
428 "compile aggregate shape must match the compiled command variant"
429 );
430 debug_assert_eq!(
431 shape.is_mutation,
432 matches!(
433 command,
434 CompiledSqlCommand::Delete { .. }
435 | CompiledSqlCommand::Insert(_)
436 | CompiledSqlCommand::Update(_)
437 ),
438 "compile mutation shape must match the compiled command variant"
439 );
440 debug_assert_eq!(
441 shape.returns_rows,
442 Self::command_returns_rows(&command),
443 "compile row-returning shape must match the compiled command variant"
444 );
445
446 Self {
447 command,
448 shape,
449 aggregate_lane_check,
450 prepare,
451 lower,
452 bind,
453 }
454 }
455
456 const fn command_returns_rows(command: &CompiledSqlCommand) -> bool {
460 match command {
461 CompiledSqlCommand::Select { .. } | CompiledSqlCommand::GlobalAggregate { .. } => true,
462 CompiledSqlCommand::Delete { returning, .. } => returning.is_some(),
463 CompiledSqlCommand::Insert(statement) => statement.returning.is_some(),
464 CompiledSqlCommand::Update(statement) => statement.returning.is_some(),
465 CompiledSqlCommand::Explain(_)
466 | CompiledSqlCommand::DescribeEntity
467 | CompiledSqlCommand::ShowIndexesEntity
468 | CompiledSqlCommand::ShowColumnsEntity
469 | CompiledSqlCommand::ShowEntities => false,
470 }
471 }
472
473 #[must_use]
477 const fn phase_attribution(&self) -> SqlCompilePhaseAttribution {
478 SqlCompilePhaseAttribution {
479 cache_key: 0,
480 cache_lookup: 0,
481 parse: 0,
482 parse_tokenize: 0,
483 parse_select: 0,
484 parse_expr: 0,
485 parse_predicate: 0,
486 aggregate_lane_check: self.aggregate_lane_check,
487 prepare: self.prepare,
488 lower: self.lower,
489 bind: self.bind,
490 cache_insert: 0,
491 }
492 }
493}
494
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Builds an attribution from a total execute cost and its store share.
    ///
    /// The executor invocation receives the full total, the executor share is
    /// the total minus the store share (saturating at zero), and every other
    /// counter is zero or the executor's `none()` value.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        let executor_only = execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_invocation_local_instructions: execute_local_instructions,
            executor_local_instructions: executor_only,
            response_finalization_local_instructions: 0,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: ExecutorGroupedCountAttribution::none(),
            scalar_aggregate_terminal: ScalarAggregateTerminalAttribution::none(),
        }
    }
}
517
/// Test-only helper: parses `sql` into a `SqlStatement`, converting a parse
/// failure into a `QueryError`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
524
525fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
529 let (local_instructions, result) = measure_sql_stage(stage);
530 let value = result?;
531
532 Ok((local_instructions, value))
533}
534
535impl<C: CanisterKind> DbSession<C> {
536 fn compile_sql_statement_core(
539 statement: &SqlStatement,
540 authority: EntityAuthority,
541 compiled_cache_key: SqlCompiledCommandCacheKey,
542 ) -> Result<SqlCompileArtifacts, QueryError> {
543 let model = authority.model();
544
545 match statement {
546 SqlStatement::Select(_) => Self::compile_select(statement, model, compiled_cache_key),
547 SqlStatement::Delete(_) => Self::compile_delete(statement, model),
548 SqlStatement::Insert(_) => Self::compile_insert(statement, model),
549 SqlStatement::Update(_) => Self::compile_update(statement, model),
550 SqlStatement::Explain(_) => Self::compile_explain(statement, model),
551 SqlStatement::Describe(_) => Self::compile_describe(statement, model),
552 SqlStatement::ShowIndexes(_) => Self::compile_show_indexes(statement, model),
553 SqlStatement::ShowColumns(_) => Self::compile_show_columns(statement, model),
554 SqlStatement::ShowEntities(_) => Ok(Self::compile_show_entities()),
555 }
556 }
557
558 fn prepare_statement_for_model(
561 statement: &SqlStatement,
562 model: &'static EntityModel,
563 ) -> Result<(u64, PreparedSqlStatement), QueryError> {
564 measured(|| {
565 prepare_sql_statement(statement, model.name())
566 .map_err(QueryError::from_sql_lowering_error)
567 })
568 }
569
570 fn compile_select(
574 statement: &SqlStatement,
575 model: &'static EntityModel,
576 compiled_cache_key: SqlCompiledCommandCacheKey,
577 ) -> Result<SqlCompileArtifacts, QueryError> {
578 let (prepare_local_instructions, prepared) =
579 Self::prepare_statement_for_model(statement, model)?;
580 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
581 measured(|| Ok(prepared.statement().is_global_aggregate_lane_shape()))?;
582
583 if requires_aggregate_lane {
584 Self::compile_select_global_aggregate(
585 prepared,
586 model,
587 aggregate_lane_check_local_instructions,
588 prepare_local_instructions,
589 )
590 } else {
591 Self::compile_select_non_aggregate(
592 prepared,
593 model,
594 compiled_cache_key,
595 aggregate_lane_check_local_instructions,
596 prepare_local_instructions,
597 )
598 }
599 }
600
601 fn compile_select_global_aggregate(
606 prepared: PreparedSqlStatement,
607 model: &'static EntityModel,
608 aggregate_lane_check_local_instructions: u64,
609 prepare_local_instructions: u64,
610 ) -> Result<SqlCompileArtifacts, QueryError> {
611 let (lower_local_instructions, command) = measured(|| {
612 compile_sql_global_aggregate_command_core_from_prepared(
613 prepared,
614 model,
615 MissingRowPolicy::Ignore,
616 )
617 .map_err(QueryError::from_sql_lowering_error)
618 })?;
619
620 Ok(SqlCompileArtifacts::new(
621 CompiledSqlCommand::GlobalAggregate {
622 command: Box::new(command),
623 },
624 SqlQueryShape::read_rows(true),
625 aggregate_lane_check_local_instructions,
626 prepare_local_instructions,
627 lower_local_instructions,
628 0,
629 ))
630 }
631
632 fn compile_select_non_aggregate(
636 prepared: PreparedSqlStatement,
637 model: &'static EntityModel,
638 compiled_cache_key: SqlCompiledCommandCacheKey,
639 aggregate_lane_check_local_instructions: u64,
640 prepare_local_instructions: u64,
641 ) -> Result<SqlCompileArtifacts, QueryError> {
642 let (lower_local_instructions, select) = measured(|| {
643 lower_prepared_sql_select_statement(prepared, model)
644 .map_err(QueryError::from_sql_lowering_error)
645 })?;
646 let (bind_local_instructions, query) = measured(|| {
647 bind_lowered_sql_select_query_structural(model, select, MissingRowPolicy::Ignore)
648 .map_err(QueryError::from_sql_lowering_error)
649 })?;
650
651 Ok(SqlCompileArtifacts::new(
652 CompiledSqlCommand::Select {
653 query: Arc::new(query),
654 compiled_cache_key,
655 },
656 SqlQueryShape::read_rows(false),
657 aggregate_lane_check_local_instructions,
658 prepare_local_instructions,
659 lower_local_instructions,
660 bind_local_instructions,
661 ))
662 }
663
664 fn compile_delete(
667 statement: &SqlStatement,
668 model: &'static EntityModel,
669 ) -> Result<SqlCompileArtifacts, QueryError> {
670 let (prepare_local_instructions, prepared) =
671 Self::prepare_statement_for_model(statement, model)?;
672 let (lower_local_instructions, delete) = measured(|| {
673 lower_prepared_sql_delete_statement(prepared)
674 .map_err(QueryError::from_sql_lowering_error)
675 })?;
676 let returning = delete.returning().cloned();
677 let query = delete.into_base_query();
678 let (bind_local_instructions, query) = measured(|| {
679 Ok(bind_lowered_sql_delete_query_structural(
680 model,
681 query,
682 MissingRowPolicy::Ignore,
683 ))
684 })?;
685
686 let shape = SqlQueryShape::mutation(returning.is_some());
687
688 Ok(SqlCompileArtifacts::new(
689 CompiledSqlCommand::Delete {
690 query: Arc::new(query),
691 returning,
692 },
693 shape,
694 0,
695 prepare_local_instructions,
696 lower_local_instructions,
697 bind_local_instructions,
698 ))
699 }
700
701 fn compile_insert(
705 statement: &SqlStatement,
706 model: &'static EntityModel,
707 ) -> Result<SqlCompileArtifacts, QueryError> {
708 let (prepare_local_instructions, prepared) =
709 Self::prepare_statement_for_model(statement, model)?;
710 let statement = extract_prepared_sql_insert_statement(prepared)
711 .map_err(QueryError::from_sql_lowering_error)?;
712
713 let shape = SqlQueryShape::mutation(statement.returning.is_some());
714
715 Ok(SqlCompileArtifacts::new(
716 CompiledSqlCommand::Insert(statement),
717 shape,
718 0,
719 prepare_local_instructions,
720 0,
721 0,
722 ))
723 }
724
725 fn compile_update(
728 statement: &SqlStatement,
729 model: &'static EntityModel,
730 ) -> Result<SqlCompileArtifacts, QueryError> {
731 let (prepare_local_instructions, prepared) =
732 Self::prepare_statement_for_model(statement, model)?;
733 let statement = extract_prepared_sql_update_statement(prepared)
734 .map_err(QueryError::from_sql_lowering_error)?;
735
736 let shape = SqlQueryShape::mutation(statement.returning.is_some());
737
738 Ok(SqlCompileArtifacts::new(
739 CompiledSqlCommand::Update(statement),
740 shape,
741 0,
742 prepare_local_instructions,
743 0,
744 0,
745 ))
746 }
747
748 fn compile_explain(
751 statement: &SqlStatement,
752 model: &'static EntityModel,
753 ) -> Result<SqlCompileArtifacts, QueryError> {
754 let (prepare_local_instructions, prepared) =
755 Self::prepare_statement_for_model(statement, model)?;
756 let (lower_local_instructions, lowered) = measured(|| {
757 lower_sql_command_from_prepared_statement(prepared, model)
758 .map_err(QueryError::from_sql_lowering_error)
759 })?;
760
761 Ok(SqlCompileArtifacts::new(
762 CompiledSqlCommand::Explain(Box::new(lowered)),
763 SqlQueryShape::metadata(),
764 0,
765 prepare_local_instructions,
766 lower_local_instructions,
767 0,
768 ))
769 }
770
771 fn compile_describe(
774 statement: &SqlStatement,
775 model: &'static EntityModel,
776 ) -> Result<SqlCompileArtifacts, QueryError> {
777 let (prepare_local_instructions, _prepared) =
778 Self::prepare_statement_for_model(statement, model)?;
779
780 Ok(SqlCompileArtifacts::new(
781 CompiledSqlCommand::DescribeEntity,
782 SqlQueryShape::metadata(),
783 0,
784 prepare_local_instructions,
785 0,
786 0,
787 ))
788 }
789
790 fn compile_show_indexes(
793 statement: &SqlStatement,
794 model: &'static EntityModel,
795 ) -> Result<SqlCompileArtifacts, QueryError> {
796 let (prepare_local_instructions, _prepared) =
797 Self::prepare_statement_for_model(statement, model)?;
798
799 Ok(SqlCompileArtifacts::new(
800 CompiledSqlCommand::ShowIndexesEntity,
801 SqlQueryShape::metadata(),
802 0,
803 prepare_local_instructions,
804 0,
805 0,
806 ))
807 }
808
809 fn compile_show_columns(
812 statement: &SqlStatement,
813 model: &'static EntityModel,
814 ) -> Result<SqlCompileArtifacts, QueryError> {
815 let (prepare_local_instructions, _prepared) =
816 Self::prepare_statement_for_model(statement, model)?;
817
818 Ok(SqlCompileArtifacts::new(
819 CompiledSqlCommand::ShowColumnsEntity,
820 SqlQueryShape::metadata(),
821 0,
822 prepare_local_instructions,
823 0,
824 0,
825 ))
826 }
827
828 fn compile_show_entities() -> SqlCompileArtifacts {
831 SqlCompileArtifacts::new(
832 CompiledSqlCommand::ShowEntities,
833 SqlQueryShape::metadata(),
834 0,
835 0,
836 0,
837 0,
838 )
839 }
840
841 fn compile_sql_statement_entry(
846 statement: &SqlStatement,
847 surface: SqlCompiledCommandSurface,
848 authority: EntityAuthority,
849 compiled_cache_key: SqlCompiledCommandCacheKey,
850 ) -> Result<SqlCompileArtifacts, QueryError> {
851 Self::ensure_sql_statement_supported_for_surface(statement, surface)?;
852
853 Self::compile_sql_statement_core(statement, authority, compiled_cache_key)
854 }
855
856 fn compile_sql_statement_measured(
860 statement: &SqlStatement,
861 surface: SqlCompiledCommandSurface,
862 authority: EntityAuthority,
863 compiled_cache_key: SqlCompiledCommandCacheKey,
864 ) -> Result<(SqlCompileArtifacts, SqlCompilePhaseAttribution), QueryError> {
865 let artifacts =
866 Self::compile_sql_statement_entry(statement, surface, authority, compiled_cache_key)?;
867 debug_assert!(
868 !artifacts.shape.is_aggregate || artifacts.bind == 0,
869 "aggregate SQL artifacts must not report scalar bind work"
870 );
871 debug_assert!(
872 !artifacts.shape.is_mutation || artifacts.aggregate_lane_check == 0,
873 "mutation SQL artifacts must not report SELECT lane checks"
874 );
875 let attribution = artifacts.phase_attribution();
876
877 Ok((artifacts, attribution))
878 }
879
880 fn sql_select_prepared_plan(
883 &self,
884 query: &StructuralQuery,
885 authority: EntityAuthority,
886 cache_schema_fingerprint: CommitSchemaFingerprint,
887 ) -> Result<
888 (
889 SharedPreparedExecutionPlan,
890 SqlProjectionContract,
891 SqlCacheAttribution,
892 ),
893 QueryError,
894 > {
895 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
896 authority,
897 cache_schema_fingerprint,
898 query,
899 )?;
900 let projection_spec = prepared_plan
901 .logical_plan()
902 .projection_spec(authority.model());
903 let projection = SqlProjectionContract::new(
904 projection_labels_from_projection_spec(&projection_spec),
905 projection_fixed_scales_from_projection_spec(&projection_spec),
906 );
907
908 Ok((
909 prepared_plan,
910 projection,
911 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
912 ))
913 }
914
915 fn ensure_sql_statement_supported_for_surface(
919 statement: &SqlStatement,
920 surface: SqlCompiledCommandSurface,
921 ) -> Result<(), QueryError> {
922 match (surface, statement) {
923 (
924 SqlCompiledCommandSurface::Query,
925 SqlStatement::Select(_)
926 | SqlStatement::Explain(_)
927 | SqlStatement::Describe(_)
928 | SqlStatement::ShowIndexes(_)
929 | SqlStatement::ShowColumns(_)
930 | SqlStatement::ShowEntities(_),
931 )
932 | (
933 SqlCompiledCommandSurface::Update,
934 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
935 ) => Ok(()),
936 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
937 Err(QueryError::unsupported_query(
938 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
939 ))
940 }
941 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
942 Err(QueryError::unsupported_query(
943 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
944 ))
945 }
946 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
947 Err(QueryError::unsupported_query(
948 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
949 ))
950 }
951 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
952 Err(QueryError::unsupported_query(
953 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
954 ))
955 }
956 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
957 Err(QueryError::unsupported_query(
958 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
959 ))
960 }
961 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
962 Err(QueryError::unsupported_query(
963 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
964 ))
965 }
966 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
967 Err(QueryError::unsupported_query(
968 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
969 ))
970 }
971 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
972 Err(QueryError::unsupported_query(
973 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
974 ))
975 }
976 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
977 Err(QueryError::unsupported_query(
978 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
979 ))
980 }
981 }
982 }
983
984 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
989 where
990 E: PersistedRow<Canister = C> + EntityValue,
991 {
992 let compiled = self.compile_sql_query::<E>(sql)?;
993
994 self.execute_compiled_sql_owned::<E>(compiled)
995 }
996
    /// Executes one SQL query for entity `E` and returns the statement result
    /// together with a compile/execute attribution report.
    ///
    /// Only compiled with the `diagnostics` feature and hidden from rustdoc.
    ///
    /// # Errors
    ///
    /// Propagates the `QueryError` from the compile step or from execution of
    /// the compiled command; no attribution is returned in that case.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    pub fn execute_sql_query_with_attribution<E>(
        &self,
        sql: &str,
    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Measure the whole compile call first; the compile result is only
        // unwrapped afterwards, so the measurement is taken on failure too.
        let (compile_local_instructions, compiled) =
            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;

        // Snapshot the running counters before execution so that only the
        // change caused by this execution is reported.
        let store_get_calls_before = DataStore::current_get_call_count();
        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
        let pure_covering_row_assembly_before =
            current_pure_covering_row_assembly_local_instructions();
        let (result, execute_cache_attribution, execute_phase_attribution) =
            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
        // Counter deltas across the execution (saturating).
        let store_get_calls =
            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
        let pure_covering_decode_local_instructions =
            current_pure_covering_decode_local_instructions()
                .saturating_sub(pure_covering_decode_before);
        let pure_covering_row_assembly_local_instructions =
            current_pure_covering_row_assembly_local_instructions()
                .saturating_sub(pure_covering_row_assembly_before);
        // The execute total is the sum of the planner, store, executor, and
        // response-finalization shares; the executor-invocation figure is
        // reported separately and is not added in.
        let execute_local_instructions = execute_phase_attribution
            .planner_local_instructions
            .saturating_add(execute_phase_attribution.store_local_instructions)
            .saturating_add(execute_phase_attribution.executor_local_instructions)
            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);
        // Grouped attribution is attached only when the result is grouped.
        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
            GroupedExecutionAttribution {
                stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
                finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                count: GroupedCountAttribution::from_executor(
                    execute_phase_attribution.grouped_count,
                ),
            },
        );
        // Pure-covering attribution is attached only when a counter advanced.
        let pure_covering = (pure_covering_decode_local_instructions > 0
            || pure_covering_row_assembly_local_instructions > 0)
            .then_some(SqlPureCoveringAttribution {
                decode_local_instructions: pure_covering_decode_local_instructions,
                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
            });

        Ok((
            result,
            SqlQueryExecutionAttribution {
                compile_local_instructions,
                // One-to-one copy of the internal compile phase attribution.
                compile: SqlCompileAttribution {
                    cache_key_local_instructions: compile_phase_attribution.cache_key,
                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
                    parse_local_instructions: compile_phase_attribution.parse,
                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
                    parse_select_local_instructions: compile_phase_attribution.parse_select,
                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
                    aggregate_lane_check_local_instructions: compile_phase_attribution
                        .aggregate_lane_check,
                    prepare_local_instructions: compile_phase_attribution.prepare,
                    lower_local_instructions: compile_phase_attribution.lower,
                    bind_local_instructions: compile_phase_attribution.bind,
                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
                },
                // Same value as `execution.planner_local_instructions` below.
                plan_lookup_local_instructions: execute_phase_attribution
                    .planner_local_instructions,
                execution: SqlExecutionAttribution {
                    planner_local_instructions: execute_phase_attribution
                        .planner_local_instructions,
                    store_local_instructions: execute_phase_attribution.store_local_instructions,
                    executor_invocation_local_instructions: execute_phase_attribution
                        .executor_invocation_local_instructions,
                    executor_local_instructions: execute_phase_attribution
                        .executor_local_instructions,
                    response_finalization_local_instructions: execute_phase_attribution
                        .response_finalization_local_instructions,
                },
                grouped,
                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
                    execute_phase_attribution.scalar_aggregate_terminal,
                ),
                pure_covering,
                store_get_calls,
                // This path always reports zero response decode cost.
                response_decode_local_instructions: 0,
                execute_local_instructions,
                total_local_instructions,
                cache: SqlQueryCacheAttribution {
                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
                    sql_compiled_command_misses: cache_attribution
                        .sql_compiled_command_cache_misses,
                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
                },
            },
        ))
    }
1108
1109 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
1114 where
1115 E: PersistedRow<Canister = C> + EntityValue,
1116 {
1117 let compiled = self.compile_sql_update::<E>(sql)?;
1118
1119 self.execute_compiled_sql_owned::<E>(compiled)
1120 }
1121
1122 pub(in crate::db) fn compile_sql_query<E>(
1125 &self,
1126 sql: &str,
1127 ) -> Result<CompiledSqlCommand, QueryError>
1128 where
1129 E: PersistedRow<Canister = C> + EntityValue,
1130 {
1131 self.compile_sql_query_with_cache_attribution::<E>(sql)
1132 .map(|(compiled, _, _)| compiled)
1133 }
1134
1135 fn compile_sql_query_with_cache_attribution<E>(
1136 &self,
1137 sql: &str,
1138 ) -> Result<
1139 (
1140 CompiledSqlCommand,
1141 SqlCacheAttribution,
1142 SqlCompilePhaseAttribution,
1143 ),
1144 QueryError,
1145 >
1146 where
1147 E: PersistedRow<Canister = C> + EntityValue,
1148 {
1149 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
1150 }
1151
1152 pub(in crate::db) fn compile_sql_update<E>(
1155 &self,
1156 sql: &str,
1157 ) -> Result<CompiledSqlCommand, QueryError>
1158 where
1159 E: PersistedRow<Canister = C> + EntityValue,
1160 {
1161 self.compile_sql_update_with_cache_attribution::<E>(sql)
1162 .map(|(compiled, _, _)| compiled)
1163 }
1164
1165 fn compile_sql_update_with_cache_attribution<E>(
1166 &self,
1167 sql: &str,
1168 ) -> Result<
1169 (
1170 CompiledSqlCommand,
1171 SqlCacheAttribution,
1172 SqlCompilePhaseAttribution,
1173 ),
1174 QueryError,
1175 >
1176 where
1177 E: PersistedRow<Canister = C> + EntityValue,
1178 {
1179 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1180 }
1181
1182 fn compile_sql_surface_with_cache_attribution<E>(
1186 &self,
1187 sql: &str,
1188 surface: SqlCompiledCommandSurface,
1189 ) -> Result<
1190 (
1191 CompiledSqlCommand,
1192 SqlCacheAttribution,
1193 SqlCompilePhaseAttribution,
1194 ),
1195 QueryError,
1196 >
1197 where
1198 E: PersistedRow<Canister = C> + EntityValue,
1199 {
1200 let (cache_key_local_instructions, cache_key) = measured(|| {
1201 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1202 })?;
1203 let mut attribution = SqlCompileAttributionBuilder::default();
1204 attribution.record_cache_key(cache_key_local_instructions);
1205
1206 self.compile_sql_statement_with_cache::<E>(cache_key, attribution, sql, surface)
1207 }
1208
1209 fn compile_sql_statement_with_cache<E>(
1212 &self,
1213 cache_key: SqlCompiledCommandCacheKey,
1214 mut attribution: SqlCompileAttributionBuilder,
1215 sql: &str,
1216 surface: SqlCompiledCommandSurface,
1217 ) -> Result<
1218 (
1219 CompiledSqlCommand,
1220 SqlCacheAttribution,
1221 SqlCompilePhaseAttribution,
1222 ),
1223 QueryError,
1224 >
1225 where
1226 E: PersistedRow<Canister = C> + EntityValue,
1227 {
1228 let (cache_lookup_local_instructions, cached) = measured(|| {
1229 let cached =
1230 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1231 Ok::<_, QueryError>(cached)
1232 })?;
1233 attribution.record_cache_lookup(cache_lookup_local_instructions);
1234 if let Some(compiled) = cached {
1235 return Ok((
1236 compiled,
1237 SqlCacheAttribution::sql_compiled_command_cache_hit(),
1238 attribution.finish(),
1239 ));
1240 }
1241
1242 let (parse_local_instructions, (parsed, parse_attribution)) =
1243 measured(|| parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error))?;
1244 attribution.record_parse(parse_local_instructions, parse_attribution);
1245 let authority = EntityAuthority::for_type::<E>();
1246 let (artifacts, compile_attribution) =
1247 Self::compile_sql_statement_measured(&parsed, surface, authority, cache_key.clone())?;
1248 attribution.record_core_compile(compile_attribution);
1249 let compiled = artifacts.command;
1250
1251 let (cache_insert_local_instructions, ()) = measured(|| {
1252 self.with_sql_compiled_command_cache(|cache| {
1253 cache.insert(cache_key, compiled.clone());
1254 });
1255 Ok::<_, QueryError>(())
1256 })?;
1257 attribution.record_cache_insert(cache_insert_local_instructions);
1258
1259 Ok((
1260 compiled,
1261 SqlCacheAttribution::sql_compiled_command_cache_miss(),
1262 attribution.finish(),
1263 ))
1264 }
1265}