1mod execute;
8mod explain;
9mod projection;
10
11#[cfg(feature = "diagnostics")]
12use candid::CandidType;
13use icydb_utils::Xxh3;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault, sync::Arc};
17
/// Hasher builder shared by the SQL compiled-command cache maps in this
/// module: `Xxh3` (from `icydb_utils`) wrapped in `BuildHasherDefault`.
type CacheBuildHasher = BuildHasherDefault<Xxh3>;
19
/// Version tag written into every `SqlCompiledCommandCacheKey` built by the
/// non-test constructors. It is a key field, so it participates in the key's
/// derived equality and hash.
// NOTE(review): presumably bumped whenever the compile/caching method changes
// so that entries built by an older method stop matching — confirm the policy.
const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
23
24#[cfg(feature = "diagnostics")]
25use crate::db::DataStore;
26#[cfg(feature = "diagnostics")]
27use crate::db::executor::GroupedCountAttribution;
28#[cfg(feature = "diagnostics")]
29use crate::db::session::sql::projection::{
30 current_pure_covering_decode_local_instructions,
31 current_pure_covering_row_assembly_local_instructions,
32};
33#[cfg(test)]
34use crate::db::sql::parser::parse_sql;
35use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
36use crate::{
37 db::{
38 DbSession, GroupedRow, PersistedRow, QueryError,
39 commit::CommitSchemaFingerprint,
40 executor::{EntityAuthority, SharedPreparedExecutionPlan},
41 query::{
42 intent::StructuralQuery,
43 plan::{AccessPlannedQuery, VisibleIndexes},
44 },
45 schema::commit_schema_fingerprint_for_entity,
46 session::query::QueryPlanCacheAttribution,
47 session::sql::projection::{
48 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
49 },
50 sql::lowering::{LoweredBaseQueryShape, LoweredSqlCommand, SqlGlobalAggregateCommandCore},
51 sql::parser::{SqlStatement, parse_sql_with_attribution},
52 },
53 traits::{CanisterKind, EntityValue},
54};
55
56#[cfg(all(test, not(feature = "diagnostics")))]
57pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
58#[cfg(feature = "diagnostics")]
59pub use crate::db::session::sql::projection::{
60 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
61};
62
/// Result payload returned by `DbSession::execute_sql_query` and
/// `DbSession::execute_sql_update`.
///
/// Which variant is produced for which statement is decided by the execution
/// code, which is not part of this section.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// Outcome that carries only a row count.
    Count {
        row_count: u32,
    },
    /// Projected rows as typed values.
    Projection {
        /// Column labels.
        columns: Vec<String>,
        /// Optional fixed scale per entry.
        // NOTE(review): presumably index-aligned with `columns` — confirm.
        fixed_scales: Vec<Option<u32>>,
        /// Row values, one inner `Vec` per row.
        rows: Vec<Vec<crate::value::Value>>,
        row_count: u32,
    },
    /// Projected rows already rendered as strings.
    ProjectionText {
        /// Column labels.
        columns: Vec<String>,
        /// Rendered cells, one inner `Vec` per row.
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows plus an optional continuation cursor.
    Grouped {
        /// Column labels.
        columns: Vec<String>,
        /// Optional fixed scale per entry.
        // NOTE(review): presumably index-aligned with `columns` — confirm.
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        /// Cursor for a following page, when one is present.
        next_cursor: Option<String>,
    },
    /// Text output of an `EXPLAIN` statement.
    Explain(String),
    /// Schema description of one entity.
    Describe(crate::db::EntitySchemaDescription),
    /// Index listing, one string per entry.
    ShowIndexes(Vec<String>),
    /// Field descriptions of one entity.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// Entity listing, one string per entry.
    ShowEntities(Vec<String>),
}
93
/// Instruction-count and cache attribution for one SQL query execution.
///
/// Only compiled with the `diagnostics` feature. It is produced by
/// `DbSession::execute_sql_query_with_attribution`, which fills every field
/// from the compile-phase, execute-phase, and cache attribution it collects.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    // ---- Compile step -----------------------------------------------------
    /// Measured cost of the whole compile call (taken around
    /// `compile_sql_query_with_cache_attribution`).
    pub compile_local_instructions: u64,
    /// Cost of building the compiled-command cache key.
    pub compile_cache_key_local_instructions: u64,
    /// Cost of the compiled-command cache lookup.
    pub compile_cache_lookup_local_instructions: u64,
    /// Cost of the whole parse call; zero on a compiled-command cache hit.
    pub compile_parse_local_instructions: u64,
    /// Tokenize share of the parse cost, as reported by the parser.
    pub compile_parse_tokenize_local_instructions: u64,
    /// Parse cost left after subtracting the tokenize, expression, and
    /// predicate shares (saturating).
    pub compile_parse_select_local_instructions: u64,
    /// Expression share of the parse cost, as reported by the parser.
    pub compile_parse_expr_local_instructions: u64,
    /// Predicate share of the parse cost, as reported by the parser.
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    /// Cost of inserting the freshly compiled command into the cache.
    pub compile_cache_insert_local_instructions: u64,

    // ---- Execute step (copied from the execute-phase attribution) ---------
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // ---- Grouped-count counters (copied from `GroupedCountAttribution`) ---
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // ---- Deltas sampled around execution ----------------------------------
    /// Growth of the pure-covering decode counter across execution.
    pub pure_covering_decode_local_instructions: u64,
    /// Growth of the pure-covering row-assembly counter across execution.
    pub pure_covering_row_assembly_local_instructions: u64,
    /// Growth of `DataStore::current_get_call_count()` across execution.
    pub store_get_calls: u64,
    /// Always written as `0` by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,

    // ---- Totals -----------------------------------------------------------
    /// Saturating sum of the planner, store, and executor costs.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and
    /// `execute_local_instructions`.
    pub total_local_instructions: u64,

    // ---- Cache counters (compile and execute attribution merged) ----------
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
145
/// Execute-step cost breakdown for one compiled SQL command.
///
/// Only compiled with the `diagnostics` feature.
/// `execute_sql_query_with_attribution` reads these fields to populate the
/// execute-side fields of `SqlQueryExecutionAttribution`.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    /// Grouped-count counters supplied by the executor.
    pub grouped_count: GroupedCountAttribution,
}
160
/// Compile-step cost breakdown for one SQL statement.
///
/// Built by `compile_sql_statement_with_cache`. On a compiled-command cache
/// hit only `cache_key` and `cache_lookup` are non-zero (see `cache_hit`).
/// The values come from `measure_sql_stage`, which reports `0` when the
/// `diagnostics` feature is disabled.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCompilePhaseAttribution {
    /// Cost of building the cache key.
    pub cache_key: u64,
    /// Cost of the cache lookup.
    pub cache_lookup: u64,
    /// Cost of the whole parse call.
    pub parse: u64,
    /// Tokenize share reported by the parser.
    pub parse_tokenize: u64,
    /// `parse` minus the tokenize, expression, and predicate shares
    /// (saturating).
    pub parse_select: u64,
    /// Expression share reported by the parser.
    pub parse_expr: u64,
    /// Predicate share reported by the parser.
    pub parse_predicate: u64,
    pub aggregate_lane_check: u64,
    pub prepare: u64,
    pub lower: u64,
    pub bind: u64,
    /// Cost of inserting the compiled command into the cache.
    pub cache_insert: u64,
}
187
188impl SqlCompilePhaseAttribution {
189 #[must_use]
190 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
191 Self {
192 cache_key,
193 cache_lookup,
194 parse: 0,
195 parse_tokenize: 0,
196 parse_select: 0,
197 parse_expr: 0,
198 parse_predicate: 0,
199 aggregate_lane_check: 0,
200 prepare: 0,
201 lower: 0,
202 bind: 0,
203 cache_insert: 0,
204 }
205 }
206}
207
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Builds an attribution from two totals only.
    ///
    /// The executor share is the execute total minus the store total
    /// (saturating at zero). The planner and grouped phases are reported as
    /// zero, and the grouped-count counters are `GroupedCountAttribution::none()`.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_local_instructions,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
}
227
/// Hit/miss counters for the two caches touched by the SQL surface: the
/// compiled-command cache and the shared query-plan cache.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCacheAttribution {
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
238
239impl SqlCacheAttribution {
240 #[must_use]
241 const fn none() -> Self {
242 Self {
243 sql_compiled_command_cache_hits: 0,
244 sql_compiled_command_cache_misses: 0,
245 shared_query_plan_cache_hits: 0,
246 shared_query_plan_cache_misses: 0,
247 }
248 }
249
250 #[must_use]
251 const fn sql_compiled_command_cache_hit() -> Self {
252 Self {
253 sql_compiled_command_cache_hits: 1,
254 ..Self::none()
255 }
256 }
257
258 #[must_use]
259 const fn sql_compiled_command_cache_miss() -> Self {
260 Self {
261 sql_compiled_command_cache_misses: 1,
262 ..Self::none()
263 }
264 }
265
266 #[must_use]
267 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
268 Self {
269 shared_query_plan_cache_hits: attribution.hits,
270 shared_query_plan_cache_misses: attribution.misses,
271 ..Self::none()
272 }
273 }
274
275 #[must_use]
276 const fn merge(self, other: Self) -> Self {
277 Self {
278 sql_compiled_command_cache_hits: self
279 .sql_compiled_command_cache_hits
280 .saturating_add(other.sql_compiled_command_cache_hits),
281 sql_compiled_command_cache_misses: self
282 .sql_compiled_command_cache_misses
283 .saturating_add(other.sql_compiled_command_cache_misses),
284 shared_query_plan_cache_hits: self
285 .shared_query_plan_cache_hits
286 .saturating_add(other.shared_query_plan_cache_hits),
287 shared_query_plan_cache_misses: self
288 .shared_query_plan_cache_misses
289 .saturating_add(other.shared_query_plan_cache_misses),
290 }
291 }
292}
293
/// Which SQL entry surface a compiled command was built for. It is part of
/// the cache key, so the same SQL text compiled through the query surface and
/// the update surface occupies distinct cache entries.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum SqlCompiledCommandSurface {
    /// Used by `SqlCompiledCommandCacheKey::query_for_entity`.
    Query,
    /// Used by `SqlCompiledCommandCacheKey::update_for_entity`.
    Update,
}
299
/// Key of one entry in the SQL compiled-command cache.
///
/// Equality and hashing are derived over all five fields, so two keys match
/// only when the method version, surface, entity path, schema fingerprint,
/// and SQL text all match. The SQL text is compared exactly as passed in;
/// the constructors in this section do not normalize it.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct SqlCompiledCommandCacheKey {
    /// Cache method version the key was built under.
    cache_method_version: u8,
    /// Entry surface (query or update).
    surface: SqlCompiledCommandSurface,
    /// `E::PATH` of the entity the statement was compiled for.
    entity_path: &'static str,
    /// Result of `commit_schema_fingerprint_for_entity::<E>()` at key
    /// construction.
    schema_fingerprint: CommitSchemaFingerprint,
    /// Owned copy of the SQL text.
    sql: String,
}
318
/// Column labels and fixed scales describing a SELECT projection, as derived
/// from a prepared plan's projection spec.
#[derive(Clone, Debug)]
pub(in crate::db) struct SqlProjectionContract {
    /// Projection column labels.
    columns: Vec<String>,
    /// Optional fixed scale per entry.
    // NOTE(review): presumably index-aligned with `columns` — confirm.
    fixed_scales: Vec<Option<u32>>,
}
333
334impl SqlProjectionContract {
335 #[must_use]
336 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
337 Self {
338 columns,
339 fixed_scales,
340 }
341 }
342
343 #[must_use]
344 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
345 (self.columns, self.fixed_scales)
346 }
347}
348
349impl SqlCompiledCommandCacheKey {
350 fn query_for_entity<E>(sql: &str) -> Self
351 where
352 E: PersistedRow + EntityValue,
353 {
354 Self::for_entity::<E>(SqlCompiledCommandSurface::Query, sql)
355 }
356
357 fn update_for_entity<E>(sql: &str) -> Self
358 where
359 E: PersistedRow + EntityValue,
360 {
361 Self::for_entity::<E>(SqlCompiledCommandSurface::Update, sql)
362 }
363
364 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
365 where
366 E: PersistedRow + EntityValue,
367 {
368 Self {
369 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
370 surface,
371 entity_path: E::PATH,
372 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
373 sql: sql.to_string(),
374 }
375 }
376
377 #[must_use]
378 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
379 self.schema_fingerprint
380 }
381}
382
#[cfg(test)]
impl SqlCompiledCommandCacheKey {
    /// Test-only: query-surface key for `E` with a caller-chosen cache
    /// method version instead of the module constant.
    pub(in crate::db) fn query_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Query;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test-only: update-surface key for `E` with a caller-chosen cache
    /// method version instead of the module constant.
    pub(in crate::db) fn update_for_entity_with_method_version<E>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let surface = SqlCompiledCommandSurface::Update;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test-only shared constructor; identical to `for_entity` except that
    /// the method version comes from the argument.
    fn for_entity_with_method_version<E>(
        surface: SqlCompiledCommandSurface,
        sql: &str,
        cache_method_version: u8,
    ) -> Self
    where
        E: PersistedRow + EntityValue,
    {
        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();

        Self {
            cache_method_version,
            surface,
            entity_path: E::PATH,
            schema_fingerprint,
            sql: sql.to_owned(),
        }
    }
}
430
/// Map from compiled-command cache key to the compiled command, hashed with
/// `CacheBuildHasher`.
// NOTE(review): nothing in this section evicts entries outside the test-only
// clear helper; confirm whether growth is bounded elsewhere.
pub(in crate::db) type SqlCompiledCommandCache =
    HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand, CacheBuildHasher>;
