1mod cache;
8mod compiled;
9mod execute;
10mod projection;
11
12#[cfg(feature = "diagnostics")]
13use candid::CandidType;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::sync::Arc;
17
18#[cfg(feature = "diagnostics")]
19use crate::db::DataStore;
20#[cfg(feature = "diagnostics")]
21use crate::db::executor::{
22 GroupedCountAttribution as ExecutorGroupedCountAttribution, ScalarAggregateTerminalAttribution,
23 current_pure_covering_decode_local_instructions,
24 current_pure_covering_row_assembly_local_instructions,
25};
26#[cfg(test)]
27use crate::db::sql::parser::parse_sql;
28#[cfg(feature = "diagnostics")]
29use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
30use crate::{
31 db::{
32 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
33 commit::CommitSchemaFingerprint,
34 executor::{EntityAuthority, SharedPreparedExecutionPlan},
35 query::intent::StructuralQuery,
36 session::sql::projection::{
37 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
38 },
39 sql::lowering::{
40 bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
41 compile_sql_global_aggregate_command_core_from_prepared,
42 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
43 lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
44 lower_sql_command_from_prepared_statement, prepare_sql_statement,
45 },
46 sql::parser::{SqlStatement, parse_sql_with_attribution},
47 },
48 traits::{CanisterKind, EntityValue},
49 value::OutputValue,
50};
51
52pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
53pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
54pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
55pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
56
57#[cfg(all(test, not(feature = "diagnostics")))]
58pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
59#[cfg(feature = "diagnostics")]
60pub use crate::db::session::sql::projection::{
61 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
62};
63
64#[derive(Debug)]
66pub enum SqlStatementResult {
67 Count {
68 row_count: u32,
69 },
70 Projection {
71 columns: Vec<String>,
72 fixed_scales: Vec<Option<u32>>,
73 rows: Vec<Vec<OutputValue>>,
74 row_count: u32,
75 },
76 ProjectionText {
77 columns: Vec<String>,
78 rows: Vec<Vec<String>>,
79 row_count: u32,
80 },
81 Grouped {
82 columns: Vec<String>,
83 fixed_scales: Vec<Option<u32>>,
84 rows: Vec<GroupedRow>,
85 row_count: u32,
86 next_cursor: Option<String>,
87 },
88 Explain(String),
89 Describe(crate::db::EntitySchemaDescription),
90 ShowIndexes(Vec<String>),
91 ShowColumns(Vec<crate::db::EntityFieldDescription>),
92 ShowEntities(Vec<String>),
93}
94
95#[cfg(feature = "diagnostics")]
101#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
102pub struct SqlCompileAttribution {
103 pub cache_key_local_instructions: u64,
104 pub cache_lookup_local_instructions: u64,
105 pub parse_local_instructions: u64,
106 pub parse_tokenize_local_instructions: u64,
107 pub parse_select_local_instructions: u64,
108 pub parse_expr_local_instructions: u64,
109 pub parse_predicate_local_instructions: u64,
110 pub aggregate_lane_check_local_instructions: u64,
111 pub prepare_local_instructions: u64,
112 pub lower_local_instructions: u64,
113 pub bind_local_instructions: u64,
114 pub cache_insert_local_instructions: u64,
115}
116
117#[cfg(feature = "diagnostics")]
123#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
124pub struct SqlExecutionAttribution {
125 pub planner_local_instructions: u64,
126 pub store_local_instructions: u64,
127 pub executor_invocation_local_instructions: u64,
128 pub executor_local_instructions: u64,
129 pub response_finalization_local_instructions: u64,
130}
131
132#[cfg(feature = "diagnostics")]
138#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
139pub struct SqlScalarAggregateAttribution {
140 pub base_row_local_instructions: u64,
141 pub reducer_fold_local_instructions: u64,
142 pub expression_evaluations: u64,
143 pub filter_evaluations: u64,
144 pub rows_ingested: u64,
145 pub terminal_count: u64,
146 pub unique_input_expr_count: u64,
147 pub unique_filter_expr_count: u64,
148 pub sink_mode: Option<String>,
149}
150
151#[cfg(feature = "diagnostics")]
152impl SqlScalarAggregateAttribution {
153 fn from_executor(terminal: ScalarAggregateTerminalAttribution) -> Option<Self> {
154 let has_scalar_aggregate_work = terminal.base_row_local_instructions != 0
158 || terminal.reducer_fold_local_instructions != 0
159 || terminal.expression_evaluations != 0
160 || terminal.filter_evaluations != 0
161 || terminal.rows_ingested != 0
162 || terminal.terminal_count != 0
163 || terminal.unique_input_expr_count != 0
164 || terminal.unique_filter_expr_count != 0
165 || terminal.sink_mode.label().is_some();
166 if !has_scalar_aggregate_work {
167 return None;
168 }
169
170 Some(Self {
171 base_row_local_instructions: terminal.base_row_local_instructions,
172 reducer_fold_local_instructions: terminal.reducer_fold_local_instructions,
173 expression_evaluations: terminal.expression_evaluations,
174 filter_evaluations: terminal.filter_evaluations,
175 rows_ingested: terminal.rows_ingested,
176 terminal_count: terminal.terminal_count,
177 unique_input_expr_count: terminal.unique_input_expr_count,
178 unique_filter_expr_count: terminal.unique_filter_expr_count,
179 sink_mode: terminal.sink_mode.label().map(str::to_string),
180 })
181 }
182}
183
184#[cfg(feature = "diagnostics")]
190#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
191pub struct SqlPureCoveringAttribution {
192 pub decode_local_instructions: u64,
193 pub row_assembly_local_instructions: u64,
194}
195
196#[cfg(feature = "diagnostics")]
201#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
202pub struct SqlQueryCacheAttribution {
203 pub sql_compiled_command_hits: u64,
204 pub sql_compiled_command_misses: u64,
205 pub shared_query_plan_hits: u64,
206 pub shared_query_plan_misses: u64,
207}
208
209#[cfg(feature = "diagnostics")]
218#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
219pub struct SqlQueryExecutionAttribution {
220 pub compile_local_instructions: u64,
221 pub compile: SqlCompileAttribution,
222 pub plan_lookup_local_instructions: u64,
223 pub execution: SqlExecutionAttribution,
224 pub grouped: Option<GroupedExecutionAttribution>,
225 pub scalar_aggregate: Option<SqlScalarAggregateAttribution>,
226 pub pure_covering: Option<SqlPureCoveringAttribution>,
227 pub store_get_calls: u64,
228 pub response_decode_local_instructions: u64,
229 pub execute_local_instructions: u64,
230 pub total_local_instructions: u64,
231 pub cache: SqlQueryCacheAttribution,
232}
233
234#[cfg(feature = "diagnostics")]
238#[derive(Clone, Copy, Debug, Eq, PartialEq)]
239pub(in crate::db) struct SqlExecutePhaseAttribution {
240 pub planner_local_instructions: u64,
241 pub store_local_instructions: u64,
242 pub executor_invocation_local_instructions: u64,
243 pub executor_local_instructions: u64,
244 pub response_finalization_local_instructions: u64,
245 pub grouped_stream_local_instructions: u64,
246 pub grouped_fold_local_instructions: u64,
247 pub grouped_finalize_local_instructions: u64,
248 pub grouped_count: ExecutorGroupedCountAttribution,
249 pub scalar_aggregate_terminal: ScalarAggregateTerminalAttribution,
250}
251
252#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
264pub(in crate::db) struct SqlCompilePhaseAttribution {
265 pub cache_key: u64,
266 pub cache_lookup: u64,
267 pub parse: u64,
268 pub parse_tokenize: u64,
269 pub parse_select: u64,
270 pub parse_expr: u64,
271 pub parse_predicate: u64,
272 pub aggregate_lane_check: u64,
273 pub prepare: u64,
274 pub lower: u64,
275 pub bind: u64,
276 pub cache_insert: u64,
277}
278
279impl SqlCompilePhaseAttribution {
280 #[must_use]
281 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
282 Self {
283 cache_key,
284 cache_lookup,
285 parse: 0,
286 parse_tokenize: 0,
287 parse_select: 0,
288 parse_expr: 0,
289 parse_predicate: 0,
290 aggregate_lane_check: 0,
291 prepare: 0,
292 lower: 0,
293 bind: 0,
294 cache_insert: 0,
295 }
296 }
297}
298
299#[cfg(feature = "diagnostics")]
300impl SqlExecutePhaseAttribution {
301 #[must_use]
302 pub(in crate::db) const fn from_execute_total_and_store_total(
303 execute_local_instructions: u64,
304 store_local_instructions: u64,
305 ) -> Self {
306 Self {
307 planner_local_instructions: 0,
308 store_local_instructions,
309 executor_invocation_local_instructions: execute_local_instructions,
310 executor_local_instructions: execute_local_instructions
311 .saturating_sub(store_local_instructions),
312 response_finalization_local_instructions: 0,
313 grouped_stream_local_instructions: 0,
314 grouped_fold_local_instructions: 0,
315 grouped_finalize_local_instructions: 0,
316 grouped_count: ExecutorGroupedCountAttribution::none(),
317 scalar_aggregate_terminal: ScalarAggregateTerminalAttribution::none(),
318 }
319 }
320}
321
322#[cfg(test)]
325pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
326 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
327}
328
329impl<C: CanisterKind> DbSession<C> {
330 #[expect(clippy::too_many_lines)]
333 fn compile_sql_statement_for_authority(
334 statement: &SqlStatement,
335 authority: EntityAuthority,
336 compiled_cache_key: SqlCompiledCommandCacheKey,
337 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
338 let prepare_statement = || {
342 measure_sql_stage(|| {
343 prepare_sql_statement(statement.clone(), authority.model().name())
344 .map_err(QueryError::from_sql_lowering_error)
345 })
346 };
347
348 match statement {
349 SqlStatement::Select(_) => {
350 let (prepare_local_instructions, prepared) = prepare_statement();
351 let prepared = prepared?;
352 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
353 measure_sql_stage(|| {
354 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
355 });
356 let requires_aggregate_lane = requires_aggregate_lane?;
357
358 if requires_aggregate_lane {
359 let (lower_local_instructions, command) = measure_sql_stage(|| {
360 compile_sql_global_aggregate_command_core_from_prepared(
361 prepared,
362 authority.model(),
363 MissingRowPolicy::Ignore,
364 )
365 .map_err(QueryError::from_sql_lowering_error)
366 });
367 let command = command?;
368
369 Ok((
370 CompiledSqlCommand::GlobalAggregate {
371 command: Box::new(command),
372 },
373 aggregate_lane_check_local_instructions,
374 prepare_local_instructions,
375 lower_local_instructions,
376 0,
377 ))
378 } else {
379 let (lower_local_instructions, select) = measure_sql_stage(|| {
380 lower_prepared_sql_select_statement(prepared, authority.model())
381 .map_err(QueryError::from_sql_lowering_error)
382 });
383 let select = select?;
384 let (bind_local_instructions, query) = measure_sql_stage(|| {
385 bind_lowered_sql_select_query_structural(
386 authority.model(),
387 select,
388 MissingRowPolicy::Ignore,
389 )
390 .map_err(QueryError::from_sql_lowering_error)
391 });
392 let query = query?;
393
394 Ok((
395 CompiledSqlCommand::Select {
396 query: Arc::new(query),
397 compiled_cache_key,
398 },
399 aggregate_lane_check_local_instructions,
400 prepare_local_instructions,
401 lower_local_instructions,
402 bind_local_instructions,
403 ))
404 }
405 }
406 SqlStatement::Delete(_) => {
407 let (prepare_local_instructions, prepared) = prepare_statement();
408 let prepared = prepared?;
409 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
410 lower_prepared_sql_delete_statement(prepared)
411 .map_err(QueryError::from_sql_lowering_error)
412 });
413 let delete = lowered?;
414 let returning = delete.returning().cloned();
415 let query = delete.into_base_query();
416 let (bind_local_instructions, query) = measure_sql_stage(|| {
417 Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
418 authority.model(),
419 query,
420 MissingRowPolicy::Ignore,
421 ))
422 });
423 let query = query?;
424
425 Ok((
426 CompiledSqlCommand::Delete {
427 query: Arc::new(query),
428 returning,
429 },
430 0,
431 prepare_local_instructions,
432 lower_local_instructions,
433 bind_local_instructions,
434 ))
435 }
436 SqlStatement::Insert(_) => {
437 let (prepare_local_instructions, prepared) = prepare_statement();
438 let prepared = prepared?;
439 let statement = extract_prepared_sql_insert_statement(prepared)
440 .map_err(QueryError::from_sql_lowering_error)?;
441
442 Ok((
443 CompiledSqlCommand::Insert(statement),
444 0,
445 prepare_local_instructions,
446 0,
447 0,
448 ))
449 }
450 SqlStatement::Update(_) => {
451 let (prepare_local_instructions, prepared) = prepare_statement();
452 let prepared = prepared?;
453 let statement = extract_prepared_sql_update_statement(prepared)
454 .map_err(QueryError::from_sql_lowering_error)?;
455
456 Ok((
457 CompiledSqlCommand::Update(statement),
458 0,
459 prepare_local_instructions,
460 0,
461 0,
462 ))
463 }
464 SqlStatement::Explain(_) => {
465 let (prepare_local_instructions, prepared) = prepare_statement();
466 let prepared = prepared?;
467 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
468 lower_sql_command_from_prepared_statement(prepared, authority.model())
469 .map_err(QueryError::from_sql_lowering_error)
470 });
471 let lowered = lowered?;
472
473 Ok((
474 CompiledSqlCommand::Explain(Box::new(lowered)),
475 0,
476 prepare_local_instructions,
477 lower_local_instructions,
478 0,
479 ))
480 }
481 SqlStatement::Describe(_) => {
482 let (prepare_local_instructions, prepared) = prepare_statement();
483 let _prepared = prepared?;
484
485 Ok((
486 CompiledSqlCommand::DescribeEntity,
487 0,
488 prepare_local_instructions,
489 0,
490 0,
491 ))
492 }
493 SqlStatement::ShowIndexes(_) => {
494 let (prepare_local_instructions, prepared) = prepare_statement();
495 let _prepared = prepared?;
496
497 Ok((
498 CompiledSqlCommand::ShowIndexesEntity,
499 0,
500 prepare_local_instructions,
501 0,
502 0,
503 ))
504 }
505 SqlStatement::ShowColumns(_) => {
506 let (prepare_local_instructions, prepared) = prepare_statement();
507 let _prepared = prepared?;
508
509 Ok((
510 CompiledSqlCommand::ShowColumnsEntity,
511 0,
512 prepare_local_instructions,
513 0,
514 0,
515 ))
516 }
517 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
518 }
519 }
520
521 fn sql_select_prepared_plan(
524 &self,
525 query: &StructuralQuery,
526 authority: EntityAuthority,
527 cache_schema_fingerprint: CommitSchemaFingerprint,
528 ) -> Result<
529 (
530 SharedPreparedExecutionPlan,
531 SqlProjectionContract,
532 SqlCacheAttribution,
533 ),
534 QueryError,
535 > {
536 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
537 authority,
538 cache_schema_fingerprint,
539 query,
540 )?;
541 let projection_spec = prepared_plan
542 .logical_plan()
543 .projection_spec(authority.model());
544 let projection = SqlProjectionContract::new(
545 projection_labels_from_projection_spec(&projection_spec),
546 projection_fixed_scales_from_projection_spec(&projection_spec),
547 );
548
549 Ok((
550 prepared_plan,
551 projection,
552 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
553 ))
554 }
555
556 fn ensure_sql_statement_supported_for_surface(
560 statement: &SqlStatement,
561 surface: SqlCompiledCommandSurface,
562 ) -> Result<(), QueryError> {
563 match (surface, statement) {
564 (
565 SqlCompiledCommandSurface::Query,
566 SqlStatement::Select(_)
567 | SqlStatement::Explain(_)
568 | SqlStatement::Describe(_)
569 | SqlStatement::ShowIndexes(_)
570 | SqlStatement::ShowColumns(_)
571 | SqlStatement::ShowEntities(_),
572 )
573 | (
574 SqlCompiledCommandSurface::Update,
575 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
576 ) => Ok(()),
577 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
578 Err(QueryError::unsupported_query(
579 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
580 ))
581 }
582 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
583 Err(QueryError::unsupported_query(
584 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
585 ))
586 }
587 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
588 Err(QueryError::unsupported_query(
589 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
590 ))
591 }
592 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
593 Err(QueryError::unsupported_query(
594 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
595 ))
596 }
597 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
598 Err(QueryError::unsupported_query(
599 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
600 ))
601 }
602 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
603 Err(QueryError::unsupported_query(
604 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
605 ))
606 }
607 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
608 Err(QueryError::unsupported_query(
609 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
610 ))
611 }
612 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
613 Err(QueryError::unsupported_query(
614 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
615 ))
616 }
617 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
618 Err(QueryError::unsupported_query(
619 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
620 ))
621 }
622 }
623 }
624
625 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
630 where
631 E: PersistedRow<Canister = C> + EntityValue,
632 {
633 let compiled = self.compile_sql_query::<E>(sql)?;
634
635 self.execute_compiled_sql_owned::<E>(compiled)
636 }
637
638 #[cfg(feature = "diagnostics")]
641 #[doc(hidden)]
642 pub fn execute_sql_query_with_attribution<E>(
643 &self,
644 sql: &str,
645 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
646 where
647 E: PersistedRow<Canister = C> + EntityValue,
648 {
649 let (compile_local_instructions, compiled) =
652 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
653 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
654
655 let store_get_calls_before = DataStore::current_get_call_count();
658 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
659 let pure_covering_row_assembly_before =
660 current_pure_covering_row_assembly_local_instructions();
661 let (result, execute_cache_attribution, execute_phase_attribution) =
662 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
663 let store_get_calls =
664 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
665 let pure_covering_decode_local_instructions =
666 current_pure_covering_decode_local_instructions()
667 .saturating_sub(pure_covering_decode_before);
668 let pure_covering_row_assembly_local_instructions =
669 current_pure_covering_row_assembly_local_instructions()
670 .saturating_sub(pure_covering_row_assembly_before);
671 let execute_local_instructions = execute_phase_attribution
672 .planner_local_instructions
673 .saturating_add(execute_phase_attribution.store_local_instructions)
674 .saturating_add(execute_phase_attribution.executor_local_instructions)
675 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
676 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
677 let total_local_instructions =
678 compile_local_instructions.saturating_add(execute_local_instructions);
679 let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
680 GroupedExecutionAttribution {
681 stream_local_instructions: execute_phase_attribution
682 .grouped_stream_local_instructions,
683 fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
684 finalize_local_instructions: execute_phase_attribution
685 .grouped_finalize_local_instructions,
686 count: GroupedCountAttribution::from_executor(
687 execute_phase_attribution.grouped_count,
688 ),
689 },
690 );
691 let pure_covering = (pure_covering_decode_local_instructions > 0
692 || pure_covering_row_assembly_local_instructions > 0)
693 .then_some(SqlPureCoveringAttribution {
694 decode_local_instructions: pure_covering_decode_local_instructions,
695 row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
696 });
697
698 Ok((
699 result,
700 SqlQueryExecutionAttribution {
701 compile_local_instructions,
702 compile: SqlCompileAttribution {
703 cache_key_local_instructions: compile_phase_attribution.cache_key,
704 cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
705 parse_local_instructions: compile_phase_attribution.parse,
706 parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
707 parse_select_local_instructions: compile_phase_attribution.parse_select,
708 parse_expr_local_instructions: compile_phase_attribution.parse_expr,
709 parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
710 aggregate_lane_check_local_instructions: compile_phase_attribution
711 .aggregate_lane_check,
712 prepare_local_instructions: compile_phase_attribution.prepare,
713 lower_local_instructions: compile_phase_attribution.lower,
714 bind_local_instructions: compile_phase_attribution.bind,
715 cache_insert_local_instructions: compile_phase_attribution.cache_insert,
716 },
717 plan_lookup_local_instructions: execute_phase_attribution
718 .planner_local_instructions,
719 execution: SqlExecutionAttribution {
720 planner_local_instructions: execute_phase_attribution
721 .planner_local_instructions,
722 store_local_instructions: execute_phase_attribution.store_local_instructions,
723 executor_invocation_local_instructions: execute_phase_attribution
724 .executor_invocation_local_instructions,
725 executor_local_instructions: execute_phase_attribution
726 .executor_local_instructions,
727 response_finalization_local_instructions: execute_phase_attribution
728 .response_finalization_local_instructions,
729 },
730 grouped,
731 scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
732 execute_phase_attribution.scalar_aggregate_terminal,
733 ),
734 pure_covering,
735 store_get_calls,
736 response_decode_local_instructions: 0,
737 execute_local_instructions,
738 total_local_instructions,
739 cache: SqlQueryCacheAttribution {
740 sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
741 sql_compiled_command_misses: cache_attribution
742 .sql_compiled_command_cache_misses,
743 shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
744 shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
745 },
746 },
747 ))
748 }
749
750 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
755 where
756 E: PersistedRow<Canister = C> + EntityValue,
757 {
758 let compiled = self.compile_sql_update::<E>(sql)?;
759
760 self.execute_compiled_sql_owned::<E>(compiled)
761 }
762
763 pub(in crate::db) fn compile_sql_query<E>(
766 &self,
767 sql: &str,
768 ) -> Result<CompiledSqlCommand, QueryError>
769 where
770 E: PersistedRow<Canister = C> + EntityValue,
771 {
772 self.compile_sql_query_with_cache_attribution::<E>(sql)
773 .map(|(compiled, _, _)| compiled)
774 }
775
776 fn compile_sql_query_with_cache_attribution<E>(
777 &self,
778 sql: &str,
779 ) -> Result<
780 (
781 CompiledSqlCommand,
782 SqlCacheAttribution,
783 SqlCompilePhaseAttribution,
784 ),
785 QueryError,
786 >
787 where
788 E: PersistedRow<Canister = C> + EntityValue,
789 {
790 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
791 }
792
793 pub(in crate::db) fn compile_sql_update<E>(
796 &self,
797 sql: &str,
798 ) -> Result<CompiledSqlCommand, QueryError>
799 where
800 E: PersistedRow<Canister = C> + EntityValue,
801 {
802 self.compile_sql_update_with_cache_attribution::<E>(sql)
803 .map(|(compiled, _, _)| compiled)
804 }
805
806 fn compile_sql_update_with_cache_attribution<E>(
807 &self,
808 sql: &str,
809 ) -> Result<
810 (
811 CompiledSqlCommand,
812 SqlCacheAttribution,
813 SqlCompilePhaseAttribution,
814 ),
815 QueryError,
816 >
817 where
818 E: PersistedRow<Canister = C> + EntityValue,
819 {
820 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
821 }
822
823 fn compile_sql_surface_with_cache_attribution<E>(
827 &self,
828 sql: &str,
829 surface: SqlCompiledCommandSurface,
830 ) -> Result<
831 (
832 CompiledSqlCommand,
833 SqlCacheAttribution,
834 SqlCompilePhaseAttribution,
835 ),
836 QueryError,
837 >
838 where
839 E: PersistedRow<Canister = C> + EntityValue,
840 {
841 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
842 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
843 });
844 let cache_key = cache_key?;
845
846 self.compile_sql_statement_with_cache::<E, _>(
847 cache_key,
848 cache_key_local_instructions,
849 sql,
850 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
851 )
852 }
853
854 fn compile_sql_statement_with_cache<E, F>(
857 &self,
858 cache_key: SqlCompiledCommandCacheKey,
859 cache_key_local_instructions: u64,
860 sql: &str,
861 ensure_surface_supported: F,
862 ) -> Result<
863 (
864 CompiledSqlCommand,
865 SqlCacheAttribution,
866 SqlCompilePhaseAttribution,
867 ),
868 QueryError,
869 >
870 where
871 E: PersistedRow<Canister = C> + EntityValue,
872 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
873 {
874 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
875 let cached =
876 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
877 Ok::<_, QueryError>(cached)
878 });
879 let cached = cached?;
880 if let Some(compiled) = cached {
881 return Ok((
882 compiled,
883 SqlCacheAttribution::sql_compiled_command_cache_hit(),
884 SqlCompilePhaseAttribution::cache_hit(
885 cache_key_local_instructions,
886 cache_lookup_local_instructions,
887 ),
888 ));
889 }
890
891 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
892 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
893 });
894 let (parsed, parse_attribution) = parsed?;
895 let parse_select_local_instructions = parse_local_instructions
896 .saturating_sub(parse_attribution.tokenize)
897 .saturating_sub(parse_attribution.expr)
898 .saturating_sub(parse_attribution.predicate);
899 ensure_surface_supported(&parsed)?;
900 let authority = EntityAuthority::for_type::<E>();
901 let (
902 compiled,
903 aggregate_lane_check_local_instructions,
904 prepare_local_instructions,
905 lower_local_instructions,
906 bind_local_instructions,
907 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
908
909 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
910 self.with_sql_compiled_command_cache(|cache| {
911 cache.insert(cache_key, compiled.clone());
912 });
913 Ok::<_, QueryError>(())
914 });
915 cache_insert?;
916
917 Ok((
918 compiled,
919 SqlCacheAttribution::sql_compiled_command_cache_miss(),
920 SqlCompilePhaseAttribution {
921 cache_key: cache_key_local_instructions,
922 cache_lookup: cache_lookup_local_instructions,
923 parse: parse_local_instructions,
924 parse_tokenize: parse_attribution.tokenize,
925 parse_select: parse_select_local_instructions,
926 parse_expr: parse_attribution.expr,
927 parse_predicate: parse_attribution.predicate,
928 aggregate_lane_check: aggregate_lane_check_local_instructions,
929 prepare: prepare_local_instructions,
930 lower: lower_local_instructions,
931 bind: bind_local_instructions,
932 cache_insert: cache_insert_local_instructions,
933 },
934 ))
935 }
936}