1mod execute;
8mod explain;
9mod projection;
10
11#[cfg(feature = "diagnostics")]
12use candid::CandidType;
13use icydb_utils::Xxh3;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault, sync::Arc};
17
/// Hasher factory used by the SQL compiled-command cache maps declared in this module:
/// a [`BuildHasherDefault`] over the `Xxh3` hasher imported from `icydb_utils`.
type CacheBuildHasher = BuildHasherDefault<Xxh3>;
19
/// Version tag written into the `cache_method_version` field of every
/// `SqlCompiledCommandCacheKey` built through `SqlCompiledCommandCacheKey::for_entity`.
///
/// The field takes part in the key's derived `Eq`/`Hash`, so keys carrying different
/// version values never compare equal.
const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
23
24#[cfg(feature = "diagnostics")]
25use crate::db::DataStore;
26#[cfg(feature = "diagnostics")]
27use crate::db::executor::GroupedCountAttribution;
28#[cfg(feature = "diagnostics")]
29use crate::db::session::sql::projection::{
30 current_pure_covering_decode_local_instructions,
31 current_pure_covering_row_assembly_local_instructions,
32};
33#[cfg(test)]
34use crate::db::sql::parser::parse_sql;
35use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
36use crate::{
37 db::{
38 DbSession, GroupedRow, PersistedRow, QueryError,
39 commit::CommitSchemaFingerprint,
40 executor::{EntityAuthority, SharedPreparedExecutionPlan},
41 query::{
42 intent::StructuralQuery,
43 plan::{AccessPlannedQuery, VisibleIndexes},
44 },
45 schema::commit_schema_fingerprint_for_entity,
46 session::query::QueryPlanCacheAttribution,
47 session::sql::projection::{
48 projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
49 },
50 sql::lowering::{LoweredBaseQueryShape, LoweredSqlCommand, SqlGlobalAggregateCommandCore},
51 sql::parser::{SqlStatement, parse_sql_with_attribution},
52 },
53 traits::{CanisterKind, EntityValue},
54};
55
56#[cfg(all(test, not(feature = "diagnostics")))]
57pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
58#[cfg(feature = "diagnostics")]
59pub use crate::db::session::sql::projection::{
60 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
61};
62
/// Result payload returned by the SQL entry points on `DbSession`
/// (`execute_sql_query` and `execute_sql_update`).
///
/// Which statement kinds produce which variant is decided by the executor, which is not
/// part of this section of the file.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// A bare row count with no row payload.
    Count {
        row_count: u32,
    },
    /// Projected rows carried as typed `Value` cells.
    ///
    /// `columns` holds the column labels and `fixed_scales` an optional scale per entry
    /// (presumably one entry per column, aligned with `columns` — confirm against the
    /// projection helpers).
    Projection {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<Vec<crate::value::Value>>,
        row_count: u32,
    },
    /// Projected rows whose cells are carried as `String`s.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows, plus an optional `next_cursor` string.
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        next_cursor: Option<String>,
    },
    /// Textual payload of an `EXPLAIN` statement.
    Explain(String),
    /// Schema description payload of a `DESCRIBE` statement.
    Describe(crate::db::EntitySchemaDescription),
    /// String payload of a `SHOW INDEXES` statement.
    ShowIndexes(Vec<String>),
    /// Field descriptions payload of a `SHOW COLUMNS` statement.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// String payload of a `SHOW ENTITIES` statement.
    ShowEntities(Vec<String>),
}
93
/// Flat, Candid-serializable cost breakdown returned next to the result by
/// `DbSession::execute_sql_query_with_attribution` (diagnostics builds only).
///
/// Every field is filled in by that method: the `compile_*` fields come from
/// `SqlCompilePhaseAttribution`, the planner/store/executor/grouped fields from
/// `SqlExecutePhaseAttribution`, and the cache counters from `SqlCacheAttribution`.
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    // ---- Compile stage ----
    /// Counter delta measured around the whole compile call (cache key, cache lookup and,
    /// on a miss, parse/lower/insert).
    pub compile_local_instructions: u64,
    pub compile_cache_key_local_instructions: u64,
    pub compile_cache_lookup_local_instructions: u64,
    pub compile_parse_local_instructions: u64,
    pub compile_parse_tokenize_local_instructions: u64,
    /// Parse total minus the tokenize, expression, and predicate shares.
    pub compile_parse_select_local_instructions: u64,
    pub compile_parse_expr_local_instructions: u64,
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    pub compile_cache_insert_local_instructions: u64,

    // ---- Execute stage (copied from `SqlExecutePhaseAttribution`) ----
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,

    // ---- Grouped-count counters (copied from `GroupedCountAttribution`) ----
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // ---- Deltas of process-wide counters sampled before and after execution ----
    pub pure_covering_decode_local_instructions: u64,
    pub pure_covering_row_assembly_local_instructions: u64,
    /// Increase of `DataStore::current_get_call_count()` across execution.
    pub store_get_calls: u64,
    /// Always written as `0` by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,

    // ---- Totals ----
    /// Saturating sum of the planner, store, and executor instruction fields.
    pub execute_local_instructions: u64,
    /// Saturating sum of `compile_local_instructions` and `execute_local_instructions`.
    pub total_local_instructions: u64,

    // ---- Cache counters (compile-side and execute-side attributions merged) ----
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
145
/// Execute-stage instruction attribution (diagnostics builds only).
///
/// `DbSession::execute_sql_query_with_attribution` copies these fields into the public
/// `SqlQueryExecutionAttribution` and sums planner + store + executor into its
/// `execute_local_instructions` total.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    /// Grouped-count counters; `GroupedCountAttribution::none()` when not applicable.
    pub grouped_count: GroupedCountAttribution,
}
160
/// Per-stage instruction attribution for one SQL compile call.
///
/// Produced by `DbSession::compile_sql_statement_with_cache`. Each value is a counter
/// delta reported by `measure_sql_stage`, which reports `0` when the `diagnostics`
/// feature is disabled.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCompilePhaseAttribution {
    /// Building the `SqlCompiledCommandCacheKey`.
    pub cache_key: u64,
    /// Looking the key up in the compiled-command cache.
    pub cache_lookup: u64,
    /// Whole parse call.
    pub parse: u64,
    /// Tokenize share reported by the parser.
    pub parse_tokenize: u64,
    /// `parse` minus the tokenize, expression, and predicate shares (saturating).
    pub parse_select: u64,
    /// Expression share reported by the parser.
    pub parse_expr: u64,
    /// Predicate share reported by the parser.
    pub parse_predicate: u64,
    pub aggregate_lane_check: u64,
    pub prepare: u64,
    pub lower: u64,
    pub bind: u64,
    /// Inserting the freshly compiled command into the cache.
    pub cache_insert: u64,
}
187
188impl SqlCompilePhaseAttribution {
189 #[must_use]
190 const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
191 Self {
192 cache_key,
193 cache_lookup,
194 parse: 0,
195 parse_tokenize: 0,
196 parse_select: 0,
197 parse_expr: 0,
198 parse_predicate: 0,
199 aggregate_lane_check: 0,
200 prepare: 0,
201 lower: 0,
202 bind: 0,
203 cache_insert: 0,
204 }
205 }
206}
207
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    /// Builds an attribution from an execute-stage total and the store share of it.
    ///
    /// The executor share is the remainder `execute - store`, clamped at zero. The
    /// planner and all grouped fields are reported as zero / `none()`.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        // Whatever the store did not account for is attributed to the executor.
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_local_instructions,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
}
227
/// Hit/miss counters for the two caches touched while serving one SQL call: the
/// compiled-command cache in this module and the shared query-plan cache.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCacheAttribution {
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
238
239impl SqlCacheAttribution {
240 #[must_use]
241 const fn none() -> Self {
242 Self {
243 sql_compiled_command_cache_hits: 0,
244 sql_compiled_command_cache_misses: 0,
245 shared_query_plan_cache_hits: 0,
246 shared_query_plan_cache_misses: 0,
247 }
248 }
249
250 #[must_use]
251 const fn sql_compiled_command_cache_hit() -> Self {
252 Self {
253 sql_compiled_command_cache_hits: 1,
254 ..Self::none()
255 }
256 }
257
258 #[must_use]
259 const fn sql_compiled_command_cache_miss() -> Self {
260 Self {
261 sql_compiled_command_cache_misses: 1,
262 ..Self::none()
263 }
264 }
265
266 #[must_use]
267 const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
268 Self {
269 shared_query_plan_cache_hits: attribution.hits,
270 shared_query_plan_cache_misses: attribution.misses,
271 ..Self::none()
272 }
273 }
274
275 #[must_use]
276 const fn merge(self, other: Self) -> Self {
277 Self {
278 sql_compiled_command_cache_hits: self
279 .sql_compiled_command_cache_hits
280 .saturating_add(other.sql_compiled_command_cache_hits),
281 sql_compiled_command_cache_misses: self
282 .sql_compiled_command_cache_misses
283 .saturating_add(other.sql_compiled_command_cache_misses),
284 shared_query_plan_cache_hits: self
285 .shared_query_plan_cache_hits
286 .saturating_add(other.shared_query_plan_cache_hits),
287 shared_query_plan_cache_misses: self
288 .shared_query_plan_cache_misses
289 .saturating_add(other.shared_query_plan_cache_misses),
290 }
291 }
292}
293
/// Entry-point surface a compiled command was built for.
///
/// Stored in `SqlCompiledCommandCacheKey`, so the same SQL text compiled through the
/// query surface and through the update surface occupies distinct cache entries.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum SqlCompiledCommandSurface {
    /// Keys built by `SqlCompiledCommandCacheKey::query_for_entity`.
    Query,
    /// Keys built by `SqlCompiledCommandCacheKey::update_for_entity`.
    Update,
}
299
/// Key of the SQL compiled-command cache.
///
/// Two keys are equal only when all five fields match (derived `Eq`/`Hash`).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct SqlCompiledCommandCacheKey {
    /// `SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION` for keys built outside tests.
    cache_method_version: u8,
    /// Query or update entry point.
    surface: SqlCompiledCommandSurface,
    /// `E::PATH` of the entity type the statement was compiled for.
    entity_path: &'static str,
    /// Result of `commit_schema_fingerprint_for_entity::<E>()` at key construction.
    schema_fingerprint: CommitSchemaFingerprint,
    /// The SQL text exactly as passed by the caller (copied verbatim, no normalization).
    sql: String,
}
318
/// Column labels and per-column fixed scales describing the outward shape of a SELECT
/// projection, as derived from a prepared plan's projection spec.
#[derive(Clone, Debug)]
pub(in crate::db) struct SqlProjectionContract {
    /// Output of `projection_labels_from_projection_spec`.
    columns: Vec<String>,
    /// Output of `projection_fixed_scales_from_projection_spec`.
    fixed_scales: Vec<Option<u32>>,
}
333
334impl SqlProjectionContract {
335 #[must_use]
336 pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
337 Self {
338 columns,
339 fixed_scales,
340 }
341 }
342
343 #[must_use]
344 pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
345 (self.columns, self.fixed_scales)
346 }
347}
348
349impl SqlCompiledCommandCacheKey {
350 fn query_for_entity<E>(sql: &str) -> Self
351 where
352 E: PersistedRow + EntityValue,
353 {
354 Self::for_entity::<E>(SqlCompiledCommandSurface::Query, sql)
355 }
356
357 fn update_for_entity<E>(sql: &str) -> Self
358 where
359 E: PersistedRow + EntityValue,
360 {
361 Self::for_entity::<E>(SqlCompiledCommandSurface::Update, sql)
362 }
363
364 fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
365 where
366 E: PersistedRow + EntityValue,
367 {
368 Self {
369 cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
370 surface,
371 entity_path: E::PATH,
372 schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
373 sql: sql.to_string(),
374 }
375 }
376
377 #[must_use]
378 pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
379 self.schema_fingerprint
380 }
381}
382
#[cfg(test)]
impl SqlCompiledCommandCacheKey {
    /// Test-only: query-surface key with a caller-chosen `cache_method_version`.
    pub(in crate::db) fn query_for_entity_with_method_version<E: PersistedRow + EntityValue>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self {
        let surface = SqlCompiledCommandSurface::Query;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test-only: update-surface key with a caller-chosen `cache_method_version`.
    pub(in crate::db) fn update_for_entity_with_method_version<E: PersistedRow + EntityValue>(
        sql: &str,
        cache_method_version: u8,
    ) -> Self {
        let surface = SqlCompiledCommandSurface::Update;

        Self::for_entity_with_method_version::<E>(surface, sql, cache_method_version)
    }

    /// Test-only: same construction as `for_entity`, except that the version tag is
    /// supplied by the caller instead of read from the module constant.
    fn for_entity_with_method_version<E: PersistedRow + EntityValue>(
        surface: SqlCompiledCommandSurface,
        sql: &str,
        cache_method_version: u8,
    ) -> Self {
        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();

        Self {
            cache_method_version,
            surface,
            entity_path: E::PATH,
            schema_fingerprint,
            sql: sql.to_owned(),
        }
    }
}
430
/// One scope's compiled-command cache: cache key to compiled command, hashed with
/// `CacheBuildHasher`.
pub(in crate::db) type SqlCompiledCommandCache =
    HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand, CacheBuildHasher>;
433
thread_local! {
    // Per-thread registry of compiled-command caches, keyed by the `usize` scope id that
    // `DbSession::sql_cache_scope_id` returns. Entries are created lazily by
    // `DbSession::with_sql_compiled_command_cache`. Nothing in this section of the file
    // removes scopes or bounds the number of cached commands.
    static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache, CacheBuildHasher>> =
        RefCell::new(HashMap::default());
}
441
/// A parsed and lowered SQL statement, stored in the compiled-command cache and cloned
/// out of it on every cache hit.
#[derive(Clone, Debug)]
pub(in crate::db) enum CompiledSqlCommand {
    /// SELECT. The structural query sits behind an `Arc`, so cloning the command shares
    /// the query instead of copying it. The cache key the command was compiled under is
    /// kept alongside it.
    Select {
        query: Arc<StructuralQuery>,
        compiled_cache_key: SqlCompiledCommandCacheKey,
    },
    /// DELETE: the lowered base query shape plus the parsed statement.
    Delete {
        query: LoweredBaseQueryShape,
        statement: SqlDeleteStatement,
    },
    /// Global aggregate command core.
    GlobalAggregate {
        command: SqlGlobalAggregateCommandCore,
    },
    /// EXPLAIN wrapping a lowered command.
    Explain(LoweredSqlCommand),
    /// INSERT, carried as the parsed statement.
    Insert(SqlInsertStatement),
    /// UPDATE, carried as the parsed statement.
    Update(SqlUpdateStatement),
    /// DESCRIBE (no payload).
    DescribeEntity,
    /// SHOW INDEXES (no payload).
    ShowIndexesEntity,
    /// SHOW COLUMNS (no payload).
    ShowColumnsEntity,
    /// SHOW ENTITIES (no payload).
    ShowEntities,
}
466
467impl CompiledSqlCommand {
468 fn new_select(query: StructuralQuery, compiled_cache_key: SqlCompiledCommandCacheKey) -> Self {
469 Self::Select {
470 query: Arc::new(query),
471 compiled_cache_key,
472 }
473 }
474}
475
/// Test-only: parses `sql` into a `SqlStatement`, converting parser failures with
/// `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
482
483fn parse_sql_statement_with_attribution(
487 sql: &str,
488) -> Result<
489 (
490 SqlStatement,
491 crate::db::sql::parser::SqlParsePhaseAttribution,
492 ),
493 QueryError,
494> {
495 parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
496}
497
/// Reads the instruction counter used by `measure_sql_stage` (diagnostics builds only).
///
/// On `wasm32` this returns `canic_cdk::api::performance_counter(1)`; on every other
/// target it returns `0`, so measured deltas are always zero off-wasm.
// NOTE(review): counter type `1` is presumably the call-context instruction counter —
// confirm against the `canic_cdk` / IC interface documentation.
#[cfg(feature = "diagnostics")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_sql_local_instruction_counter() -> u64 {
    // Exactly one of the two cfg-gated blocks survives compilation and becomes the
    // function's tail expression.
    #[cfg(all(feature = "diagnostics", target_arch = "wasm32"))]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(all(feature = "diagnostics", target_arch = "wasm32")))]
    {
        0
    }
}
514
515pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
516 run: impl FnOnce() -> Result<T, E>,
517) -> (u64, Result<T, E>) {
518 #[cfg(feature = "diagnostics")]
519 let start = read_sql_local_instruction_counter();
520
521 let result = run();
522
523 #[cfg(feature = "diagnostics")]
524 let delta = read_sql_local_instruction_counter().saturating_sub(start);
525
526 #[cfg(not(feature = "diagnostics"))]
527 let delta = 0;
528
529 (delta, result)
530}
531
532impl<C: CanisterKind> DbSession<C> {
    /// Scope id under which this session's compiled-command cache is filed in the
    /// thread-local registry; delegates to `self.db.cache_scope_id()`.
    fn sql_cache_scope_id(&self) -> usize {
        self.db.cache_scope_id()
    }
536
537 fn with_sql_compiled_command_cache<R>(
538 &self,
539 f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
540 ) -> R {
541 let scope_id = self.sql_cache_scope_id();
542
543 SQL_COMPILED_COMMAND_CACHES.with(|caches| {
544 let mut caches = caches.borrow_mut();
545 let cache = caches.entry(scope_id).or_default();
546
547 f(cache)
548 })
549 }
550
    /// Test-only: number of compiled commands cached for this session's scope.
    /// Creates the scope's (empty) cache as a side effect if it did not exist yet.
    #[cfg(test)]
    pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
        self.with_sql_compiled_command_cache(|cache| cache.len())
    }