433
thread_local! {
    /// Per-thread registry of compiled-command caches, keyed by the `usize`
    /// scope id that `DbSession::sql_cache_scope_id` returns. Entries are
    /// created on first access by `with_sql_compiled_command_cache`.
    static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache, CacheBuildHasher>> =
        RefCell::new(HashMap::default());
}
441
/// A SQL statement after compilation, in the form stored in the
/// compiled-command cache and handed to execution.
#[derive(Clone, Debug)]
pub(in crate::db) enum CompiledSqlCommand {
    /// SELECT: the structural query (behind an `Arc`, so cloning the command
    /// does not deep-copy it) and the cache key it was compiled under.
    Select {
        query: Arc<StructuralQuery>,
        compiled_cache_key: SqlCompiledCommandCacheKey,
    },
    /// DELETE: the lowered base query shape together with the parsed
    /// statement.
    Delete {
        query: LoweredBaseQueryShape,
        statement: SqlDeleteStatement,
    },
    /// Global aggregate command.
    GlobalAggregate {
        command: SqlGlobalAggregateCommandCore,
    },
    /// EXPLAIN wrapping a lowered command.
    Explain(LoweredSqlCommand),
    /// INSERT, kept as the parsed statement.
    Insert(SqlInsertStatement),
    /// UPDATE, kept as the parsed statement.
    Update(SqlUpdateStatement),
    /// DESCRIBE for the entity.
    DescribeEntity,
    /// SHOW INDEXES for the entity.
    ShowIndexesEntity,
    /// SHOW COLUMNS for the entity.
    ShowColumnsEntity,
    /// SHOW ENTITIES.
    ShowEntities,
}
466
467impl CompiledSqlCommand {
468 fn new_select(query: StructuralQuery, compiled_cache_key: SqlCompiledCommandCacheKey) -> Self {
469 Self::Select {
470 query: Arc::new(query),
471 compiled_cache_key,
472 }
473 }
474}
475
/// Test-only: parses `sql` into a statement, converting a parse failure into
/// a `QueryError` via `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;

    Ok(statement)
}
482
483fn parse_sql_statement_with_attribution(
487 sql: &str,
488) -> Result<
489 (
490 SqlStatement,
491 crate::db::sql::parser::SqlParsePhaseAttribution,
492 ),
493 QueryError,
494> {
495 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
496}
497
/// Reads the instruction counter used for SQL stage attribution.
///
/// On `wasm32` this returns `canic_cdk::api::performance_counter(1)`. On
/// every other target there is no such counter and the function returns `0`.
///
/// The lint expectation is scoped to non-wasm32 targets: only the stub body
/// there is const-eligible, so an unconditional `expect` would be reported
/// as unfulfilled when clippy runs against wasm32.
#[cfg(feature = "diagnostics")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_sql_local_instruction_counter() -> u64 {
    // The enclosing item is already gated on `feature = "diagnostics"`, so
    // only the target architecture needs to be tested here.
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
514
515pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
516 run: impl FnOnce() -> Result<T, E>,
517) -> (u64, Result<T, E>) {
518 #[cfg(feature = "diagnostics")]
519 let start = read_sql_local_instruction_counter();
520
521 let result = run();
522
523 #[cfg(feature = "diagnostics")]
524 let delta = read_sql_local_instruction_counter().saturating_sub(start);
525
526 #[cfg(not(feature = "diagnostics"))]
527 let delta = 0;
528
529 (delta, result)
530}
531
532impl<C: CanisterKind> DbSession<C> {
/// Scope id selecting this session's compiled-command cache inside the
/// thread-local registry; delegates to `self.db.cache_scope_id()`.
fn sql_cache_scope_id(&self) -> usize {
    self.db.cache_scope_id()
}
536
537 fn with_sql_compiled_command_cache<R>(
538 &self,
539 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
540 ) -> R {
541 let scope_id = self.sql_cache_scope_id();
542
543 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
544 let mut caches = caches.borrow_mut();
545 let cache = caches.entry(scope_id).or_default();
546
547 f(cache)
548 })
549 }
550
/// Test-only: number of entries currently in this session's
/// compiled-command cache.
#[cfg(test)]
pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
    self.with_sql_compiled_command_cache(|cache| cache.len())
}
555
/// Test-only: removes every entry from this session's compiled-command
/// cache. Only the cache for this session's scope id is touched.
#[cfg(test)]
pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
    self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
}
560
561 fn sql_select_projection_contract_from_shared_prepared_plan(
565 authority: EntityAuthority,
566 prepared_plan: &SharedPreparedExecutionPlan,
567 ) -> SqlProjectionContract {
568 let projection = prepared_plan
569 .logical_plan()
570 .projection_spec(authority.model());
571 let columns = projection_labels_from_projection_spec(&projection);
572 let fixed_scales = projection_fixed_scales_from_projection_spec(&projection);
573
574 SqlProjectionContract::new(columns, fixed_scales)
575 }
576
577 fn sql_select_prepared_plan_from_shared_cache(
580 &self,
581 query: &StructuralQuery,
582 authority: EntityAuthority,
583 cache_schema_fingerprint: CommitSchemaFingerprint,
584 ) -> Result<
585 (
586 SharedPreparedExecutionPlan,
587 SqlProjectionContract,
588 QueryPlanCacheAttribution,
589 ),
590 QueryError,
591 > {
592 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
593 authority,
594 cache_schema_fingerprint,
595 query,
596 )?;
597
598 Ok((
599 prepared_plan.clone(),
600 Self::sql_select_projection_contract_from_shared_prepared_plan(
601 authority,
602 &prepared_plan,
603 ),
604 cache_attribution,
605 ))
606 }
607
608 fn sql_select_prepared_plan_without_compiled_cache(
611 &self,
612 query: &StructuralQuery,
613 authority: EntityAuthority,
614 ) -> Result<
615 (
616 SharedPreparedExecutionPlan,
617 SqlProjectionContract,
618 SqlCacheAttribution,
619 ),
620 QueryError,
621 > {
622 let cache_schema_fingerprint = crate::db::schema::commit_schema_fingerprint_for_model(
623 authority.model().path,
624 authority.model(),
625 );
626 let (prepared_plan, projection, cache_attribution) = self
627 .sql_select_prepared_plan_from_shared_cache(
628 query,
629 authority,
630 cache_schema_fingerprint,
631 )?;
632
633 Ok((
634 prepared_plan,
635 projection,
636 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
637 ))
638 }
639
640 fn sql_select_prepared_plan_with_compiled_cache(
643 &self,
644 query: &StructuralQuery,
645 authority: EntityAuthority,
646 cache_schema_fingerprint: CommitSchemaFingerprint,
647 ) -> Result<
648 (
649 SharedPreparedExecutionPlan,
650 SqlProjectionContract,
651 SqlCacheAttribution,
652 ),
653 QueryError,
654 > {
655 let (prepared_plan, projection, cache_attribution) = self
656 .sql_select_prepared_plan_from_shared_cache(
657 query,
658 authority,
659 cache_schema_fingerprint,
660 )?;
661
662 Ok((
663 prepared_plan,
664 projection,
665 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
666 ))
667 }
668
/// Builds the access-planned query for `query` against the indexes visible
/// for the store path and model of `authority`.
///
/// Returns the visible-index set together with the plan. The index set is
/// tied to the lifetime of `&self`.
///
/// # Errors
///
/// Propagates failures from `visible_indexes_for_store_model` and from
/// `build_plan_with_visible_indexes`.
pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
    &self,
    query: StructuralQuery,
    authority: EntityAuthority,
) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
    // Resolve the visible indexes first; the planner borrows them.
    let visible_indexes =
        self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
    let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;

    Ok((visible_indexes, plan))
}
682
683 fn ensure_sql_query_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
686 match statement {
687 SqlStatement::Select(_)
688 | SqlStatement::Explain(_)
689 | SqlStatement::Describe(_)
690 | SqlStatement::ShowIndexes(_)
691 | SqlStatement::ShowColumns(_)
692 | SqlStatement::ShowEntities(_) => Ok(()),
693 SqlStatement::Insert(_) => Err(QueryError::unsupported_query(
694 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
695 )),
696 SqlStatement::Update(_) => Err(QueryError::unsupported_query(
697 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
698 )),
699 SqlStatement::Delete(_) => Err(QueryError::unsupported_query(
700 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
701 )),
702 }
703 }
704
705 fn ensure_sql_update_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
708 match statement {
709 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_) => Ok(()),
710 SqlStatement::Select(_) => Err(QueryError::unsupported_query(
711 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
712 )),
713 SqlStatement::Explain(_) => Err(QueryError::unsupported_query(
714 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
715 )),
716 SqlStatement::Describe(_) => Err(QueryError::unsupported_query(
717 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
718 )),
719 SqlStatement::ShowIndexes(_) => Err(QueryError::unsupported_query(
720 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
721 )),
722 SqlStatement::ShowColumns(_) => Err(QueryError::unsupported_query(
723 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
724 )),
725 SqlStatement::ShowEntities(_) => Err(QueryError::unsupported_query(
726 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
727 )),
728 }
729 }
730
731 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
736 where
737 E: PersistedRow<Canister = C> + EntityValue,
738 {
739 let compiled = self.compile_sql_query::<E>(sql)?;
740
741 self.execute_compiled_sql::<E>(&compiled)
742 }
743
/// Diagnostics-only variant of `execute_sql_query` that also reports where
/// the instructions went.
///
/// The compile step is measured as a whole and broken down by the compile
/// phase attribution. The store get-call count and the two pure-covering
/// counters are sampled immediately before and after execution and reported
/// as deltas. `response_decode_local_instructions` is always reported as `0`
/// here.
///
/// # Errors
///
/// Returns the first error from compilation or execution; no attribution is
/// returned in that case.
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
pub fn execute_sql_query_with_attribution<E>(
    &self,
    sql: &str,
) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // ---- Compile ----------------------------------------------------------
    let (compile_local_instructions, compile_outcome) =
        measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
    let (compiled, compile_cache_attribution, compile_phases) = compile_outcome?;
    let SqlCompilePhaseAttribution {
        cache_key,
        cache_lookup,
        parse,
        parse_tokenize,
        parse_select,
        parse_expr,
        parse_predicate,
        aggregate_lane_check,
        prepare,
        lower,
        bind,
        cache_insert,
    } = compile_phases;

    // ---- Execute, bracketed by counter snapshots --------------------------
    let get_calls_at_start = DataStore::current_get_call_count();
    let decode_at_start = current_pure_covering_decode_local_instructions();
    let row_assembly_at_start = current_pure_covering_row_assembly_local_instructions();

    let (result, execute_cache_attribution, execute_phases) =
        self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;

    let store_get_calls = DataStore::current_get_call_count().saturating_sub(get_calls_at_start);
    let pure_covering_decode_local_instructions =
        current_pure_covering_decode_local_instructions().saturating_sub(decode_at_start);
    let pure_covering_row_assembly_local_instructions =
        current_pure_covering_row_assembly_local_instructions()
            .saturating_sub(row_assembly_at_start);

    // ---- Totals -----------------------------------------------------------
    let execute_local_instructions = execute_phases
        .planner_local_instructions
        .saturating_add(execute_phases.store_local_instructions)
        .saturating_add(execute_phases.executor_local_instructions);
    let total_local_instructions =
        compile_local_instructions.saturating_add(execute_local_instructions);
    let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
    let grouped_count = execute_phases.grouped_count;

    let attribution = SqlQueryExecutionAttribution {
        compile_local_instructions,
        compile_cache_key_local_instructions: cache_key,
        compile_cache_lookup_local_instructions: cache_lookup,
        compile_parse_local_instructions: parse,
        compile_parse_tokenize_local_instructions: parse_tokenize,
        compile_parse_select_local_instructions: parse_select,
        compile_parse_expr_local_instructions: parse_expr,
        compile_parse_predicate_local_instructions: parse_predicate,
        compile_aggregate_lane_check_local_instructions: aggregate_lane_check,
        compile_prepare_local_instructions: prepare,
        compile_lower_local_instructions: lower,
        compile_bind_local_instructions: bind,
        compile_cache_insert_local_instructions: cache_insert,
        planner_local_instructions: execute_phases.planner_local_instructions,
        store_local_instructions: execute_phases.store_local_instructions,
        executor_local_instructions: execute_phases.executor_local_instructions,
        grouped_stream_local_instructions: execute_phases.grouped_stream_local_instructions,
        grouped_fold_local_instructions: execute_phases.grouped_fold_local_instructions,
        grouped_finalize_local_instructions: execute_phases.grouped_finalize_local_instructions,
        grouped_count_borrowed_hash_computations: grouped_count.borrowed_hash_computations,
        grouped_count_bucket_candidate_checks: grouped_count.bucket_candidate_checks,
        grouped_count_existing_group_hits: grouped_count.existing_group_hits,
        grouped_count_new_group_inserts: grouped_count.new_group_inserts,
        grouped_count_row_materialization_local_instructions: grouped_count
            .row_materialization_local_instructions,
        grouped_count_group_lookup_local_instructions: grouped_count
            .group_lookup_local_instructions,
        grouped_count_existing_group_update_local_instructions: grouped_count
            .existing_group_update_local_instructions,
        grouped_count_new_group_insert_local_instructions: grouped_count
            .new_group_insert_local_instructions,
        pure_covering_decode_local_instructions,
        pure_covering_row_assembly_local_instructions,
        store_get_calls,
        response_decode_local_instructions: 0,
        execute_local_instructions,
        total_local_instructions,
        sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
        sql_compiled_command_cache_misses: cache_attribution.sql_compiled_command_cache_misses,
        shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
        shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
    };

    Ok((result, attribution))
}
850
851 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
856 where
857 E: PersistedRow<Canister = C> + EntityValue,
858 {
859 let compiled = self.compile_sql_update::<E>(sql)?;
860
861 self.execute_compiled_sql::<E>(&compiled)
862 }
863
864 pub(in crate::db) fn compile_sql_query<E>(
867 &self,
868 sql: &str,
869 ) -> Result<CompiledSqlCommand, QueryError>
870 where
871 E: PersistedRow<Canister = C> + EntityValue,
872 {
873 self.compile_sql_query_with_cache_attribution::<E>(sql)
874 .map(|(compiled, _, _)| compiled)
875 }
876
877 fn compile_sql_query_with_cache_attribution<E>(
878 &self,
879 sql: &str,
880 ) -> Result<
881 (
882 CompiledSqlCommand,
883 SqlCacheAttribution,
884 SqlCompilePhaseAttribution,
885 ),
886 QueryError,
887 >
888 where
889 E: PersistedRow<Canister = C> + EntityValue,
890 {
891 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
892 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::query_for_entity::<E>(sql))
893 });
894 let cache_key = cache_key?;
895
896 self.compile_sql_statement_with_cache::<E>(
897 cache_key,
898 cache_key_local_instructions,
899 sql,
900 Self::ensure_sql_query_statement_supported,
901 )
902 }
903
904 pub(in crate::db) fn compile_sql_update<E>(
907 &self,
908 sql: &str,
909 ) -> Result<CompiledSqlCommand, QueryError>
910 where
911 E: PersistedRow<Canister = C> + EntityValue,
912 {
913 self.compile_sql_update_with_cache_attribution::<E>(sql)
914 .map(|(compiled, _, _)| compiled)
915 }
916
917 fn compile_sql_update_with_cache_attribution<E>(
918 &self,
919 sql: &str,
920 ) -> Result<
921 (
922 CompiledSqlCommand,
923 SqlCacheAttribution,
924 SqlCompilePhaseAttribution,
925 ),
926 QueryError,
927 >
928 where
929 E: PersistedRow<Canister = C> + EntityValue,
930 {
931 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
932 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::update_for_entity::<E>(sql))
933 });
934 let cache_key = cache_key?;
935
936 self.compile_sql_statement_with_cache::<E>(
937 cache_key,
938 cache_key_local_instructions,
939 sql,
940 Self::ensure_sql_update_statement_supported,
941 )
942 }
943
944 fn compile_sql_statement_with_cache<E>(
947 &self,
948 cache_key: SqlCompiledCommandCacheKey,
949 cache_key_local_instructions: u64,
950 sql: &str,
951 ensure_surface_supported: fn(&SqlStatement) -> Result<(), QueryError>,
952 ) -> Result<
953 (
954 CompiledSqlCommand,
955 SqlCacheAttribution,
956 SqlCompilePhaseAttribution,
957 ),
958 QueryError,
959 >
960 where
961 E: PersistedRow<Canister = C> + EntityValue,
962 {
963 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
964 let cached =
965 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
966 Ok::<_, QueryError>(cached)
967 });
968 let cached = cached?;
969 if let Some(compiled) = cached {
970 return Ok((
971 compiled,
972 SqlCacheAttribution::sql_compiled_command_cache_hit(),
973 SqlCompilePhaseAttribution::cache_hit(
974 cache_key_local_instructions,
975 cache_lookup_local_instructions,
976 ),
977 ));
978 }
979
980 let (parse_local_instructions, parsed) =
981 measure_sql_stage(|| parse_sql_statement_with_attribution(sql));
982 let (parsed, parse_attribution) = parsed?;
983 let parse_select_local_instructions = parse_local_instructions
984 .saturating_sub(parse_attribution.tokenize)
985 .saturating_sub(parse_attribution.expr)
986 .saturating_sub(parse_attribution.predicate);
987 ensure_surface_supported(&parsed)?;
988 let authority = EntityAuthority::for_type::<E>();
989 let (
990 compiled,
991 aggregate_lane_check_local_instructions,
992 prepare_local_instructions,
993 lower_local_instructions,
994 bind_local_instructions,
995 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
996
997 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
998 self.with_sql_compiled_command_cache(|cache| {
999 cache.insert(cache_key, compiled.clone());
1000 });
1001 Ok::<_, QueryError>(())
1002 });
1003 cache_insert?;
1004
1005 Ok((
1006 compiled,
1007 SqlCacheAttribution::sql_compiled_command_cache_miss(),
1008 SqlCompilePhaseAttribution {
1009 cache_key: cache_key_local_instructions,
1010 cache_lookup: cache_lookup_local_instructions,
1011 parse: parse_local_instructions,
1012 parse_tokenize: parse_attribution.tokenize,
1013 parse_select: parse_select_local_instructions,
1014 parse_expr: parse_attribution.expr,
1015 parse_predicate: parse_attribution.predicate,
1016 aggregate_lane_check: aggregate_lane_check_local_instructions,
1017 prepare: prepare_local_instructions,
1018 lower: lower_local_instructions,
1019 bind: bind_local_instructions,
1020 cache_insert: cache_insert_local_instructions,
1021 },
1022 ))
1023 }
1024}