1mod cache;
8mod compiled;
9mod execute;
10mod projection;
11
12#[cfg(feature = "diagnostics")]
13use candid::CandidType;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::sync::Arc;
17
18#[cfg(feature = "diagnostics")]
19use crate::db::DataStore;
20#[cfg(feature = "diagnostics")]
21use crate::db::executor::GroupedCountAttribution;
22#[cfg(feature = "diagnostics")]
23use crate::db::session::sql::projection::{
24 current_pure_covering_decode_local_instructions,
25 current_pure_covering_row_assembly_local_instructions,
26};
27#[cfg(test)]
28use crate::db::sql::parser::parse_sql;
29use crate::{
30 db::{
31 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
32 commit::CommitSchemaFingerprint,
33 executor::{EntityAuthority, SharedPreparedExecutionPlan},
34 query::intent::StructuralQuery,
35 session::sql::projection::{
36 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
37 },
38 sql::lowering::{
39 bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
40 compile_sql_global_aggregate_command_core_from_prepared,
41 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
42 lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
43 lower_sql_command_from_prepared_statement, prepare_sql_statement,
44 },
45 sql::parser::{SqlStatement, parse_sql_with_attribution},
46 },
47 traits::{CanisterKind, EntityValue},
48 value::OutputValue,
49};
50
51pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
52pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
53pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
54
55#[cfg(all(test, not(feature = "diagnostics")))]
56pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
57#[cfg(feature = "diagnostics")]
58pub use crate::db::session::sql::projection::{
59 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
60};
61
/// Result payload returned by the session SQL entry points
/// (`execute_sql_query` and `execute_sql_update`).
///
/// Each variant carries the data for one result shape; every row-bearing
/// variant also carries an explicit `row_count`.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// Count-only result: just a number of rows, no row data.
    Count {
        row_count: u32,
    },
    /// Projected rows whose cells are `OutputValue`s.
    ///
    /// `columns` and `fixed_scales` describe the projection column by column;
    /// a `None` scale means no fixed scale is recorded for that column.
    Projection {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<Vec<OutputValue>>,
        row_count: u32,
    },
    /// Projected rows whose cells are plain `String`s.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows.
    ///
    /// NOTE(review): `next_cursor` looks like a continuation token for a
    /// follow-up page of groups — confirm against the grouped executor.
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        next_cursor: Option<String>,
    },
    /// Text payload of an `Explain` result.
    Explain(String),
    /// Schema description payload of a `Describe` result.
    Describe(crate::db::EntitySchemaDescription),
    /// One `String` entry per reported index.
    ShowIndexes(Vec<String>),
    /// One field description per reported column.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// One `String` entry per reported entity.
    ShowEntities(Vec<String>),
}
92
/// Flat, serializable attribution report for one
/// `execute_sql_query_with_attribution` call.
///
/// Only compiled with the `diagnostics` feature. All `*_local_instructions`
/// fields are deltas of the local instruction counter; the remaining fields
/// are plain event counts.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    /// Instructions measured around the whole compile step (cache key,
    /// lookup, and — on a cache miss — parse/prepare/lower/bind/insert).
    pub compile_local_instructions: u64,
    // Per-stage compile breakdown, copied from `SqlCompilePhaseAttribution`.
    pub compile_cache_key_local_instructions: u64,
    pub compile_cache_lookup_local_instructions: u64,
    pub compile_parse_local_instructions: u64,
    pub compile_parse_tokenize_local_instructions: u64,
    /// Parse total minus the tokenize, expr, and predicate shares.
    pub compile_parse_select_local_instructions: u64,
    pub compile_parse_expr_local_instructions: u64,
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    pub compile_cache_insert_local_instructions: u64,
    /// Filled from the same execute-phase planner counter as
    /// `planner_local_instructions`; the two fields always carry equal values.
    pub plan_lookup_local_instructions: u64,
    // Execute-phase breakdown, copied from `SqlExecutePhaseAttribution`.
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_invocation_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub response_finalization_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    // Grouped-count counters, copied from `GroupedCountAttribution`.
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,
    /// Growth of the pure-covering decode counter across the execute step.
    pub pure_covering_decode_local_instructions: u64,
    /// Growth of the pure-covering row-assembly counter across the execute step.
    pub pure_covering_row_assembly_local_instructions: u64,
    /// Growth of `DataStore::current_get_call_count()` across the execute step.
    pub store_get_calls: u64,
    /// Always written as `0` by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,
    /// Saturating sum of the planner, store, executor, and
    /// response-finalization execute-phase counters.
    pub execute_local_instructions: u64,
    /// `compile_local_instructions` plus `execute_local_instructions` (saturating).
    pub total_local_instructions: u64,
    // Cache hit/miss counts merged from the compile and execute steps.
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
150
/// Execute-phase attribution for one compiled SQL command.
///
/// Only compiled with the `diagnostics` feature. The fields are copied
/// one-to-one into `SqlQueryExecutionAttribution` by
/// `execute_sql_query_with_attribution`.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    /// Instructions for the executor invocation as a whole; the
    /// `from_execute_total_and_store_total` constructor sets this to the
    /// execute total.
    pub executor_invocation_local_instructions: u64,
    /// Executor share; the `from_execute_total_and_store_total` constructor
    /// derives it as the execute total minus the store total (saturating).
    pub executor_local_instructions: u64,
    pub response_finalization_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    /// Counters specific to grouped-count execution.
    pub grouped_count: GroupedCountAttribution,
}
167
168#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
180pub(in crate::db) struct SqlCompilePhaseAttribution {
181 pub cache_key: u64,
182 pub cache_lookup: u64,
183 pub parse: u64,
184 pub parse_tokenize: u64,
185 pub parse_select: u64,
186 pub parse_expr: u64,
187 pub parse_predicate: u64,
188 pub aggregate_lane_check: u64,
189 pub prepare: u64,
190 pub lower: u64,
191 pub bind: u64,
192 pub cache_insert: u64,
193}
194
195impl SqlCompilePhaseAttribution {
196 #[must_use]
197 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
198 Self {
199 cache_key,
200 cache_lookup,
201 parse: 0,
202 parse_tokenize: 0,
203 parse_select: 0,
204 parse_expr: 0,
205 parse_predicate: 0,
206 aggregate_lane_check: 0,
207 prepare: 0,
208 lower: 0,
209 bind: 0,
210 cache_insert: 0,
211 }
212 }
213}
214
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Build an attribution from only an execute total and a store total.
    ///
    /// The execute total becomes the executor-invocation counter, and the
    /// executor counter is that total minus the store share (saturating at
    /// zero). Planner, response-finalization, and grouped counters are zero,
    /// and `grouped_count` is `GroupedCountAttribution::none()`.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_invocation_local_instructions: execute_local_instructions,
            executor_local_instructions,
            response_finalization_local_instructions: 0,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
}
236
/// Test-only helper: parse `sql` into a `SqlStatement`.
///
/// # Errors
///
/// Parser failures are converted with `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let parsed = parse_sql(sql);

    parsed.map_err(QueryError::from_sql_parse_error)
}
243
/// Read the local instruction counter used for SQL stage attribution.
///
/// On `wasm32` this returns `canic_cdk::api::performance_counter(1)`. On
/// every other target no counter is read and the function returns `0`, so
/// measured deltas are always zero there.
#[cfg(feature = "diagnostics")]
// The lint can only fire where the body is the constant `0`; on wasm32 the
// body calls the runtime counter, so an unconditional `expect` would be
// reported there as an unfulfilled lint expectation.
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_sql_local_instruction_counter() -> u64 {
    // The whole function is already gated on `diagnostics`, so only the
    // target architecture needs to be checked here.
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
260
261pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
262 run: impl FnOnce() -> Result<T, E>,
263) -> (u64, Result<T, E>) {
264 #[cfg(feature = "diagnostics")]
265 let start = read_sql_local_instruction_counter();
266
267 let result = run();
268
269 #[cfg(feature = "diagnostics")]
270 let delta = read_sql_local_instruction_counter().saturating_sub(start);
271
272 #[cfg(not(feature = "diagnostics"))]
273 let delta = 0;
274
275 (delta, result)
276}
277
278impl<C: CanisterKind> DbSession<C> {
279 #[expect(clippy::too_many_lines)]
282 fn compile_sql_statement_for_authority(
283 statement: &SqlStatement,
284 authority: EntityAuthority,
285 compiled_cache_key: SqlCompiledCommandCacheKey,
286 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
287 let prepare_statement = || {
291 measure_sql_stage(|| {
292 prepare_sql_statement(statement.clone(), authority.model().name())
293 .map_err(QueryError::from_sql_lowering_error)
294 })
295 };
296
297 match statement {
298 SqlStatement::Select(_) => {
299 let (prepare_local_instructions, prepared) = prepare_statement();
300 let prepared = prepared?;
301 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
302 measure_sql_stage(|| {
303 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
304 });
305 let requires_aggregate_lane = requires_aggregate_lane?;
306
307 if requires_aggregate_lane {
308 let (lower_local_instructions, command) = measure_sql_stage(|| {
309 compile_sql_global_aggregate_command_core_from_prepared(
310 prepared,
311 authority.model(),
312 MissingRowPolicy::Ignore,
313 )
314 .map_err(QueryError::from_sql_lowering_error)
315 });
316 let command = command?;
317
318 Ok((
319 CompiledSqlCommand::GlobalAggregate {
320 command: Box::new(command),
321 },
322 aggregate_lane_check_local_instructions,
323 prepare_local_instructions,
324 lower_local_instructions,
325 0,
326 ))
327 } else {
328 let (lower_local_instructions, select) = measure_sql_stage(|| {
329 lower_prepared_sql_select_statement(prepared, authority.model())
330 .map_err(QueryError::from_sql_lowering_error)
331 });
332 let select = select?;
333 let (bind_local_instructions, query) = measure_sql_stage(|| {
334 bind_lowered_sql_select_query_structural(
335 authority.model(),
336 select,
337 MissingRowPolicy::Ignore,
338 )
339 .map_err(QueryError::from_sql_lowering_error)
340 });
341 let query = query?;
342
343 Ok((
344 CompiledSqlCommand::Select {
345 query: Arc::new(query),
346 compiled_cache_key,
347 },
348 aggregate_lane_check_local_instructions,
349 prepare_local_instructions,
350 lower_local_instructions,
351 bind_local_instructions,
352 ))
353 }
354 }
355 SqlStatement::Delete(_) => {
356 let (prepare_local_instructions, prepared) = prepare_statement();
357 let prepared = prepared?;
358 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
359 lower_prepared_sql_delete_statement(prepared)
360 .map_err(QueryError::from_sql_lowering_error)
361 });
362 let delete = lowered?;
363 let returning = delete.returning().cloned();
364 let query = delete.into_base_query();
365 let (bind_local_instructions, query) = measure_sql_stage(|| {
366 Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
367 authority.model(),
368 query,
369 MissingRowPolicy::Ignore,
370 ))
371 });
372 let query = query?;
373
374 Ok((
375 CompiledSqlCommand::Delete {
376 query: Arc::new(query),
377 returning,
378 },
379 0,
380 prepare_local_instructions,
381 lower_local_instructions,
382 bind_local_instructions,
383 ))
384 }
385 SqlStatement::Insert(_) => {
386 let (prepare_local_instructions, prepared) = prepare_statement();
387 let prepared = prepared?;
388 let statement = extract_prepared_sql_insert_statement(prepared)
389 .map_err(QueryError::from_sql_lowering_error)?;
390
391 Ok((
392 CompiledSqlCommand::Insert(statement),
393 0,
394 prepare_local_instructions,
395 0,
396 0,
397 ))
398 }
399 SqlStatement::Update(_) => {
400 let (prepare_local_instructions, prepared) = prepare_statement();
401 let prepared = prepared?;
402 let statement = extract_prepared_sql_update_statement(prepared)
403 .map_err(QueryError::from_sql_lowering_error)?;
404
405 Ok((
406 CompiledSqlCommand::Update(statement),
407 0,
408 prepare_local_instructions,
409 0,
410 0,
411 ))
412 }
413 SqlStatement::Explain(_) => {
414 let (prepare_local_instructions, prepared) = prepare_statement();
415 let prepared = prepared?;
416 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
417 lower_sql_command_from_prepared_statement(prepared, authority.model())
418 .map_err(QueryError::from_sql_lowering_error)
419 });
420 let lowered = lowered?;
421
422 Ok((
423 CompiledSqlCommand::Explain(Box::new(lowered)),
424 0,
425 prepare_local_instructions,
426 lower_local_instructions,
427 0,
428 ))
429 }
430 SqlStatement::Describe(_) => {
431 let (prepare_local_instructions, prepared) = prepare_statement();
432 let _prepared = prepared?;
433
434 Ok((
435 CompiledSqlCommand::DescribeEntity,
436 0,
437 prepare_local_instructions,
438 0,
439 0,
440 ))
441 }
442 SqlStatement::ShowIndexes(_) => {
443 let (prepare_local_instructions, prepared) = prepare_statement();
444 let _prepared = prepared?;
445
446 Ok((
447 CompiledSqlCommand::ShowIndexesEntity,
448 0,
449 prepare_local_instructions,
450 0,
451 0,
452 ))
453 }
454 SqlStatement::ShowColumns(_) => {
455 let (prepare_local_instructions, prepared) = prepare_statement();
456 let _prepared = prepared?;
457
458 Ok((
459 CompiledSqlCommand::ShowColumnsEntity,
460 0,
461 prepare_local_instructions,
462 0,
463 0,
464 ))
465 }
466 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
467 }
468 }
469
470 fn sql_select_prepared_plan(
473 &self,
474 query: &StructuralQuery,
475 authority: EntityAuthority,
476 cache_schema_fingerprint: CommitSchemaFingerprint,
477 ) -> Result<
478 (
479 SharedPreparedExecutionPlan,
480 SqlProjectionContract,
481 SqlCacheAttribution,
482 ),
483 QueryError,
484 > {
485 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
486 authority,
487 cache_schema_fingerprint,
488 query,
489 )?;
490 let projection_spec = prepared_plan
491 .logical_plan()
492 .projection_spec(authority.model());
493 let projection = SqlProjectionContract::new(
494 projection_labels_from_projection_spec(&projection_spec),
495 projection_fixed_scales_from_projection_spec(&projection_spec),
496 );
497
498 Ok((
499 prepared_plan,
500 projection,
501 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
502 ))
503 }
504
505 fn ensure_sql_statement_supported_for_surface(
509 statement: &SqlStatement,
510 surface: SqlCompiledCommandSurface,
511 ) -> Result<(), QueryError> {
512 match (surface, statement) {
513 (
514 SqlCompiledCommandSurface::Query,
515 SqlStatement::Select(_)
516 | SqlStatement::Explain(_)
517 | SqlStatement::Describe(_)
518 | SqlStatement::ShowIndexes(_)
519 | SqlStatement::ShowColumns(_)
520 | SqlStatement::ShowEntities(_),
521 )
522 | (
523 SqlCompiledCommandSurface::Update,
524 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
525 ) => Ok(()),
526 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
527 Err(QueryError::unsupported_query(
528 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
529 ))
530 }
531 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
532 Err(QueryError::unsupported_query(
533 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
534 ))
535 }
536 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
537 Err(QueryError::unsupported_query(
538 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
539 ))
540 }
541 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
542 Err(QueryError::unsupported_query(
543 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
544 ))
545 }
546 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
547 Err(QueryError::unsupported_query(
548 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
549 ))
550 }
551 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
552 Err(QueryError::unsupported_query(
553 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
554 ))
555 }
556 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
557 Err(QueryError::unsupported_query(
558 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
559 ))
560 }
561 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
562 Err(QueryError::unsupported_query(
563 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
564 ))
565 }
566 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
567 Err(QueryError::unsupported_query(
568 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
569 ))
570 }
571 }
572 }
573
574 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
579 where
580 E: PersistedRow<Canister = C> + EntityValue,
581 {
582 let compiled = self.compile_sql_query::<E>(sql)?;
583
584 self.execute_compiled_sql::<E>(&compiled)
585 }
586
587 #[cfg(feature = "diagnostics")]
590 #[doc(hidden)]
591 #[expect(
592 clippy::needless_update,
593 reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
594 )]
595 pub fn execute_sql_query_with_attribution<E>(
596 &self,
597 sql: &str,
598 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
599 where
600 E: PersistedRow<Canister = C> + EntityValue,
601 {
602 let (compile_local_instructions, compiled) =
605 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
606 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
607
608 let store_get_calls_before = DataStore::current_get_call_count();
611 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
612 let pure_covering_row_assembly_before =
613 current_pure_covering_row_assembly_local_instructions();
614 let (result, execute_cache_attribution, execute_phase_attribution) =
615 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
616 let store_get_calls =
617 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
618 let pure_covering_decode_local_instructions =
619 current_pure_covering_decode_local_instructions()
620 .saturating_sub(pure_covering_decode_before);
621 let pure_covering_row_assembly_local_instructions =
622 current_pure_covering_row_assembly_local_instructions()
623 .saturating_sub(pure_covering_row_assembly_before);
624 let execute_local_instructions = execute_phase_attribution
625 .planner_local_instructions
626 .saturating_add(execute_phase_attribution.store_local_instructions)
627 .saturating_add(execute_phase_attribution.executor_local_instructions)
628 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
629 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
630 let total_local_instructions =
631 compile_local_instructions.saturating_add(execute_local_instructions);
632
633 Ok((
634 result,
635 SqlQueryExecutionAttribution {
636 compile_local_instructions,
637 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
638 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
639 compile_parse_local_instructions: compile_phase_attribution.parse,
640 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
641 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
642 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
643 compile_parse_predicate_local_instructions: compile_phase_attribution
644 .parse_predicate,
645 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
646 .aggregate_lane_check,
647 compile_prepare_local_instructions: compile_phase_attribution.prepare,
648 compile_lower_local_instructions: compile_phase_attribution.lower,
649 compile_bind_local_instructions: compile_phase_attribution.bind,
650 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
651 plan_lookup_local_instructions: execute_phase_attribution
652 .planner_local_instructions,
653 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
654 store_local_instructions: execute_phase_attribution.store_local_instructions,
655 executor_invocation_local_instructions: execute_phase_attribution
656 .executor_invocation_local_instructions,
657 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
658 response_finalization_local_instructions: execute_phase_attribution
659 .response_finalization_local_instructions,
660 grouped_stream_local_instructions: execute_phase_attribution
661 .grouped_stream_local_instructions,
662 grouped_fold_local_instructions: execute_phase_attribution
663 .grouped_fold_local_instructions,
664 grouped_finalize_local_instructions: execute_phase_attribution
665 .grouped_finalize_local_instructions,
666 grouped_count_borrowed_hash_computations: execute_phase_attribution
667 .grouped_count
668 .borrowed_hash_computations,
669 grouped_count_bucket_candidate_checks: execute_phase_attribution
670 .grouped_count
671 .bucket_candidate_checks,
672 grouped_count_existing_group_hits: execute_phase_attribution
673 .grouped_count
674 .existing_group_hits,
675 grouped_count_new_group_inserts: execute_phase_attribution
676 .grouped_count
677 .new_group_inserts,
678 grouped_count_row_materialization_local_instructions: execute_phase_attribution
679 .grouped_count
680 .row_materialization_local_instructions,
681 grouped_count_group_lookup_local_instructions: execute_phase_attribution
682 .grouped_count
683 .group_lookup_local_instructions,
684 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
685 .grouped_count
686 .existing_group_update_local_instructions,
687 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
688 .grouped_count
689 .new_group_insert_local_instructions,
690 pure_covering_decode_local_instructions,
691 pure_covering_row_assembly_local_instructions,
692 store_get_calls,
693 response_decode_local_instructions: 0,
694 execute_local_instructions,
695 total_local_instructions,
696 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
697 sql_compiled_command_cache_misses: cache_attribution
698 .sql_compiled_command_cache_misses,
699 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
700 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
701 ..SqlQueryExecutionAttribution::default()
702 },
703 ))
704 }
705
706 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
711 where
712 E: PersistedRow<Canister = C> + EntityValue,
713 {
714 let compiled = self.compile_sql_update::<E>(sql)?;
715
716 self.execute_compiled_sql::<E>(&compiled)
717 }
718
719 pub(in crate::db) fn compile_sql_query<E>(
722 &self,
723 sql: &str,
724 ) -> Result<CompiledSqlCommand, QueryError>
725 where
726 E: PersistedRow<Canister = C> + EntityValue,
727 {
728 self.compile_sql_query_with_cache_attribution::<E>(sql)
729 .map(|(compiled, _, _)| compiled)
730 }
731
732 fn compile_sql_query_with_cache_attribution<E>(
733 &self,
734 sql: &str,
735 ) -> Result<
736 (
737 CompiledSqlCommand,
738 SqlCacheAttribution,
739 SqlCompilePhaseAttribution,
740 ),
741 QueryError,
742 >
743 where
744 E: PersistedRow<Canister = C> + EntityValue,
745 {
746 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
747 }
748
749 pub(in crate::db) fn compile_sql_update<E>(
752 &self,
753 sql: &str,
754 ) -> Result<CompiledSqlCommand, QueryError>
755 where
756 E: PersistedRow<Canister = C> + EntityValue,
757 {
758 self.compile_sql_update_with_cache_attribution::<E>(sql)
759 .map(|(compiled, _, _)| compiled)
760 }
761
762 fn compile_sql_update_with_cache_attribution<E>(
763 &self,
764 sql: &str,
765 ) -> Result<
766 (
767 CompiledSqlCommand,
768 SqlCacheAttribution,
769 SqlCompilePhaseAttribution,
770 ),
771 QueryError,
772 >
773 where
774 E: PersistedRow<Canister = C> + EntityValue,
775 {
776 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
777 }
778
779 fn compile_sql_surface_with_cache_attribution<E>(
783 &self,
784 sql: &str,
785 surface: SqlCompiledCommandSurface,
786 ) -> Result<
787 (
788 CompiledSqlCommand,
789 SqlCacheAttribution,
790 SqlCompilePhaseAttribution,
791 ),
792 QueryError,
793 >
794 where
795 E: PersistedRow<Canister = C> + EntityValue,
796 {
797 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
798 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
799 });
800 let cache_key = cache_key?;
801
802 self.compile_sql_statement_with_cache::<E, _>(
803 cache_key,
804 cache_key_local_instructions,
805 sql,
806 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
807 )
808 }
809
810 fn compile_sql_statement_with_cache<E, F>(
813 &self,
814 cache_key: SqlCompiledCommandCacheKey,
815 cache_key_local_instructions: u64,
816 sql: &str,
817 ensure_surface_supported: F,
818 ) -> Result<
819 (
820 CompiledSqlCommand,
821 SqlCacheAttribution,
822 SqlCompilePhaseAttribution,
823 ),
824 QueryError,
825 >
826 where
827 E: PersistedRow<Canister = C> + EntityValue,
828 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
829 {
830 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
831 let cached =
832 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
833 Ok::<_, QueryError>(cached)
834 });
835 let cached = cached?;
836 if let Some(compiled) = cached {
837 return Ok((
838 compiled,
839 SqlCacheAttribution::sql_compiled_command_cache_hit(),
840 SqlCompilePhaseAttribution::cache_hit(
841 cache_key_local_instructions,
842 cache_lookup_local_instructions,
843 ),
844 ));
845 }
846
847 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
848 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
849 });
850 let (parsed, parse_attribution) = parsed?;
851 let parse_select_local_instructions = parse_local_instructions
852 .saturating_sub(parse_attribution.tokenize)
853 .saturating_sub(parse_attribution.expr)
854 .saturating_sub(parse_attribution.predicate);
855 ensure_surface_supported(&parsed)?;
856 let authority = EntityAuthority::for_type::<E>();
857 let (
858 compiled,
859 aggregate_lane_check_local_instructions,
860 prepare_local_instructions,
861 lower_local_instructions,
862 bind_local_instructions,
863 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
864
865 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
866 self.with_sql_compiled_command_cache(|cache| {
867 cache.insert(cache_key, compiled.clone());
868 });
869 Ok::<_, QueryError>(())
870 });
871 cache_insert?;
872
873 Ok((
874 compiled,
875 SqlCacheAttribution::sql_compiled_command_cache_miss(),
876 SqlCompilePhaseAttribution {
877 cache_key: cache_key_local_instructions,
878 cache_lookup: cache_lookup_local_instructions,
879 parse: parse_local_instructions,
880 parse_tokenize: parse_attribution.tokenize,
881 parse_select: parse_select_local_instructions,
882 parse_expr: parse_attribution.expr,
883 parse_predicate: parse_attribution.predicate,
884 aggregate_lane_check: aggregate_lane_check_local_instructions,
885 prepare: prepare_local_instructions,
886 lower: lower_local_instructions,
887 bind: bind_local_instructions,
888 cache_insert: cache_insert_local_instructions,
889 },
890 ))
891 }
892}