555
    /// Test-only: empties the compiled-command cache of this session's scope. Caches
    /// filed under other scope ids are left untouched.
    #[cfg(test)]
    pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
        self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
    }
560
561 fn sql_select_projection_contract_from_shared_prepared_plan(
565 authority: EntityAuthority,
566 prepared_plan: &SharedPreparedExecutionPlan,
567 ) -> SqlProjectionContract {
568 let projection = prepared_plan
569 .logical_plan()
570 .projection_spec(authority.model());
571 let columns = projection_labels_from_projection_spec(&projection);
572 let fixed_scales = projection_fixed_scales_from_projection_spec(&projection);
573
574 SqlProjectionContract::new(columns, fixed_scales)
575 }
576
577 fn sql_select_prepared_plan(
580 &self,
581 query: &StructuralQuery,
582 authority: EntityAuthority,
583 cache_schema_fingerprint: CommitSchemaFingerprint,
584 ) -> Result<
585 (
586 SharedPreparedExecutionPlan,
587 SqlProjectionContract,
588 SqlCacheAttribution,
589 ),
590 QueryError,
591 > {
592 let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
593 authority,
594 cache_schema_fingerprint,
595 query,
596 )?;
597 let projection = Self::sql_select_projection_contract_from_shared_prepared_plan(
598 authority,
599 &prepared_plan,
600 );
601
602 Ok((
603 prepared_plan,
604 projection,
605 SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
606 ))
607 }
608
609 pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
612 &self,
613 query: StructuralQuery,
614 authority: EntityAuthority,
615 ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
616 let visible_indexes =
617 self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
618 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
619
620 Ok((visible_indexes, plan))
621 }
622
623 fn ensure_sql_query_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
626 match statement {
627 SqlStatement::Select(_)
628 | SqlStatement::Explain(_)
629 | SqlStatement::Describe(_)
630 | SqlStatement::ShowIndexes(_)
631 | SqlStatement::ShowColumns(_)
632 | SqlStatement::ShowEntities(_) => Ok(()),
633 SqlStatement::Insert(_) => Err(QueryError::unsupported_query(
634 "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
635 )),
636 SqlStatement::Update(_) => Err(QueryError::unsupported_query(
637 "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
638 )),
639 SqlStatement::Delete(_) => Err(QueryError::unsupported_query(
640 "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
641 )),
642 }
643 }
644
645 fn ensure_sql_update_statement_supported(statement: &SqlStatement) -> Result<(), QueryError> {
648 match statement {
649 SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_) => Ok(()),
650 SqlStatement::Select(_) => Err(QueryError::unsupported_query(
651 "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
652 )),
653 SqlStatement::Explain(_) => Err(QueryError::unsupported_query(
654 "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
655 )),
656 SqlStatement::Describe(_) => Err(QueryError::unsupported_query(
657 "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
658 )),
659 SqlStatement::ShowIndexes(_) => Err(QueryError::unsupported_query(
660 "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
661 )),
662 SqlStatement::ShowColumns(_) => Err(QueryError::unsupported_query(
663 "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
664 )),
665 SqlStatement::ShowEntities(_) => Err(QueryError::unsupported_query(
666 "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
667 )),
668 }
669 }
670
671 pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
676 where
677 E: PersistedRow<Canister = C> + EntityValue,
678 {
679 let compiled = self.compile_sql_query::<E>(sql)?;
680
681 self.execute_compiled_sql::<E>(&compiled)
682 }
683
    /// Diagnostics-only variant of `execute_sql_query` that additionally returns a
    /// `SqlQueryExecutionAttribution` describing where instructions were spent.
    ///
    /// The compile step is measured as a whole with `measure_sql_stage`. The execute
    /// step reports its own phase attribution, and three process-wide counters (store
    /// `get` calls, pure-covering decode, pure-covering row assembly) are sampled before
    /// and after execution to derive deltas.
    ///
    /// # Errors
    ///
    /// Returns the first `QueryError` raised by compilation or execution; no
    /// attribution is produced in that case.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    pub fn execute_sql_query_with_attribution<E>(
        &self,
        sql: &str,
    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Measure the whole compile call first; the error (if any) is only unwrapped
        // after the measurement has been taken.
        let (compile_local_instructions, compiled) =
            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;

        // Snapshot the process-wide counters immediately before execution so the deltas
        // below cover only the execute step.
        let store_get_calls_before = DataStore::current_get_call_count();
        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
        let pure_covering_row_assembly_before =
            current_pure_covering_row_assembly_local_instructions();
        let (result, execute_cache_attribution, execute_phase_attribution) =
            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
        let store_get_calls =
            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
        let pure_covering_decode_local_instructions =
            current_pure_covering_decode_local_instructions()
                .saturating_sub(pure_covering_decode_before);
        let pure_covering_row_assembly_local_instructions =
            current_pure_covering_row_assembly_local_instructions()
                .saturating_sub(pure_covering_row_assembly_before);
        // The execute total is the sum of the planner, store, and executor phases; the
        // grouped_* fields are reported separately and are not added on top.
        let execute_local_instructions = execute_phase_attribution
            .planner_local_instructions
            .saturating_add(execute_phase_attribution.store_local_instructions)
            .saturating_add(execute_phase_attribution.executor_local_instructions);
        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);

        Ok((
            result,
            SqlQueryExecutionAttribution {
                compile_local_instructions,
                compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
                compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
                compile_parse_local_instructions: compile_phase_attribution.parse,
                compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
                compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
                compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
                compile_parse_predicate_local_instructions: compile_phase_attribution
                    .parse_predicate,
                compile_aggregate_lane_check_local_instructions: compile_phase_attribution
                    .aggregate_lane_check,
                compile_prepare_local_instructions: compile_phase_attribution.prepare,
                compile_lower_local_instructions: compile_phase_attribution.lower,
                compile_bind_local_instructions: compile_phase_attribution.bind,
                compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
                planner_local_instructions: execute_phase_attribution.planner_local_instructions,
                store_local_instructions: execute_phase_attribution.store_local_instructions,
                executor_local_instructions: execute_phase_attribution.executor_local_instructions,
                grouped_stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                grouped_fold_local_instructions: execute_phase_attribution
                    .grouped_fold_local_instructions,
                grouped_finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                grouped_count_borrowed_hash_computations: execute_phase_attribution
                    .grouped_count
                    .borrowed_hash_computations,
                grouped_count_bucket_candidate_checks: execute_phase_attribution
                    .grouped_count
                    .bucket_candidate_checks,
                grouped_count_existing_group_hits: execute_phase_attribution
                    .grouped_count
                    .existing_group_hits,
                grouped_count_new_group_inserts: execute_phase_attribution
                    .grouped_count
                    .new_group_inserts,
                grouped_count_row_materialization_local_instructions: execute_phase_attribution
                    .grouped_count
                    .row_materialization_local_instructions,
                grouped_count_group_lookup_local_instructions: execute_phase_attribution
                    .grouped_count
                    .group_lookup_local_instructions,
                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
                    .grouped_count
                    .existing_group_update_local_instructions,
                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
                    .grouped_count
                    .new_group_insert_local_instructions,
                pure_covering_decode_local_instructions,
                pure_covering_row_assembly_local_instructions,
                store_get_calls,
                // Response decoding is not measured in this method.
                response_decode_local_instructions: 0,
                execute_local_instructions,
                total_local_instructions,
                sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
                sql_compiled_command_cache_misses: cache_attribution
                    .sql_compiled_command_cache_misses,
                shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
                shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
            },
        ))
    }
