1mod cache;
8mod compiled;
9mod execute;
10mod projection;
11
12#[cfg(feature = "diagnostics")]
13use candid::CandidType;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::sync::Arc;
17
18#[cfg(feature = "diagnostics")]
19use crate::db::DataStore;
20#[cfg(feature = "diagnostics")]
21use crate::db::executor::{
22 GroupedCountAttribution, ScalarAggregateTerminalAttribution,
23 current_pure_covering_decode_local_instructions,
24 current_pure_covering_row_assembly_local_instructions,
25};
26#[cfg(test)]
27use crate::db::sql::parser::parse_sql;
28use crate::{
29 db::{
30 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
31 commit::CommitSchemaFingerprint,
32 executor::{EntityAuthority, SharedPreparedExecutionPlan},
33 query::intent::StructuralQuery,
34 session::sql::projection::{
35 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
36 },
37 sql::lowering::{
38 bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
39 compile_sql_global_aggregate_command_core_from_prepared,
40 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
41 lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
42 lower_sql_command_from_prepared_statement, prepare_sql_statement,
43 },
44 sql::parser::{SqlStatement, parse_sql_with_attribution},
45 },
46 traits::{CanisterKind, EntityValue},
47 value::OutputValue,
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
52pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
53pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
54
55#[cfg(all(test, not(feature = "diagnostics")))]
56pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
57#[cfg(feature = "diagnostics")]
58pub use crate::db::session::sql::projection::{
59 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
60};
61
62#[derive(Debug)]
64pub enum SqlStatementResult {
65 Count {
66 row_count: u32,
67 },
68 Projection {
69 columns: Vec<String>,
70 fixed_scales: Vec<Option<u32>>,
71 rows: Vec<Vec<OutputValue>>,
72 row_count: u32,
73 },
74 ProjectionText {
75 columns: Vec<String>,
76 rows: Vec<Vec<String>>,
77 row_count: u32,
78 },
79 Grouped {
80 columns: Vec<String>,
81 fixed_scales: Vec<Option<u32>>,
82 rows: Vec<GroupedRow>,
83 row_count: u32,
84 next_cursor: Option<String>,
85 },
86 Explain(String),
87 Describe(crate::db::EntitySchemaDescription),
88 ShowIndexes(Vec<String>),
89 ShowColumns(Vec<crate::db::EntityFieldDescription>),
90 ShowEntities(Vec<String>),
91}
92
93#[cfg(feature = "diagnostics")]
107#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
108pub struct SqlQueryExecutionAttribution {
109 pub compile_local_instructions: u64,
110 pub compile_cache_key_local_instructions: u64,
111 pub compile_cache_lookup_local_instructions: u64,
112 pub compile_parse_local_instructions: u64,
113 pub compile_parse_tokenize_local_instructions: u64,
114 pub compile_parse_select_local_instructions: u64,
115 pub compile_parse_expr_local_instructions: u64,
116 pub compile_parse_predicate_local_instructions: u64,
117 pub compile_aggregate_lane_check_local_instructions: u64,
118 pub compile_prepare_local_instructions: u64,
119 pub compile_lower_local_instructions: u64,
120 pub compile_bind_local_instructions: u64,
121 pub compile_cache_insert_local_instructions: u64,
122 pub plan_lookup_local_instructions: u64,
123 pub planner_local_instructions: u64,
124 pub store_local_instructions: u64,
125 pub executor_invocation_local_instructions: u64,
126 pub executor_local_instructions: u64,
127 pub response_finalization_local_instructions: u64,
128 pub grouped_stream_local_instructions: u64,
129 pub grouped_fold_local_instructions: u64,
130 pub grouped_finalize_local_instructions: u64,
131 pub grouped_count_borrowed_hash_computations: u64,
132 pub grouped_count_bucket_candidate_checks: u64,
133 pub grouped_count_existing_group_hits: u64,
134 pub grouped_count_new_group_inserts: u64,
135 pub grouped_count_row_materialization_local_instructions: u64,
136 pub grouped_count_group_lookup_local_instructions: u64,
137 pub grouped_count_existing_group_update_local_instructions: u64,
138 pub grouped_count_new_group_insert_local_instructions: u64,
139 pub scalar_aggregate_base_row_local_instructions: u64,
140 pub scalar_aggregate_reducer_fold_local_instructions: u64,
141 pub scalar_aggregate_expression_evaluations: u64,
142 pub scalar_aggregate_filter_evaluations: u64,
143 pub scalar_aggregate_rows_ingested: u64,
144 pub scalar_aggregate_terminal_count: u64,
145 pub scalar_aggregate_unique_input_expr_count: u64,
146 pub scalar_aggregate_unique_filter_expr_count: u64,
147 pub scalar_aggregate_sink_mode: Option<String>,
148 pub pure_covering_decode_local_instructions: u64,
149 pub pure_covering_row_assembly_local_instructions: u64,
150 pub store_get_calls: u64,
151 pub response_decode_local_instructions: u64,
152 pub execute_local_instructions: u64,
153 pub total_local_instructions: u64,
154 pub sql_compiled_command_cache_hits: u64,
155 pub sql_compiled_command_cache_misses: u64,
156 pub shared_query_plan_cache_hits: u64,
157 pub shared_query_plan_cache_misses: u64,
158}
159
160#[cfg(feature = "diagnostics")]
164#[derive(Clone, Copy, Debug, Eq, PartialEq)]
165pub(in crate::db) struct SqlExecutePhaseAttribution {
166 pub planner_local_instructions: u64,
167 pub store_local_instructions: u64,
168 pub executor_invocation_local_instructions: u64,
169 pub executor_local_instructions: u64,
170 pub response_finalization_local_instructions: u64,
171 pub grouped_stream_local_instructions: u64,
172 pub grouped_fold_local_instructions: u64,
173 pub grouped_finalize_local_instructions: u64,
174 pub grouped_count: GroupedCountAttribution,
175 pub scalar_aggregate_terminal: ScalarAggregateTerminalAttribution,
176}
177
178#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
190pub(in crate::db) struct SqlCompilePhaseAttribution {
191 pub cache_key: u64,
192 pub cache_lookup: u64,
193 pub parse: u64,
194 pub parse_tokenize: u64,
195 pub parse_select: u64,
196 pub parse_expr: u64,
197 pub parse_predicate: u64,
198 pub aggregate_lane_check: u64,
199 pub prepare: u64,
200 pub lower: u64,
201 pub bind: u64,
202 pub cache_insert: u64,
203}
204
205impl SqlCompilePhaseAttribution {
206 #[must_use]
207 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
208 Self {
209 cache_key,
210 cache_lookup,
211 parse: 0,
212 parse_tokenize: 0,
213 parse_select: 0,
214 parse_expr: 0,
215 parse_predicate: 0,
216 aggregate_lane_check: 0,
217 prepare: 0,
218 lower: 0,
219 bind: 0,
220 cache_insert: 0,
221 }
222 }
223}
224
225#[cfg(feature = "diagnostics")]
226impl SqlExecutePhaseAttribution {
227 #[must_use]
228 pub(in crate::db) const fn from_execute_total_and_store_total(
229 execute_local_instructions: u64,
230 store_local_instructions: u64,
231 ) -> Self {
232 Self {
233 planner_local_instructions: 0,
234 store_local_instructions,
235 executor_invocation_local_instructions: execute_local_instructions,
236 executor_local_instructions: execute_local_instructions
237 .saturating_sub(store_local_instructions),
238 response_finalization_local_instructions: 0,
239 grouped_stream_local_instructions: 0,
240 grouped_fold_local_instructions: 0,
241 grouped_finalize_local_instructions: 0,
242 grouped_count: GroupedCountAttribution::none(),
243 scalar_aggregate_terminal: ScalarAggregateTerminalAttribution::none(),
244 }
245 }
246}
247
248#[cfg(test)]
251pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
252 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
253}
254
255impl<C: CanisterKind> DbSession<C> {
256 #[expect(clippy::too_many_lines)]
259 fn compile_sql_statement_for_authority(
260 statement: &SqlStatement,
261 authority: EntityAuthority,
262 compiled_cache_key: SqlCompiledCommandCacheKey,
263 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
264 let prepare_statement = || {
268 measure_sql_stage(|| {
269 prepare_sql_statement(statement.clone(), authority.model().name())
270 .map_err(QueryError::from_sql_lowering_error)
271 })
272 };
273
274 match statement {
275 SqlStatement::Select(_) => {
276 let (prepare_local_instructions, prepared) = prepare_statement();
277 let prepared = prepared?;
278 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
279 measure_sql_stage(|| {
280 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
281 });
282 let requires_aggregate_lane = requires_aggregate_lane?;
283
284 if requires_aggregate_lane {
285 let (lower_local_instructions, command) = measure_sql_stage(|| {
286 compile_sql_global_aggregate_command_core_from_prepared(
287 prepared,
288 authority.model(),
289 MissingRowPolicy::Ignore,
290 )
291 .map_err(QueryError::from_sql_lowering_error)
292 });
293 let command = command?;
294
295 Ok((
296 CompiledSqlCommand::GlobalAggregate {
297 command: Box::new(command),
298 },
299 aggregate_lane_check_local_instructions,
300 prepare_local_instructions,
301 lower_local_instructions,
302 0,
303 ))
304 } else {
305 let (lower_local_instructions, select) = measure_sql_stage(|| {
306 lower_prepared_sql_select_statement(prepared, authority.model())
307 .map_err(QueryError::from_sql_lowering_error)
308 });
309 let select = select?;
310 let (bind_local_instructions, query) = measure_sql_stage(|| {
311 bind_lowered_sql_select_query_structural(
312 authority.model(),
313 select,
314 MissingRowPolicy::Ignore,
315 )
316 .map_err(QueryError::from_sql_lowering_error)
317 });
318 let query = query?;
319
320 Ok((
321 CompiledSqlCommand::Select {
322 query: Arc::new(query),
323 compiled_cache_key,
324 },
325 aggregate_lane_check_local_instructions,
326 prepare_local_instructions,
327 lower_local_instructions,
328 bind_local_instructions,
329 ))
330 }
331 }
332 SqlStatement::Delete(_) => {
333 let (prepare_local_instructions, prepared) = prepare_statement();
334 let prepared = prepared?;
335 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
336 lower_prepared_sql_delete_statement(prepared)
337 .map_err(QueryError::from_sql_lowering_error)
338 });
339 let delete = lowered?;
340 let returning = delete.returning().cloned();
341 let query = delete.into_base_query();
342 let (bind_local_instructions, query) = measure_sql_stage(|| {
343 Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
344 authority.model(),
345 query,
346 MissingRowPolicy::Ignore,
347 ))
348 });
349 let query = query?;
350
351 Ok((
352 CompiledSqlCommand::Delete {
353 query: Arc::new(query),
354 returning,
355 },
356 0,
357 prepare_local_instructions,
358 lower_local_instructions,
359 bind_local_instructions,
360 ))
361 }
362 SqlStatement::Insert(_) => {
363 let (prepare_local_instructions, prepared) = prepare_statement();
364 let prepared = prepared?;
365 let statement = extract_prepared_sql_insert_statement(prepared)
366 .map_err(QueryError::from_sql_lowering_error)?;
367
368 Ok((
369 CompiledSqlCommand::Insert(statement),
370 0,
371 prepare_local_instructions,
372 0,
373 0,
374 ))
375 }
376 SqlStatement::Update(_) => {
377 let (prepare_local_instructions, prepared) = prepare_statement();
378 let prepared = prepared?;
379 let statement = extract_prepared_sql_update_statement(prepared)
380 .map_err(QueryError::from_sql_lowering_error)?;
381
382 Ok((
383 CompiledSqlCommand::Update(statement),
384 0,
385 prepare_local_instructions,
386 0,
387 0,
388 ))
389 }
390 SqlStatement::Explain(_) => {
391 let (prepare_local_instructions, prepared) = prepare_statement();
392 let prepared = prepared?;
393 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
394 lower_sql_command_from_prepared_statement(prepared, authority.model())
395 .map_err(QueryError::from_sql_lowering_error)
396 });
397 let lowered = lowered?;
398
399 Ok((
400 CompiledSqlCommand::Explain(Box::new(lowered)),
401 0,
402 prepare_local_instructions,
403 lower_local_instructions,
404 0,
405 ))
406 }
407 SqlStatement::Describe(_) => {
408 let (prepare_local_instructions, prepared) = prepare_statement();
409 let _prepared = prepared?;
410
411 Ok((
412 CompiledSqlCommand::DescribeEntity,
413 0,
414 prepare_local_instructions,
415 0,
416 0,
417 ))
418 }
419 SqlStatement::ShowIndexes(_) => {
420 let (prepare_local_instructions, prepared) = prepare_statement();
421 let _prepared = prepared?;
422
423 Ok((
424 CompiledSqlCommand::ShowIndexesEntity,
425 0,
426 prepare_local_instructions,
427 0,
428 0,
429 ))
430 }
431 SqlStatement::ShowColumns(_) => {
432 let (prepare_local_instructions, prepared) = prepare_statement();
433 let _prepared = prepared?;
434
435 Ok((
436 CompiledSqlCommand::ShowColumnsEntity,
437 0,
438 prepare_local_instructions,
439 0,
440 0,
441 ))
442 }
443 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
444 }
445 }
446
447 fn sql_select_prepared_plan(
450 &self,
451 query: &StructuralQuery,
452 authority: EntityAuthority,
453 cache_schema_fingerprint: CommitSchemaFingerprint,
454 ) -> Result<
455 (
456 SharedPreparedExecutionPlan,
457 SqlProjectionContract,
458 SqlCacheAttribution,
459 ),
460 QueryError,
461 > {
462 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
463 authority,
464 cache_schema_fingerprint,
465 query,
466 )?;
467 let projection_spec = prepared_plan
468 .logical_plan()
469 .projection_spec(authority.model());
470 let projection = SqlProjectionContract::new(
471 projection_labels_from_projection_spec(&projection_spec),
472 projection_fixed_scales_from_projection_spec(&projection_spec),
473 );
474
475 Ok((
476 prepared_plan,
477 projection,
478 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
479 ))
480 }
481
482 fn ensure_sql_statement_supported_for_surface(
486 statement: &SqlStatement,
487 surface: SqlCompiledCommandSurface,
488 ) -> Result<(), QueryError> {
489 match (surface, statement) {
490 (
491 SqlCompiledCommandSurface::Query,
492 SqlStatement::Select(_)
493 | SqlStatement::Explain(_)
494 | SqlStatement::Describe(_)
495 | SqlStatement::ShowIndexes(_)
496 | SqlStatement::ShowColumns(_)
497 | SqlStatement::ShowEntities(_),
498 )
499 | (
500 SqlCompiledCommandSurface::Update,
501 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
502 ) => Ok(()),
503 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
504 Err(QueryError::unsupported_query(
505 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
506 ))
507 }
508 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
509 Err(QueryError::unsupported_query(
510 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
511 ))
512 }
513 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
514 Err(QueryError::unsupported_query(
515 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
516 ))
517 }
518 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
519 Err(QueryError::unsupported_query(
520 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
521 ))
522 }
523 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
524 Err(QueryError::unsupported_query(
525 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
526 ))
527 }
528 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
529 Err(QueryError::unsupported_query(
530 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
531 ))
532 }
533 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
534 Err(QueryError::unsupported_query(
535 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
536 ))
537 }
538 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
539 Err(QueryError::unsupported_query(
540 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
541 ))
542 }
543 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
544 Err(QueryError::unsupported_query(
545 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
546 ))
547 }
548 }
549 }
550
551 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
556 where
557 E: PersistedRow<Canister = C> + EntityValue,
558 {
559 let compiled = self.compile_sql_query::<E>(sql)?;
560
561 self.execute_compiled_sql_owned::<E>(compiled)
562 }
563
564 #[cfg(feature = "diagnostics")]
567 #[doc(hidden)]
568 #[expect(
569 clippy::needless_update,
570 reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
571 )]
572 #[expect(
573 clippy::too_many_lines,
574 reason = "diagnostics attribution explicitly enumerates every counter field at the public diagnostics boundary"
575 )]
576 pub fn execute_sql_query_with_attribution<E>(
577 &self,
578 sql: &str,
579 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
580 where
581 E: PersistedRow<Canister = C> + EntityValue,
582 {
583 let (compile_local_instructions, compiled) =
586 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
587 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
588
589 let store_get_calls_before = DataStore::current_get_call_count();
592 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
593 let pure_covering_row_assembly_before =
594 current_pure_covering_row_assembly_local_instructions();
595 let (result, execute_cache_attribution, execute_phase_attribution) =
596 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
597 let store_get_calls =
598 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
599 let pure_covering_decode_local_instructions =
600 current_pure_covering_decode_local_instructions()
601 .saturating_sub(pure_covering_decode_before);
602 let pure_covering_row_assembly_local_instructions =
603 current_pure_covering_row_assembly_local_instructions()
604 .saturating_sub(pure_covering_row_assembly_before);
605 let execute_local_instructions = execute_phase_attribution
606 .planner_local_instructions
607 .saturating_add(execute_phase_attribution.store_local_instructions)
608 .saturating_add(execute_phase_attribution.executor_local_instructions)
609 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
610 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
611 let total_local_instructions =
612 compile_local_instructions.saturating_add(execute_local_instructions);
613
614 Ok((
615 result,
616 SqlQueryExecutionAttribution {
617 compile_local_instructions,
618 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
619 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
620 compile_parse_local_instructions: compile_phase_attribution.parse,
621 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
622 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
623 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
624 compile_parse_predicate_local_instructions: compile_phase_attribution
625 .parse_predicate,
626 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
627 .aggregate_lane_check,
628 compile_prepare_local_instructions: compile_phase_attribution.prepare,
629 compile_lower_local_instructions: compile_phase_attribution.lower,
630 compile_bind_local_instructions: compile_phase_attribution.bind,
631 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
632 plan_lookup_local_instructions: execute_phase_attribution
633 .planner_local_instructions,
634 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
635 store_local_instructions: execute_phase_attribution.store_local_instructions,
636 executor_invocation_local_instructions: execute_phase_attribution
637 .executor_invocation_local_instructions,
638 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
639 response_finalization_local_instructions: execute_phase_attribution
640 .response_finalization_local_instructions,
641 grouped_stream_local_instructions: execute_phase_attribution
642 .grouped_stream_local_instructions,
643 grouped_fold_local_instructions: execute_phase_attribution
644 .grouped_fold_local_instructions,
645 grouped_finalize_local_instructions: execute_phase_attribution
646 .grouped_finalize_local_instructions,
647 grouped_count_borrowed_hash_computations: execute_phase_attribution
648 .grouped_count
649 .borrowed_hash_computations,
650 grouped_count_bucket_candidate_checks: execute_phase_attribution
651 .grouped_count
652 .bucket_candidate_checks,
653 grouped_count_existing_group_hits: execute_phase_attribution
654 .grouped_count
655 .existing_group_hits,
656 grouped_count_new_group_inserts: execute_phase_attribution
657 .grouped_count
658 .new_group_inserts,
659 grouped_count_row_materialization_local_instructions: execute_phase_attribution
660 .grouped_count
661 .row_materialization_local_instructions,
662 grouped_count_group_lookup_local_instructions: execute_phase_attribution
663 .grouped_count
664 .group_lookup_local_instructions,
665 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
666 .grouped_count
667 .existing_group_update_local_instructions,
668 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
669 .grouped_count
670 .new_group_insert_local_instructions,
671 scalar_aggregate_base_row_local_instructions: execute_phase_attribution
672 .scalar_aggregate_terminal
673 .base_row_local_instructions,
674 scalar_aggregate_reducer_fold_local_instructions: execute_phase_attribution
675 .scalar_aggregate_terminal
676 .reducer_fold_local_instructions,
677 scalar_aggregate_expression_evaluations: execute_phase_attribution
678 .scalar_aggregate_terminal
679 .expression_evaluations,
680 scalar_aggregate_filter_evaluations: execute_phase_attribution
681 .scalar_aggregate_terminal
682 .filter_evaluations,
683 scalar_aggregate_rows_ingested: execute_phase_attribution
684 .scalar_aggregate_terminal
685 .rows_ingested,
686 scalar_aggregate_terminal_count: execute_phase_attribution
687 .scalar_aggregate_terminal
688 .terminal_count,
689 scalar_aggregate_unique_input_expr_count: execute_phase_attribution
690 .scalar_aggregate_terminal
691 .unique_input_expr_count,
692 scalar_aggregate_unique_filter_expr_count: execute_phase_attribution
693 .scalar_aggregate_terminal
694 .unique_filter_expr_count,
695 scalar_aggregate_sink_mode: execute_phase_attribution
696 .scalar_aggregate_terminal
697 .sink_mode
698 .label()
699 .map(str::to_string),
700 pure_covering_decode_local_instructions,
701 pure_covering_row_assembly_local_instructions,
702 store_get_calls,
703 response_decode_local_instructions: 0,
704 execute_local_instructions,
705 total_local_instructions,
706 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
707 sql_compiled_command_cache_misses: cache_attribution
708 .sql_compiled_command_cache_misses,
709 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
710 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
711 ..SqlQueryExecutionAttribution::default()
712 },
713 ))
714 }
715
716 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
721 where
722 E: PersistedRow<Canister = C> + EntityValue,
723 {
724 let compiled = self.compile_sql_update::<E>(sql)?;
725
726 self.execute_compiled_sql_owned::<E>(compiled)
727 }
728
729 pub(in crate::db) fn compile_sql_query<E>(
732 &self,
733 sql: &str,
734 ) -> Result<CompiledSqlCommand, QueryError>
735 where
736 E: PersistedRow<Canister = C> + EntityValue,
737 {
738 self.compile_sql_query_with_cache_attribution::<E>(sql)
739 .map(|(compiled, _, _)| compiled)
740 }
741
742 fn compile_sql_query_with_cache_attribution<E>(
743 &self,
744 sql: &str,
745 ) -> Result<
746 (
747 CompiledSqlCommand,
748 SqlCacheAttribution,
749 SqlCompilePhaseAttribution,
750 ),
751 QueryError,
752 >
753 where
754 E: PersistedRow<Canister = C> + EntityValue,
755 {
756 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
757 }
758
759 pub(in crate::db) fn compile_sql_update<E>(
762 &self,
763 sql: &str,
764 ) -> Result<CompiledSqlCommand, QueryError>
765 where
766 E: PersistedRow<Canister = C> + EntityValue,
767 {
768 self.compile_sql_update_with_cache_attribution::<E>(sql)
769 .map(|(compiled, _, _)| compiled)
770 }
771
772 fn compile_sql_update_with_cache_attribution<E>(
773 &self,
774 sql: &str,
775 ) -> Result<
776 (
777 CompiledSqlCommand,
778 SqlCacheAttribution,
779 SqlCompilePhaseAttribution,
780 ),
781 QueryError,
782 >
783 where
784 E: PersistedRow<Canister = C> + EntityValue,
785 {
786 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
787 }
788
789 fn compile_sql_surface_with_cache_attribution<E>(
793 &self,
794 sql: &str,
795 surface: SqlCompiledCommandSurface,
796 ) -> Result<
797 (
798 CompiledSqlCommand,
799 SqlCacheAttribution,
800 SqlCompilePhaseAttribution,
801 ),
802 QueryError,
803 >
804 where
805 E: PersistedRow<Canister = C> + EntityValue,
806 {
807 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
808 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
809 });
810 let cache_key = cache_key?;
811
812 self.compile_sql_statement_with_cache::<E, _>(
813 cache_key,
814 cache_key_local_instructions,
815 sql,
816 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
817 )
818 }
819
820 fn compile_sql_statement_with_cache<E, F>(
823 &self,
824 cache_key: SqlCompiledCommandCacheKey,
825 cache_key_local_instructions: u64,
826 sql: &str,
827 ensure_surface_supported: F,
828 ) -> Result<
829 (
830 CompiledSqlCommand,
831 SqlCacheAttribution,
832 SqlCompilePhaseAttribution,
833 ),
834 QueryError,
835 >
836 where
837 E: PersistedRow<Canister = C> + EntityValue,
838 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
839 {
840 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
841 let cached =
842 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
843 Ok::<_, QueryError>(cached)
844 });
845 let cached = cached?;
846 if let Some(compiled) = cached {
847 return Ok((
848 compiled,
849 SqlCacheAttribution::sql_compiled_command_cache_hit(),
850 SqlCompilePhaseAttribution::cache_hit(
851 cache_key_local_instructions,
852 cache_lookup_local_instructions,
853 ),
854 ));
855 }
856
857 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
858 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
859 });
860 let (parsed, parse_attribution) = parsed?;
861 let parse_select_local_instructions = parse_local_instructions
862 .saturating_sub(parse_attribution.tokenize)
863 .saturating_sub(parse_attribution.expr)
864 .saturating_sub(parse_attribution.predicate);
865 ensure_surface_supported(&parsed)?;
866 let authority = EntityAuthority::for_type::<E>();
867 let (
868 compiled,
869 aggregate_lane_check_local_instructions,
870 prepare_local_instructions,
871 lower_local_instructions,
872 bind_local_instructions,
873 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
874
875 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
876 self.with_sql_compiled_command_cache(|cache| {
877 cache.insert(cache_key, compiled.clone());
878 });
879 Ok::<_, QueryError>(())
880 });
881 cache_insert?;
882
883 Ok((
884 compiled,
885 SqlCacheAttribution::sql_compiled_command_cache_miss(),
886 SqlCompilePhaseAttribution {
887 cache_key: cache_key_local_instructions,
888 cache_lookup: cache_lookup_local_instructions,
889 parse: parse_local_instructions,
890 parse_tokenize: parse_attribution.tokenize,
891 parse_select: parse_select_local_instructions,
892 parse_expr: parse_attribution.expr,
893 parse_predicate: parse_attribution.predicate,
894 aggregate_lane_check: aggregate_lane_check_local_instructions,
895 prepare: prepare_local_instructions,
896 lower: lower_local_instructions,
897 bind: bind_local_instructions,
898 cache_insert: cache_insert_local_instructions,
899 },
900 ))
901 }
902}