1mod cache;
8mod compiled;
9mod execute;
10mod projection;
11
12#[cfg(feature = "diagnostics")]
13use candid::CandidType;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::sync::Arc;
17
18#[cfg(feature = "diagnostics")]
19use crate::db::DataStore;
20#[cfg(feature = "diagnostics")]
21use crate::db::executor::GroupedCountAttribution;
22#[cfg(feature = "diagnostics")]
23use crate::db::session::sql::projection::{
24 current_pure_covering_decode_local_instructions,
25 current_pure_covering_row_assembly_local_instructions,
26};
27#[cfg(test)]
28use crate::db::sql::parser::parse_sql;
29use crate::{
30 db::{
31 DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
32 commit::CommitSchemaFingerprint,
33 executor::{EntityAuthority, SharedPreparedExecutionPlan},
34 query::intent::StructuralQuery,
35 session::sql::projection::{
36 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
37 },
38 sql::lowering::{
39 bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
40 compile_sql_global_aggregate_command_core_from_prepared,
41 extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
42 lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
43 lower_sql_command_from_prepared_statement, prepare_sql_statement,
44 },
45 sql::parser::{SqlStatement, parse_sql_with_attribution},
46 },
47 traits::{CanisterKind, EntityValue},
48 value::OutputValue,
49};
50
51pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
52pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
53pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
54
55#[cfg(all(test, not(feature = "diagnostics")))]
56pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
57#[cfg(feature = "diagnostics")]
58pub use crate::db::session::sql::projection::{
59 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
60};
61
62#[derive(Debug)]
64pub enum SqlStatementResult {
65 Count {
66 row_count: u32,
67 },
68 Projection {
69 columns: Vec<String>,
70 fixed_scales: Vec<Option<u32>>,
71 rows: Vec<Vec<OutputValue>>,
72 row_count: u32,
73 },
74 ProjectionText {
75 columns: Vec<String>,
76 rows: Vec<Vec<String>>,
77 row_count: u32,
78 },
79 Grouped {
80 columns: Vec<String>,
81 fixed_scales: Vec<Option<u32>>,
82 rows: Vec<GroupedRow>,
83 row_count: u32,
84 next_cursor: Option<String>,
85 },
86 Explain(String),
87 Describe(crate::db::EntitySchemaDescription),
88 ShowIndexes(Vec<String>),
89 ShowColumns(Vec<crate::db::EntityFieldDescription>),
90 ShowEntities(Vec<String>),
91}
92
93#[cfg(feature = "diagnostics")]
104#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
105pub struct SqlQueryExecutionAttribution {
106 pub compile_local_instructions: u64,
107 pub compile_cache_key_local_instructions: u64,
108 pub compile_cache_lookup_local_instructions: u64,
109 pub compile_parse_local_instructions: u64,
110 pub compile_parse_tokenize_local_instructions: u64,
111 pub compile_parse_select_local_instructions: u64,
112 pub compile_parse_expr_local_instructions: u64,
113 pub compile_parse_predicate_local_instructions: u64,
114 pub compile_aggregate_lane_check_local_instructions: u64,
115 pub compile_prepare_local_instructions: u64,
116 pub compile_lower_local_instructions: u64,
117 pub compile_bind_local_instructions: u64,
118 pub compile_cache_insert_local_instructions: u64,
119 pub plan_lookup_local_instructions: u64,
120 pub planner_local_instructions: u64,
121 pub store_local_instructions: u64,
122 pub executor_invocation_local_instructions: u64,
123 pub executor_local_instructions: u64,
124 pub response_finalization_local_instructions: u64,
125 pub grouped_stream_local_instructions: u64,
126 pub grouped_fold_local_instructions: u64,
127 pub grouped_finalize_local_instructions: u64,
128 pub grouped_count_borrowed_hash_computations: u64,
129 pub grouped_count_bucket_candidate_checks: u64,
130 pub grouped_count_existing_group_hits: u64,
131 pub grouped_count_new_group_inserts: u64,
132 pub grouped_count_row_materialization_local_instructions: u64,
133 pub grouped_count_group_lookup_local_instructions: u64,
134 pub grouped_count_existing_group_update_local_instructions: u64,
135 pub grouped_count_new_group_insert_local_instructions: u64,
136 pub pure_covering_decode_local_instructions: u64,
137 pub pure_covering_row_assembly_local_instructions: u64,
138 pub store_get_calls: u64,
139 pub response_decode_local_instructions: u64,
140 pub execute_local_instructions: u64,
141 pub total_local_instructions: u64,
142 pub sql_compiled_command_cache_hits: u64,
143 pub sql_compiled_command_cache_misses: u64,
144 pub shared_query_plan_cache_hits: u64,
145 pub shared_query_plan_cache_misses: u64,
146}
147
148#[cfg(feature = "diagnostics")]
152#[derive(Clone, Copy, Debug, Eq, PartialEq)]
153pub(in crate::db) struct SqlExecutePhaseAttribution {
154 pub planner_local_instructions: u64,
155 pub store_local_instructions: u64,
156 pub executor_invocation_local_instructions: u64,
157 pub executor_local_instructions: u64,
158 pub response_finalization_local_instructions: u64,
159 pub grouped_stream_local_instructions: u64,
160 pub grouped_fold_local_instructions: u64,
161 pub grouped_finalize_local_instructions: u64,
162 pub grouped_count: GroupedCountAttribution,
163}
164
165#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
177pub(in crate::db) struct SqlCompilePhaseAttribution {
178 pub cache_key: u64,
179 pub cache_lookup: u64,
180 pub parse: u64,
181 pub parse_tokenize: u64,
182 pub parse_select: u64,
183 pub parse_expr: u64,
184 pub parse_predicate: u64,
185 pub aggregate_lane_check: u64,
186 pub prepare: u64,
187 pub lower: u64,
188 pub bind: u64,
189 pub cache_insert: u64,
190}
191
192impl SqlCompilePhaseAttribution {
193 #[must_use]
194 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
195 Self {
196 cache_key,
197 cache_lookup,
198 parse: 0,
199 parse_tokenize: 0,
200 parse_select: 0,
201 parse_expr: 0,
202 parse_predicate: 0,
203 aggregate_lane_check: 0,
204 prepare: 0,
205 lower: 0,
206 bind: 0,
207 cache_insert: 0,
208 }
209 }
210}
211
212#[cfg(feature = "diagnostics")]
213impl SqlExecutePhaseAttribution {
214 #[must_use]
215 pub(in crate::db) const fn from_execute_total_and_store_total(
216 execute_local_instructions: u64,
217 store_local_instructions: u64,
218 ) -> Self {
219 Self {
220 planner_local_instructions: 0,
221 store_local_instructions,
222 executor_invocation_local_instructions: execute_local_instructions,
223 executor_local_instructions: execute_local_instructions
224 .saturating_sub(store_local_instructions),
225 response_finalization_local_instructions: 0,
226 grouped_stream_local_instructions: 0,
227 grouped_fold_local_instructions: 0,
228 grouped_finalize_local_instructions: 0,
229 grouped_count: GroupedCountAttribution::none(),
230 }
231 }
232}
233
234#[cfg(test)]
237pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
238 parse_sql(sql).map_err(QueryError::from_sql_parse_error)
239}
240
241#[cfg(feature = "diagnostics")]
242#[expect(
243 clippy::missing_const_for_fn,
244 reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
245)]
246fn read_sql_local_instruction_counter() -> u64 {
247 #[cfg(all(feature = "diagnostics", target_arch = "wasm32"))]
248 {
249 canic_cdk::api::performance_counter(1)
250 }
251
252 #[cfg(not(all(feature = "diagnostics", target_arch = "wasm32")))]
253 {
254 0
255 }
256}
257
258pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
259 run: impl FnOnce() -> Result<T, E>,
260) -> (u64, Result<T, E>) {
261 #[cfg(feature = "diagnostics")]
262 let start = read_sql_local_instruction_counter();
263
264 let result = run();
265
266 #[cfg(feature = "diagnostics")]
267 let delta = read_sql_local_instruction_counter().saturating_sub(start);
268
269 #[cfg(not(feature = "diagnostics"))]
270 let delta = 0;
271
272 (delta, result)
273}
274
275impl<C: CanisterKind> DbSession<C> {
276 #[expect(clippy::too_many_lines)]
279 fn compile_sql_statement_for_authority(
280 statement: &SqlStatement,
281 authority: EntityAuthority,
282 compiled_cache_key: SqlCompiledCommandCacheKey,
283 ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
284 let prepare_statement = || {
288 measure_sql_stage(|| {
289 prepare_sql_statement(statement.clone(), authority.model().name())
290 .map_err(QueryError::from_sql_lowering_error)
291 })
292 };
293
294 match statement {
295 SqlStatement::Select(_) => {
296 let (prepare_local_instructions, prepared) = prepare_statement();
297 let prepared = prepared?;
298 let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
299 measure_sql_stage(|| {
300 Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
301 });
302 let requires_aggregate_lane = requires_aggregate_lane?;
303
304 if requires_aggregate_lane {
305 let (lower_local_instructions, command) = measure_sql_stage(|| {
306 compile_sql_global_aggregate_command_core_from_prepared(
307 prepared,
308 authority.model(),
309 MissingRowPolicy::Ignore,
310 )
311 .map_err(QueryError::from_sql_lowering_error)
312 });
313 let command = command?;
314
315 Ok((
316 CompiledSqlCommand::GlobalAggregate {
317 command: Box::new(command),
318 },
319 aggregate_lane_check_local_instructions,
320 prepare_local_instructions,
321 lower_local_instructions,
322 0,
323 ))
324 } else {
325 let (lower_local_instructions, select) = measure_sql_stage(|| {
326 lower_prepared_sql_select_statement(prepared, authority.model())
327 .map_err(QueryError::from_sql_lowering_error)
328 });
329 let select = select?;
330 let (bind_local_instructions, query) = measure_sql_stage(|| {
331 bind_lowered_sql_select_query_structural(
332 authority.model(),
333 select,
334 MissingRowPolicy::Ignore,
335 )
336 .map_err(QueryError::from_sql_lowering_error)
337 });
338 let query = query?;
339
340 Ok((
341 CompiledSqlCommand::Select {
342 query: Arc::new(query),
343 compiled_cache_key,
344 },
345 aggregate_lane_check_local_instructions,
346 prepare_local_instructions,
347 lower_local_instructions,
348 bind_local_instructions,
349 ))
350 }
351 }
352 SqlStatement::Delete(_) => {
353 let (prepare_local_instructions, prepared) = prepare_statement();
354 let prepared = prepared?;
355 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
356 lower_prepared_sql_delete_statement(prepared)
357 .map_err(QueryError::from_sql_lowering_error)
358 });
359 let delete = lowered?;
360 let returning = delete.returning().cloned();
361 let query = delete.into_base_query();
362 let (bind_local_instructions, query) = measure_sql_stage(|| {
363 Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
364 authority.model(),
365 query,
366 MissingRowPolicy::Ignore,
367 ))
368 });
369 let query = query?;
370
371 Ok((
372 CompiledSqlCommand::Delete {
373 query: Arc::new(query),
374 returning,
375 },
376 0,
377 prepare_local_instructions,
378 lower_local_instructions,
379 bind_local_instructions,
380 ))
381 }
382 SqlStatement::Insert(_) => {
383 let (prepare_local_instructions, prepared) = prepare_statement();
384 let prepared = prepared?;
385 let statement = extract_prepared_sql_insert_statement(prepared)
386 .map_err(QueryError::from_sql_lowering_error)?;
387
388 Ok((
389 CompiledSqlCommand::Insert(statement),
390 0,
391 prepare_local_instructions,
392 0,
393 0,
394 ))
395 }
396 SqlStatement::Update(_) => {
397 let (prepare_local_instructions, prepared) = prepare_statement();
398 let prepared = prepared?;
399 let statement = extract_prepared_sql_update_statement(prepared)
400 .map_err(QueryError::from_sql_lowering_error)?;
401
402 Ok((
403 CompiledSqlCommand::Update(statement),
404 0,
405 prepare_local_instructions,
406 0,
407 0,
408 ))
409 }
410 SqlStatement::Explain(_) => {
411 let (prepare_local_instructions, prepared) = prepare_statement();
412 let prepared = prepared?;
413 let (lower_local_instructions, lowered) = measure_sql_stage(|| {
414 lower_sql_command_from_prepared_statement(prepared, authority.model())
415 .map_err(QueryError::from_sql_lowering_error)
416 });
417 let lowered = lowered?;
418
419 Ok((
420 CompiledSqlCommand::Explain(Box::new(lowered)),
421 0,
422 prepare_local_instructions,
423 lower_local_instructions,
424 0,
425 ))
426 }
427 SqlStatement::Describe(_) => {
428 let (prepare_local_instructions, prepared) = prepare_statement();
429 let _prepared = prepared?;
430
431 Ok((
432 CompiledSqlCommand::DescribeEntity,
433 0,
434 prepare_local_instructions,
435 0,
436 0,
437 ))
438 }
439 SqlStatement::ShowIndexes(_) => {
440 let (prepare_local_instructions, prepared) = prepare_statement();
441 let _prepared = prepared?;
442
443 Ok((
444 CompiledSqlCommand::ShowIndexesEntity,
445 0,
446 prepare_local_instructions,
447 0,
448 0,
449 ))
450 }
451 SqlStatement::ShowColumns(_) => {
452 let (prepare_local_instructions, prepared) = prepare_statement();
453 let _prepared = prepared?;
454
455 Ok((
456 CompiledSqlCommand::ShowColumnsEntity,
457 0,
458 prepare_local_instructions,
459 0,
460 0,
461 ))
462 }
463 SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
464 }
465 }
466
467 fn sql_select_prepared_plan(
470 &self,
471 query: &StructuralQuery,
472 authority: EntityAuthority,
473 cache_schema_fingerprint: CommitSchemaFingerprint,
474 ) -> Result<
475 (
476 SharedPreparedExecutionPlan,
477 SqlProjectionContract,
478 SqlCacheAttribution,
479 ),
480 QueryError,
481 > {
482 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
483 authority,
484 cache_schema_fingerprint,
485 query,
486 )?;
487 let projection_spec = prepared_plan
488 .logical_plan()
489 .projection_spec(authority.model());
490 let projection = SqlProjectionContract::new(
491 projection_labels_from_projection_spec(&projection_spec),
492 projection_fixed_scales_from_projection_spec(&projection_spec),
493 );
494
495 Ok((
496 prepared_plan,
497 projection,
498 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
499 ))
500 }
501
502 fn ensure_sql_statement_supported_for_surface(
506 statement: &SqlStatement,
507 surface: SqlCompiledCommandSurface,
508 ) -> Result<(), QueryError> {
509 match (surface, statement) {
510 (
511 SqlCompiledCommandSurface::Query,
512 SqlStatement::Select(_)
513 | SqlStatement::Explain(_)
514 | SqlStatement::Describe(_)
515 | SqlStatement::ShowIndexes(_)
516 | SqlStatement::ShowColumns(_)
517 | SqlStatement::ShowEntities(_),
518 )
519 | (
520 SqlCompiledCommandSurface::Update,
521 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
522 ) => Ok(()),
523 (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
524 Err(QueryError::unsupported_query(
525 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
526 ))
527 }
528 (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
529 Err(QueryError::unsupported_query(
530 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
531 ))
532 }
533 (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
534 Err(QueryError::unsupported_query(
535 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
536 ))
537 }
538 (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
539 Err(QueryError::unsupported_query(
540 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
541 ))
542 }
543 (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
544 Err(QueryError::unsupported_query(
545 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
546 ))
547 }
548 (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
549 Err(QueryError::unsupported_query(
550 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
551 ))
552 }
553 (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
554 Err(QueryError::unsupported_query(
555 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
556 ))
557 }
558 (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
559 Err(QueryError::unsupported_query(
560 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
561 ))
562 }
563 (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
564 Err(QueryError::unsupported_query(
565 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
566 ))
567 }
568 }
569 }
570
571 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
576 where
577 E: PersistedRow<Canister = C> + EntityValue,
578 {
579 let compiled = self.compile_sql_query::<E>(sql)?;
580
581 self.execute_compiled_sql::<E>(&compiled)
582 }
583
584 #[cfg(feature = "diagnostics")]
587 #[doc(hidden)]
588 pub fn execute_sql_query_with_attribution<E>(
589 &self,
590 sql: &str,
591 ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
592 where
593 E: PersistedRow<Canister = C> + EntityValue,
594 {
595 let (compile_local_instructions, compiled) =
598 measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
599 let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
600
601 let store_get_calls_before = DataStore::current_get_call_count();
604 let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
605 let pure_covering_row_assembly_before =
606 current_pure_covering_row_assembly_local_instructions();
607 let (result, execute_cache_attribution, execute_phase_attribution) =
608 self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
609 let store_get_calls =
610 DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
611 let pure_covering_decode_local_instructions =
612 current_pure_covering_decode_local_instructions()
613 .saturating_sub(pure_covering_decode_before);
614 let pure_covering_row_assembly_local_instructions =
615 current_pure_covering_row_assembly_local_instructions()
616 .saturating_sub(pure_covering_row_assembly_before);
617 let execute_local_instructions = execute_phase_attribution
618 .planner_local_instructions
619 .saturating_add(execute_phase_attribution.store_local_instructions)
620 .saturating_add(execute_phase_attribution.executor_local_instructions)
621 .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
622 let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
623 let total_local_instructions =
624 compile_local_instructions.saturating_add(execute_local_instructions);
625
626 Ok((
627 result,
628 SqlQueryExecutionAttribution {
629 compile_local_instructions,
630 compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
631 compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
632 compile_parse_local_instructions: compile_phase_attribution.parse,
633 compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
634 compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
635 compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
636 compile_parse_predicate_local_instructions: compile_phase_attribution
637 .parse_predicate,
638 compile_aggregate_lane_check_local_instructions: compile_phase_attribution
639 .aggregate_lane_check,
640 compile_prepare_local_instructions: compile_phase_attribution.prepare,
641 compile_lower_local_instructions: compile_phase_attribution.lower,
642 compile_bind_local_instructions: compile_phase_attribution.bind,
643 compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
644 plan_lookup_local_instructions: execute_phase_attribution
645 .planner_local_instructions,
646 planner_local_instructions: execute_phase_attribution.planner_local_instructions,
647 store_local_instructions: execute_phase_attribution.store_local_instructions,
648 executor_invocation_local_instructions: execute_phase_attribution
649 .executor_invocation_local_instructions,
650 executor_local_instructions: execute_phase_attribution.executor_local_instructions,
651 response_finalization_local_instructions: execute_phase_attribution
652 .response_finalization_local_instructions,
653 grouped_stream_local_instructions: execute_phase_attribution
654 .grouped_stream_local_instructions,
655 grouped_fold_local_instructions: execute_phase_attribution
656 .grouped_fold_local_instructions,
657 grouped_finalize_local_instructions: execute_phase_attribution
658 .grouped_finalize_local_instructions,
659 grouped_count_borrowed_hash_computations: execute_phase_attribution
660 .grouped_count
661 .borrowed_hash_computations,
662 grouped_count_bucket_candidate_checks: execute_phase_attribution
663 .grouped_count
664 .bucket_candidate_checks,
665 grouped_count_existing_group_hits: execute_phase_attribution
666 .grouped_count
667 .existing_group_hits,
668 grouped_count_new_group_inserts: execute_phase_attribution
669 .grouped_count
670 .new_group_inserts,
671 grouped_count_row_materialization_local_instructions: execute_phase_attribution
672 .grouped_count
673 .row_materialization_local_instructions,
674 grouped_count_group_lookup_local_instructions: execute_phase_attribution
675 .grouped_count
676 .group_lookup_local_instructions,
677 grouped_count_existing_group_update_local_instructions: execute_phase_attribution
678 .grouped_count
679 .existing_group_update_local_instructions,
680 grouped_count_new_group_insert_local_instructions: execute_phase_attribution
681 .grouped_count
682 .new_group_insert_local_instructions,
683 pure_covering_decode_local_instructions,
684 pure_covering_row_assembly_local_instructions,
685 store_get_calls,
686 response_decode_local_instructions: 0,
687 execute_local_instructions,
688 total_local_instructions,
689 sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
690 sql_compiled_command_cache_misses: cache_attribution
691 .sql_compiled_command_cache_misses,
692 shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
693 shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
694 },
695 ))
696 }
697
698 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
703 where
704 E: PersistedRow<Canister = C> + EntityValue,
705 {
706 let compiled = self.compile_sql_update::<E>(sql)?;
707
708 self.execute_compiled_sql::<E>(&compiled)
709 }
710
711 pub(in crate::db) fn compile_sql_query<E>(
714 &self,
715 sql: &str,
716 ) -> Result<CompiledSqlCommand, QueryError>
717 where
718 E: PersistedRow<Canister = C> + EntityValue,
719 {
720 self.compile_sql_query_with_cache_attribution::<E>(sql)
721 .map(|(compiled, _, _)| compiled)
722 }
723
724 fn compile_sql_query_with_cache_attribution<E>(
725 &self,
726 sql: &str,
727 ) -> Result<
728 (
729 CompiledSqlCommand,
730 SqlCacheAttribution,
731 SqlCompilePhaseAttribution,
732 ),
733 QueryError,
734 >
735 where
736 E: PersistedRow<Canister = C> + EntityValue,
737 {
738 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
739 }
740
741 pub(in crate::db) fn compile_sql_update<E>(
744 &self,
745 sql: &str,
746 ) -> Result<CompiledSqlCommand, QueryError>
747 where
748 E: PersistedRow<Canister = C> + EntityValue,
749 {
750 self.compile_sql_update_with_cache_attribution::<E>(sql)
751 .map(|(compiled, _, _)| compiled)
752 }
753
754 fn compile_sql_update_with_cache_attribution<E>(
755 &self,
756 sql: &str,
757 ) -> Result<
758 (
759 CompiledSqlCommand,
760 SqlCacheAttribution,
761 SqlCompilePhaseAttribution,
762 ),
763 QueryError,
764 >
765 where
766 E: PersistedRow<Canister = C> + EntityValue,
767 {
768 self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
769 }
770
771 fn compile_sql_surface_with_cache_attribution<E>(
775 &self,
776 sql: &str,
777 surface: SqlCompiledCommandSurface,
778 ) -> Result<
779 (
780 CompiledSqlCommand,
781 SqlCacheAttribution,
782 SqlCompilePhaseAttribution,
783 ),
784 QueryError,
785 >
786 where
787 E: PersistedRow<Canister = C> + EntityValue,
788 {
789 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
790 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
791 });
792 let cache_key = cache_key?;
793
794 self.compile_sql_statement_with_cache::<E, _>(
795 cache_key,
796 cache_key_local_instructions,
797 sql,
798 |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
799 )
800 }
801
802 fn compile_sql_statement_with_cache<E, F>(
805 &self,
806 cache_key: SqlCompiledCommandCacheKey,
807 cache_key_local_instructions: u64,
808 sql: &str,
809 ensure_surface_supported: F,
810 ) -> Result<
811 (
812 CompiledSqlCommand,
813 SqlCacheAttribution,
814 SqlCompilePhaseAttribution,
815 ),
816 QueryError,
817 >
818 where
819 E: PersistedRow<Canister = C> + EntityValue,
820 F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
821 {
822 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
823 let cached =
824 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
825 Ok::<_, QueryError>(cached)
826 });
827 let cached = cached?;
828 if let Some(compiled) = cached {
829 return Ok((
830 compiled,
831 SqlCacheAttribution::sql_compiled_command_cache_hit(),
832 SqlCompilePhaseAttribution::cache_hit(
833 cache_key_local_instructions,
834 cache_lookup_local_instructions,
835 ),
836 ));
837 }
838
839 let (parse_local_instructions, parsed) = measure_sql_stage(|| {
840 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
841 });
842 let (parsed, parse_attribution) = parsed?;
843 let parse_select_local_instructions = parse_local_instructions
844 .saturating_sub(parse_attribution.tokenize)
845 .saturating_sub(parse_attribution.expr)
846 .saturating_sub(parse_attribution.predicate);
847 ensure_surface_supported(&parsed)?;
848 let authority = EntityAuthority::for_type::<E>();
849 let (
850 compiled,
851 aggregate_lane_check_local_instructions,
852 prepare_local_instructions,
853 lower_local_instructions,
854 bind_local_instructions,
855 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
856
857 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
858 self.with_sql_compiled_command_cache(|cache| {
859 cache.insert(cache_key, compiled.clone());
860 });
861 Ok::<_, QueryError>(())
862 });
863 cache_insert?;
864
865 Ok((
866 compiled,
867 SqlCacheAttribution::sql_compiled_command_cache_miss(),
868 SqlCompilePhaseAttribution {
869 cache_key: cache_key_local_instructions,
870 cache_lookup: cache_lookup_local_instructions,
871 parse: parse_local_instructions,
872 parse_tokenize: parse_attribution.tokenize,
873 parse_select: parse_select_local_instructions,
874 parse_expr: parse_attribution.expr,
875 parse_predicate: parse_attribution.predicate,
876 aggregate_lane_check: aggregate_lane_check_local_instructions,
877 prepare: prepare_local_instructions,
878 lower: lower_local_instructions,
879 bind: bind_local_instructions,
880 cache_insert: cache_insert_local_instructions,
881 },
882 ))
883 }
884}