790
791 pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
796 where
797 E: PersistedRow<Canister = C> + EntityValue,
798 {
799 let compiled = self.compile_sql_update::<E>(sql)?;
800
801 self.execute_compiled_sql::<E>(&compiled)
802 }
803
804 pub(in crate::db) fn compile_sql_query<E>(
807 &self,
808 sql: &str,
809 ) -> Result<CompiledSqlCommand, QueryError>
810 where
811 E: PersistedRow<Canister = C> + EntityValue,
812 {
813 self.compile_sql_query_with_cache_attribution::<E>(sql)
814 .map(|(compiled, _, _)| compiled)
815 }
816
817 fn compile_sql_query_with_cache_attribution<E>(
818 &self,
819 sql: &str,
820 ) -> Result<
821 (
822 CompiledSqlCommand,
823 SqlCacheAttribution,
824 SqlCompilePhaseAttribution,
825 ),
826 QueryError,
827 >
828 where
829 E: PersistedRow<Canister = C> + EntityValue,
830 {
831 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
832 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::query_for_entity::<E>(sql))
833 });
834 let cache_key = cache_key?;
835
836 self.compile_sql_statement_with_cache::<E>(
837 cache_key,
838 cache_key_local_instructions,
839 sql,
840 Self::ensure_sql_query_statement_supported,
841 )
842 }
843
844 pub(in crate::db) fn compile_sql_update<E>(
847 &self,
848 sql: &str,
849 ) -> Result<CompiledSqlCommand, QueryError>
850 where
851 E: PersistedRow<Canister = C> + EntityValue,
852 {
853 self.compile_sql_update_with_cache_attribution::<E>(sql)
854 .map(|(compiled, _, _)| compiled)
855 }
856
857 fn compile_sql_update_with_cache_attribution<E>(
858 &self,
859 sql: &str,
860 ) -> Result<
861 (
862 CompiledSqlCommand,
863 SqlCacheAttribution,
864 SqlCompilePhaseAttribution,
865 ),
866 QueryError,
867 >
868 where
869 E: PersistedRow<Canister = C> + EntityValue,
870 {
871 let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
872 Ok::<_, QueryError>(SqlCompiledCommandCacheKey::update_for_entity::<E>(sql))
873 });
874 let cache_key = cache_key?;
875
876 self.compile_sql_statement_with_cache::<E>(
877 cache_key,
878 cache_key_local_instructions,
879 sql,
880 Self::ensure_sql_update_statement_supported,
881 )
882 }
883
884 fn compile_sql_statement_with_cache<E>(
887 &self,
888 cache_key: SqlCompiledCommandCacheKey,
889 cache_key_local_instructions: u64,
890 sql: &str,
891 ensure_surface_supported: fn(&SqlStatement) -> Result<(), QueryError>,
892 ) -> Result<
893 (
894 CompiledSqlCommand,
895 SqlCacheAttribution,
896 SqlCompilePhaseAttribution,
897 ),
898 QueryError,
899 >
900 where
901 E: PersistedRow<Canister = C> + EntityValue,
902 {
903 let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
904 let cached =
905 self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
906 Ok::<_, QueryError>(cached)
907 });
908 let cached = cached?;
909 if let Some(compiled) = cached {
910 return Ok((
911 compiled,
912 SqlCacheAttribution::sql_compiled_command_cache_hit(),
913 SqlCompilePhaseAttribution::cache_hit(
914 cache_key_local_instructions,
915 cache_lookup_local_instructions,
916 ),
917 ));
918 }
919
920 let (parse_local_instructions, parsed) =
921 measure_sql_stage(|| parse_sql_statement_with_attribution(sql));
922 let (parsed, parse_attribution) = parsed?;
923 let parse_select_local_instructions = parse_local_instructions
924 .saturating_sub(parse_attribution.tokenize)
925 .saturating_sub(parse_attribution.expr)
926 .saturating_sub(parse_attribution.predicate);
927 ensure_surface_supported(&parsed)?;
928 let authority = EntityAuthority::for_type::<E>();
929 let (
930 compiled,
931 aggregate_lane_check_local_instructions,
932 prepare_local_instructions,
933 lower_local_instructions,
934 bind_local_instructions,
935 ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
936
937 let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
938 self.with_sql_compiled_command_cache(|cache| {
939 cache.insert(cache_key, compiled.clone());
940 });
941 Ok::<_, QueryError>(())
942 });
943 cache_insert?;
944
945 Ok((
946 compiled,
947 SqlCacheAttribution::sql_compiled_command_cache_miss(),
948 SqlCompilePhaseAttribution {
949 cache_key: cache_key_local_instructions,
950 cache_lookup: cache_lookup_local_instructions,
951 parse: parse_local_instructions,
952 parse_tokenize: parse_attribution.tokenize,
953 parse_select: parse_select_local_instructions,
954 parse_expr: parse_attribution.expr,
955 parse_predicate: parse_attribution.predicate,
956 aggregate_lane_check: aggregate_lane_check_local_instructions,
957 prepare: prepare_local_instructions,
958 lower: lower_local_instructions,
959 bind: bind_local_instructions,
960 cache_insert: cache_insert_local_instructions,
961 },
962 ))
963 }
964}