1mod execute;
8mod explain;
9mod projection;
10
11#[cfg(feature = "diagnostics")]
12use candid::CandidType;
13#[cfg(feature = "diagnostics")]
14use serde::Deserialize;
15use std::{cell::RefCell, collections::HashMap, sync::Arc};
16
/// Version tag written into every `SqlCompiledCommandCacheKey` built by the
/// non-test constructors in this module. The key compares and hashes this
/// field, so keys carrying a different version never match current ones.
const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
23
24#[cfg(feature = "diagnostics")]
25use crate::db::DataStore;
26#[cfg(feature = "diagnostics")]
27use crate::db::executor::GroupedCountAttribution;
28#[cfg(feature = "diagnostics")]
29use crate::db::session::sql::projection::{
30 current_pure_covering_decode_local_instructions,
31 current_pure_covering_row_assembly_local_instructions,
32};
33#[cfg(test)]
34use crate::db::sql::parser::parse_sql;
35use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
36use crate::{
37 db::{
38 DbSession, GroupedRow, PersistedRow, QueryError,
39 commit::CommitSchemaFingerprint,
40 executor::{EntityAuthority, SharedPreparedExecutionPlan},
41 query::{
42 intent::StructuralQuery,
43 plan::{AccessPlannedQuery, VisibleIndexes},
44 },
45 schema::commit_schema_fingerprint_for_entity,
46 session::query::QueryPlanCacheAttribution,
47 session::sql::projection::{
48 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
49 },
50 sql::lowering::{LoweredBaseQueryShape, LoweredSqlCommand, SqlGlobalAggregateCommandCore},
51 sql::parser::{SqlStatement, parse_sql_with_attribution},
52 },
53 traits::{CanisterKind, EntityValue},
54};
55
56#[cfg(all(test, not(feature = "diagnostics")))]
57pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
58#[cfg(feature = "diagnostics")]
59pub use crate::db::session::sql::projection::{
60 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
61};
62
63#[derive(Debug)]
65pub enum SqlStatementResult {
66 Count {
67 row_count: u32,
68 },
69 Projection {
70 columns: Vec<String>,
71 fixed_scales: Vec<Option<u32>>,
72 rows: Vec<Vec<crate::value::Value>>,
73 row_count: u32,
74 },
75 ProjectionText {
76 columns: Vec<String>,
77 rows: Vec<Vec<String>>,
78 row_count: u32,
79 },
80 Grouped {
81 columns: Vec<String>,
82 fixed_scales: Vec<Option<u32>>,
83 rows: Vec<GroupedRow>,
84 row_count: u32,
85 next_cursor: Option<String>,
86 },
87 Explain(String),
88 Describe(crate::db::EntitySchemaDescription),
89 ShowIndexes(Vec<String>),
90 ShowColumns(Vec<crate::db::EntityFieldDescription>),
91 ShowEntities(Vec<String>),
92}
93
/// Flat, Candid-serializable report of instruction and cache counters for one
/// attributed SQL query call (see `DbSession::execute_sql_query_with_attribution`).
///
/// Only compiled when the `diagnostics` feature is enabled.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    // --- compile phase ---
    /// Counter delta measured around the whole compile step.
    pub compile_local_instructions: u64,
    pub compile_cache_key_local_instructions: u64,
    pub compile_cache_lookup_local_instructions: u64,
    pub compile_parse_local_instructions: u64,
    pub compile_parse_tokenize_local_instructions: u64,
    pub compile_parse_select_local_instructions: u64,
    pub compile_parse_expr_local_instructions: u64,
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    pub compile_cache_insert_local_instructions: u64,

    // --- execute phase ---
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // --- grouped-count detail, copied from `GroupedCountAttribution` ---
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // --- projection and store counters, reported as before/after deltas ---
    pub pure_covering_decode_local_instructions: u64,
    pub pure_covering_row_assembly_local_instructions: u64,
    /// Change in `DataStore::current_get_call_count()` across execution.
    pub store_get_calls: u64,
    /// Set to `0` by `DbSession::execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,

    // --- totals ---
    /// Saturating sum of the planner, store, and executor counters.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and `execute_local_instructions`.
    pub total_local_instructions: u64,

    // --- cache hit/miss counters ---
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
145
/// Instruction counters for the execute phase of one SQL statement.
///
/// Only compiled when the `diagnostics` feature is enabled.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    /// Grouped-count detail counters carried alongside the phase totals.
    pub grouped_count: GroupedCountAttribution,
}
160
161#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
173pub(in crate::db) struct SqlCompilePhaseAttribution {
174 pub cache_key: u64,
175 pub cache_lookup: u64,
176 pub parse: u64,
177 pub parse_tokenize: u64,
178 pub parse_select: u64,
179 pub parse_expr: u64,
180 pub parse_predicate: u64,
181 pub aggregate_lane_check: u64,
182 pub prepare: u64,
183 pub lower: u64,
184 pub bind: u64,
185 pub cache_insert: u64,
186}
187
188impl SqlCompilePhaseAttribution {
189 #[must_use]
190 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
191 Self {
192 cache_key,
193 cache_lookup,
194 parse: 0,
195 parse_tokenize: 0,
196 parse_select: 0,
197 parse_expr: 0,
198 parse_predicate: 0,
199 aggregate_lane_check: 0,
200 prepare: 0,
201 lower: 0,
202 bind: 0,
203 cache_insert: 0,
204 }
205 }
206}
207
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Builds an attribution from an execute total and the store share of it.
    ///
    /// The executor counter is the total minus the store counter, saturating
    /// at zero. The planner and grouped counters are zero, and the grouped-count
    /// detail is `GroupedCountAttribution::none()`.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_local_instructions,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
}
227
228#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
234pub(in crate::db) struct SqlCacheAttribution {
235 pub sql_compiled_command_cache_hits: u64,
236 pub sql_compiled_command_cache_misses: u64,
237 pub shared_query_plan_cache_hits: u64,
238 pub shared_query_plan_cache_misses: u64,
239}
240
241impl SqlCacheAttribution {
242 #[must_use]
243 const fn none() -> Self {
244 Self {
245 sql_compiled_command_cache_hits: 0,
246 sql_compiled_command_cache_misses: 0,
247 shared_query_plan_cache_hits: 0,
248 shared_query_plan_cache_misses: 0,
249 }
250 }
251
252 #[must_use]
253 const fn sql_compiled_command_cache_hit() -> Self {
254 Self {
255 sql_compiled_command_cache_hits: 1,
256 ..Self::none()
257 }
258 }
259
260 #[must_use]
261 const fn sql_compiled_command_cache_miss() -> Self {
262 Self {
263 sql_compiled_command_cache_misses: 1,
264 ..Self::none()
265 }
266 }
267
268 #[must_use]
269 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
270 Self {
271 shared_query_plan_cache_hits: attribution.hits,
272 shared_query_plan_cache_misses: attribution.misses,
273 ..Self::none()
274 }
275 }
276
277 #[must_use]
278 const fn merge(self, other: Self) -> Self {
279 Self {
280 sql_compiled_command_cache_hits: self
281 .sql_compiled_command_cache_hits
282 .saturating_add(other.sql_compiled_command_cache_hits),
283 sql_compiled_command_cache_misses: self
284 .sql_compiled_command_cache_misses
285 .saturating_add(other.sql_compiled_command_cache_misses),
286 shared_query_plan_cache_hits: self
287 .shared_query_plan_cache_hits
288 .saturating_add(other.shared_query_plan_cache_hits),
289 shared_query_plan_cache_misses: self
290 .shared_query_plan_cache_misses
291 .saturating_add(other.shared_query_plan_cache_misses),
292 }
293 }
294}
295
/// Which SQL entry surface a compiled command was built for. It is part of
/// the compiled-command cache key, so the same SQL text compiled on different
/// surfaces occupies separate cache entries.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum SqlCompiledCommandSurface {
    /// The query surface.
    Query,
    /// The update surface.
    Update,
}
301
302#[derive(Clone, Debug, Eq, Hash, PartialEq)]
313pub(in crate::db) struct SqlCompiledCommandCacheKey {
314 cache_method_version: u8,
315 surface: SqlCompiledCommandSurface,
316 entity_path: &'static str,
317 schema_fingerprint: CommitSchemaFingerprint,
318 sql: String,
319}
320
321#[derive(Clone, Debug)]
331pub(in crate::db) struct SqlProjectionContract {
332 columns: Vec<String>,
333 fixed_scales: Vec<Option<u32>>,
334}
335
336impl SqlProjectionContract {
337 #[must_use]
338 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
339 Self {
340 columns,
341 fixed_scales,
342 }
343 }
344
345 #[must_use]
346 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
347 (self.columns, self.fixed_scales)
348 }
349}
350
351impl SqlCompiledCommandCacheKey {
352 fn query_for_entity<E>(sql: &str) -> Self
353 where
354 E: PersistedRow + EntityValue,
355 {
356 Self::for_entity::<E>(SqlCompiledCommandSurface::Query, sql)
357 }
358
359 fn update_for_entity<E>(sql: &str) -> Self
360 where
361 E: PersistedRow + EntityValue,
362 {
363 Self::for_entity::<E>(SqlCompiledCommandSurface::Update, sql)
364 }
365
366 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
367 where
368 E: PersistedRow + EntityValue,
369 {
370 Self {
371 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
372 surface,
373 entity_path: E::PATH,
374 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
375 sql: sql.to_string(),
376 }
377 }
378
379 #[must_use]
380 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
381 self.schema_fingerprint
382 }
383}
384
#[cfg(test)]
impl SqlCompiledCommandCacheKey {
    /// Test-only: query-surface key carrying an explicit cache method version.
    pub(in crate::db) fn query_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Query;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test-only: update-surface key carrying an explicit cache method version.
    pub(in crate::db) fn update_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Update;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Builds the regular key for `E` and then overrides only the version
    /// field, so every other field matches what production code would build.
    fn for_entity_with_method_version<E>(
        surface: SqlCompiledCommandSurface,
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        Self {
            cache_method_version,
            ..Self::for_entity::<E>(surface, sql)
        }
    }
}
432
433pub(in crate::db) type SqlCompiledCommandCache =
434 HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;
435
436thread_local! {
437 static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
441 RefCell::new(HashMap::default());
442}
443
444#[derive(Clone, Debug)]
448pub(in crate::db) enum CompiledSqlCommand {
449 Select {
450 query: Arc<StructuralQuery>,
451 compiled_cache_key: SqlCompiledCommandCacheKey,
452 },
453 Delete {
454 query: LoweredBaseQueryShape,
455 statement: SqlDeleteStatement,
456 },
457 GlobalAggregate {
458 command: Box<SqlGlobalAggregateCommandCore>,
459 },
460 Explain(LoweredSqlCommand),
461 Insert(SqlInsertStatement),
462 Update(SqlUpdateStatement),
463 DescribeEntity,
464 ShowIndexesEntity,
465 ShowColumnsEntity,
466 ShowEntities,
467}
468
469impl CompiledSqlCommand {
470 fn new_select(query: StructuralQuery, compiled_cache_key: SqlCompiledCommandCacheKey) -> Self {
471 Self::Select {
472 query: Arc::new(query),
473 compiled_cache_key,
474 }
475 }
476}
477
/// Test-only: parses `sql` into a statement, converting a parse failure with
/// `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
484
485fn parse_sql_statement_with_attribution(
489 sql: &str,
490) -> Result<
491 (
492 SqlStatement,
493 crate::db::sql::parser::SqlParsePhaseAttribution,
494 ),
495 QueryError,
496> {
497 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
498}
499
/// Reads the instruction counter used for SQL stage attribution.
///
/// On `wasm32` targets this returns `canic_cdk::api::performance_counter(1)`.
/// On every other target it returns `0`, so deltas computed from it are zero.
///
/// The `missing_const_for_fn` expectation applies to non-wasm32 builds only.
/// That is the only configuration where the body is a constant and the lint
/// can fire. An unconditional `expect` would go unfulfilled on wasm32, where
/// the body calls a non-const function.
#[cfg(feature = "diagnostics")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_sql_local_instruction_counter() -> u64 {
    // The enclosing item is already gated on `diagnostics`, so the branches
    // only need to distinguish the target architecture.
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
516
517pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
518 run: impl FnOnce() -> Result<T, E>,
519) -> (u64, Result<T, E>) {
520 #[cfg(feature = "diagnostics")]
521 let start = read_sql_local_instruction_counter();
522
523 let result = run();
524
525 #[cfg(feature = "diagnostics")]
526 let delta = read_sql_local_instruction_counter().saturating_sub(start);
527
528 #[cfg(not(feature = "diagnostics"))]
529 let delta = 0;
530
531 (delta, result)
532}
533
534impl<C: CanisterKind> DbSession<C> {
535 fn sql_cache_scope_id(&self) -> usize {
536 self.db.cache_scope_id()
537 }
538
539 fn with_sql_compiled_command_cache<R>(
540 &self,
541 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
542 ) -> R {
543 let scope_id = self.sql_cache_scope_id();
544
545 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
546 let mut caches = caches.borrow_mut();
547 let cache = caches.entry(scope_id).or_default();
548
549 f(cache)
550 })
551 }
552
/// Test-only: number of entries in this session's compiled-command cache.
#[cfg(test)]
pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
    self.with_sql_compiled_command_cache(|compiled_commands| compiled_commands.len())
}
557
/// Test-only: removes every entry from this session's compiled-command cache.
#[cfg(test)]
pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
    self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
}
562
563 fn sql_select_projection_contract_from_shared_prepared_plan(
567 authority: EntityAuthority,
568 prepared_plan: &SharedPreparedExecutionPlan,
569 ) -> SqlProjectionContract {
570 let projection = prepared_plan
571 .logical_plan()
572 .projection_spec(authority.model());
573 let columns = projection_labels_from_projection_spec(&projection);
574 let fixed_scales = projection_fixed_scales_from_projection_spec(&projection);
575
576 SqlProjectionContract::new(columns, fixed_scales)
577 }
578
579 fn sql_select_prepared_plan(
582 &self,
583 query: &StructuralQuery,
584 authority: EntityAuthority,
585 cache_schema_fingerprint: CommitSchemaFingerprint,
586 ) -> Result<
587 (
588 SharedPreparedExecutionPlan,
589 SqlProjectionContract,
590 SqlCacheAttribution,
591 ),
592 QueryError,
593 > {
594 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
595 authority,
596 cache_schema_fingerprint,
597 query,
598 )?;
599 let projection = Self::sql_select_projection_contract_from_shared_prepared_plan(
600 authority,
601 &prepared_plan,
602 );
603
604 Ok((
605 prepared_plan,
606 projection,
607 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
608 ))
609 }
610
611 pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
614 &self,
615 query: StructuralQuery,
616 authority: EntityAuthority,
617 ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
618 let visible_indexes =
619 self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
620 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
621
622 Ok((visible_indexes, plan))
623 }
624
625 fn ensure_sql_query_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
628 match statement {
629 SqlStatement::Select(_)
630 | SqlStatement::Explain(_)
631 | SqlStatement::Describe(_)
632 | SqlStatement::ShowIndexes(_)
633 | SqlStatement::ShowColumns(_)
634 | SqlStatement::ShowEntities(_) => Ok(()),
635 SqlStatement::Insert(_) => Err(QueryError::unsupported_query(
636 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
637 )),
638 SqlStatement::Update(_) => Err(QueryError::unsupported_query(
639 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
640 )),
641 SqlStatement::Delete(_) => Err(QueryError::unsupported_query(
642 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
643 )),
644 }
645 }
646
647 fn ensure_sql_update_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
650 match statement {
651 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_) => Ok(()),
652 SqlStatement::Select(_) => Err(QueryError::unsupported_query(
653 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
654 )),
655 SqlStatement::Explain(_) => Err(QueryError::unsupported_query(
656 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
657 )),
658 SqlStatement::Describe(_) => Err(QueryError::unsupported_query(
659 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
660 )),
661 SqlStatement::ShowIndexes(_) => Err(QueryError::unsupported_query(
662 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
663 )),
664 SqlStatement::ShowColumns(_) => Err(QueryError::unsupported_query(
665 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
666 )),
667 SqlStatement::ShowEntities(_) => Err(QueryError::unsupported_query(
668 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
669 )),
670 }
671 }
672
673 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
678 where
679 E: PersistedRow<Canister = C> + EntityValue,
680 {
681 let compiled = self.compile_sql_query::<E>(sql)?;
682
683 self.execute_compiled_sql::<E>(&compiled)
684 }
685
/// Runs `sql` on the query surface like `execute_sql_query`, and also returns
/// a `SqlQueryExecutionAttribution` describing where instructions went.
///
/// The compile step is measured as a whole with `measure_sql_stage`. The
/// store get-call count and the two pure-covering counters are reported as
/// before/after deltas around execution. `execute_local_instructions` is the
/// saturating sum of the planner, store, and executor counters, and
/// `total_local_instructions` adds the compile delta to it.
///
/// # Errors
/// Returns the compile or execution error; no attribution is produced then.
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
pub fn execute_sql_query_with_attribution<E>(
    &self,
    sql: &str,
) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // Compile: the outer delta spans cache-key construction, lookup, and any miss work.
    let (compile_local_instructions, compile_outcome) =
        measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
    let (compiled, compile_cache, compile_phase) = compile_outcome?;

    // Snapshot the running counters so only this call's share is reported.
    let get_calls_at_start = DataStore::current_get_call_count();
    let decode_at_start = current_pure_covering_decode_local_instructions();
    let row_assembly_at_start = current_pure_covering_row_assembly_local_instructions();

    let (result, execute_cache, execute_phase) =
        self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;

    let store_get_calls = DataStore::current_get_call_count().saturating_sub(get_calls_at_start);
    let pure_covering_decode_local_instructions =
        current_pure_covering_decode_local_instructions().saturating_sub(decode_at_start);
    let pure_covering_row_assembly_local_instructions =
        current_pure_covering_row_assembly_local_instructions()
            .saturating_sub(row_assembly_at_start);

    // Totals.
    let execute_local_instructions = execute_phase
        .planner_local_instructions
        .saturating_add(execute_phase.store_local_instructions)
        .saturating_add(execute_phase.executor_local_instructions);
    let cache_totals = compile_cache.merge(execute_cache);
    let total_local_instructions =
        compile_local_instructions.saturating_add(execute_local_instructions);

    let grouped_count = execute_phase.grouped_count;

    let attribution = SqlQueryExecutionAttribution {
        compile_local_instructions,
        compile_cache_key_local_instructions: compile_phase.cache_key,
        compile_cache_lookup_local_instructions: compile_phase.cache_lookup,
        compile_parse_local_instructions: compile_phase.parse,
        compile_parse_tokenize_local_instructions: compile_phase.parse_tokenize,
        compile_parse_select_local_instructions: compile_phase.parse_select,
        compile_parse_expr_local_instructions: compile_phase.parse_expr,
        compile_parse_predicate_local_instructions: compile_phase.parse_predicate,
        compile_aggregate_lane_check_local_instructions: compile_phase.aggregate_lane_check,
        compile_prepare_local_instructions: compile_phase.prepare,
        compile_lower_local_instructions: compile_phase.lower,
        compile_bind_local_instructions: compile_phase.bind,
        compile_cache_insert_local_instructions: compile_phase.cache_insert,
        planner_local_instructions: execute_phase.planner_local_instructions,
        store_local_instructions: execute_phase.store_local_instructions,
        executor_local_instructions: execute_phase.executor_local_instructions,
        grouped_stream_local_instructions: execute_phase.grouped_stream_local_instructions,
        grouped_fold_local_instructions: execute_phase.grouped_fold_local_instructions,
        grouped_finalize_local_instructions: execute_phase.grouped_finalize_local_instructions,
        grouped_count_borrowed_hash_computations: grouped_count.borrowed_hash_computations,
        grouped_count_bucket_candidate_checks: grouped_count.bucket_candidate_checks,
        grouped_count_existing_group_hits: grouped_count.existing_group_hits,
        grouped_count_new_group_inserts: grouped_count.new_group_inserts,
        grouped_count_row_materialization_local_instructions: grouped_count
            .row_materialization_local_instructions,
        grouped_count_group_lookup_local_instructions: grouped_count
            .group_lookup_local_instructions,
        grouped_count_existing_group_update_local_instructions: grouped_count
            .existing_group_update_local_instructions,
        grouped_count_new_group_insert_local_instructions: grouped_count
            .new_group_insert_local_instructions,
        pure_covering_decode_local_instructions,
        pure_covering_row_assembly_local_instructions,
        store_get_calls,
        // Response decoding is not measured in this function.
        response_decode_local_instructions: 0,
        execute_local_instructions,
        total_local_instructions,
        sql_compiled_command_cache_hits: cache_totals.sql_compiled_command_cache_hits,
        sql_compiled_command_cache_misses: cache_totals.sql_compiled_command_cache_misses,
        shared_query_plan_cache_hits: cache_totals.shared_query_plan_cache_hits,
        shared_query_plan_cache_misses: cache_totals.shared_query_plan_cache_misses,
    };

    Ok((result, attribution))
}
792
793 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
798 where
799 E: PersistedRow<Canister = C> + EntityValue,
800 {
801 let compiled = self.compile_sql_update::<E>(sql)?;
802
803 self.execute_compiled_sql::<E>(&compiled)
804 }
805
806 pub(in crate::db) fn compile_sql_query<E>(
809 &self,
810 sql: &str,
811 ) -> Result<CompiledSqlCommand, QueryError>
812 where
813 E: PersistedRow<Canister = C> + EntityValue,
814 {
815 self.compile_sql_query_with_cache_attribution::<E>(sql)
816 .map(|(compiled, _, _)| compiled)
817 }
818
819 fn compile_sql_query_with_cache_attribution<E>(
820 &self,
821 sql: &str,
822 ) -> Result<
823 (
824 CompiledSqlCommand,
825 SqlCacheAttribution,
826 SqlCompilePhaseAttribution,
827 ),
828 QueryError,
829 >
830 where
831 E: PersistedRow<Canister = C> + EntityValue,
832 {
833 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
834 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::query_for_entity::<E>(sql))
835 });
836 let cache_key = cache_key?;
837
838 self.compile_sql_statement_with_cache::<E>(
839 cache_key,
840 cache_key_local_instructions,
841 sql,
842 Self::ensure_sql_query_statement_supported,
843 )
844 }
845
846 pub(in crate::db) fn compile_sql_update<E>(
849 &self,
850 sql: &str,
851 ) -> Result<CompiledSqlCommand, QueryError>
852 where
853 E: PersistedRow<Canister = C> + EntityValue,
854 {
855 self.compile_sql_update_with_cache_attribution::<E>(sql)
856 .map(|(compiled, _, _)| compiled)
857 }
858
859 fn compile_sql_update_with_cache_attribution<E>(
860 &self,
861 sql: &str,
862 ) -> Result<
863 (
864 CompiledSqlCommand,
865 SqlCacheAttribution,
866 SqlCompilePhaseAttribution,
867 ),
868 QueryError,
869 >
870 where
871 E: PersistedRow<Canister = C> + EntityValue,
872 {
873 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
874 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::update_for_entity::<E>(sql))
875 });
876 let cache_key = cache_key?;
877
878 self.compile_sql_statement_with_cache::<E>(
879 cache_key,
880 cache_key_local_instructions,
881 sql,
882 Self::ensure_sql_update_statement_supported,
883 )
884 }
885
886 fn compile_sql_statement_with_cache<E>(
889 &self,
890 cache_key: SqlCompiledCommandCacheKey,
891 cache_key_local_instructions: u64,
892 sql: &str,
893 ensure_surface_supported: fn(&SqlStatement) -> Result<(), QueryError>,
894 ) -> Result<
895 (
896 CompiledSqlCommand,
897 SqlCacheAttribution,
898 SqlCompilePhaseAttribution,
899 ),
900 QueryError,
901 >
902 where
903 E: PersistedRow<Canister = C> + EntityValue,
904 {
905 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
906 let cached =
907 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
908 Ok::<_, QueryError>(cached)
909 });
910 let cached = cached?;
911 if let Some(compiled) = cached {
912 return Ok((
913 compiled,
914 SqlCacheAttribution::sql_compiled_command_cache_hit(),
915 SqlCompilePhaseAttribution::cache_hit(
916 cache_key_local_instructions,
917 cache_lookup_local_instructions,
918 ),
919 ));
920 }
921
922 let (parse_local_instructions, parsed) =
923 measure_sql_stage(|| parse_sql_statement_with_attribution(sql));
924 let (parsed, parse_attribution) = parsed?;
925 let parse_select_local_instructions = parse_local_instructions
926 .saturating_sub(parse_attribution.tokenize)
927 .saturating_sub(parse_attribution.expr)
928 .saturating_sub(parse_attribution.predicate);
929 ensure_surface_supported(&parsed)?;
930 let authority = EntityAuthority::for_type::<E>();
931 let (
932 compiled,
933 aggregate_lane_check_local_instructions,
934 prepare_local_instructions,
935 lower_local_instructions,
936 bind_local_instructions,
937 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
938
939 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
940 self.with_sql_compiled_command_cache(|cache| {
941 cache.insert(cache_key, compiled.clone());
942 });
943 Ok::<_, QueryError>(())
944 });
945 cache_insert?;
946
947 Ok((
948 compiled,
949 SqlCacheAttribution::sql_compiled_command_cache_miss(),
950 SqlCompilePhaseAttribution {
951 cache_key: cache_key_local_instructions,
952 cache_lookup: cache_lookup_local_instructions,
953 parse: parse_local_instructions,
954 parse_tokenize: parse_attribution.tokenize,
955 parse_select: parse_select_local_instructions,
956 parse_expr: parse_attribution.expr,
957 parse_predicate: parse_attribution.predicate,
958 aggregate_lane_check: aggregate_lane_check_local_instructions,
959 prepare: prepare_local_instructions,
960 lower: lower_local_instructions,
961 bind: bind_local_instructions,
962 cache_insert: cache_insert_local_instructions,
963 },
964 ))
965 }
966}