#[cfg(feature = "diagnostics")]
use crate::db::executor::{
GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
};
use crate::{
db::{
DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
access::AccessStrategy,
commit::CommitSchemaFingerprint,
cursor::{
CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
},
diagnostics::ExecutionTrace,
executor::{
ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
SharedPreparedExecutionPlan,
},
predicate::predicate_fingerprint_normalized,
query::builder::{
PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
},
query::explain::{
ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
},
query::{
intent::{CompiledQuery, PlannedQuery, StructuralQuery},
plan::{QueryMode, VisibleIndexes},
},
},
error::InternalError,
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue, Path},
};
#[cfg(feature = "diagnostics")]
use candid::CandidType;
use icydb_utils::Xxh3;
#[cfg(feature = "diagnostics")]
use serde::Deserialize;
use std::{cell::RefCell, collections::HashMap, hash::BuildHasherDefault};
type CacheBuildHasher = BuildHasherDefault<Xxh3>;
const SHARED_QUERY_PLAN_CACHE_METHOD_VERSION: u8 = 1;
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) enum QueryPlanVisibility {
StoreNotReady,
StoreReady,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct QueryPlanCacheKey {
cache_method_version: u8,
entity_path: &'static str,
schema_fingerprint: CommitSchemaFingerprint,
visibility: QueryPlanVisibility,
structural_query: crate::db::query::intent::StructuralQueryCacheKey,
}
pub(in crate::db) type QueryPlanCache =
HashMap<QueryPlanCacheKey, SharedPreparedExecutionPlan, CacheBuildHasher>;
thread_local! {
static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache, CacheBuildHasher>> =
RefCell::new(HashMap::default());
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct QueryPlanCacheAttribution {
pub hits: u64,
pub misses: u64,
}
impl QueryPlanCacheAttribution {
#[must_use]
const fn hit() -> Self {
Self { hits: 1, misses: 0 }
}
#[must_use]
const fn miss() -> Self {
Self { hits: 0, misses: 1 }
}
}
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
pub compile_local_instructions: u64,
pub runtime_local_instructions: u64,
pub finalize_local_instructions: u64,
pub direct_data_row_scan_local_instructions: u64,
pub direct_data_row_key_stream_local_instructions: u64,
pub direct_data_row_row_read_local_instructions: u64,
pub direct_data_row_key_encode_local_instructions: u64,
pub direct_data_row_store_get_local_instructions: u64,
pub direct_data_row_order_window_local_instructions: u64,
pub direct_data_row_page_window_local_instructions: u64,
pub grouped_stream_local_instructions: u64,
pub grouped_fold_local_instructions: u64,
pub grouped_finalize_local_instructions: u64,
pub grouped_count_borrowed_hash_computations: u64,
pub grouped_count_bucket_candidate_checks: u64,
pub grouped_count_existing_group_hits: u64,
pub grouped_count_new_group_inserts: u64,
pub grouped_count_row_materialization_local_instructions: u64,
pub grouped_count_group_lookup_local_instructions: u64,
pub grouped_count_existing_group_update_local_instructions: u64,
pub grouped_count_new_group_insert_local_instructions: u64,
pub response_decode_local_instructions: u64,
pub execute_local_instructions: u64,
pub total_local_instructions: u64,
pub shared_query_plan_cache_hits: u64,
pub shared_query_plan_cache_misses: u64,
}
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
runtime_local_instructions: u64,
finalize_local_instructions: u64,
direct_data_row_scan_local_instructions: u64,
direct_data_row_key_stream_local_instructions: u64,
direct_data_row_row_read_local_instructions: u64,
direct_data_row_key_encode_local_instructions: u64,
direct_data_row_store_get_local_instructions: u64,
direct_data_row_order_window_local_instructions: u64,
direct_data_row_page_window_local_instructions: u64,
grouped_stream_local_instructions: u64,
grouped_fold_local_instructions: u64,
grouped_finalize_local_instructions: u64,
grouped_count: GroupedCountAttribution,
}
#[cfg(feature = "diagnostics")]
#[expect(
clippy::missing_const_for_fn,
reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
#[cfg(target_arch = "wasm32")]
{
canic_cdk::api::performance_counter(1)
}
#[cfg(not(target_arch = "wasm32"))]
{
0
}
}
#[cfg(feature = "diagnostics")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
let start = read_query_local_instruction_counter();
let result = run();
let delta = read_query_local_instruction_counter().saturating_sub(start);
(delta, result)
}
impl<C: CanisterKind> DbSession<C> {
#[cfg(feature = "diagnostics")]
const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
QueryExecutePhaseAttribution {
runtime_local_instructions: 0,
finalize_local_instructions: 0,
direct_data_row_scan_local_instructions: 0,
direct_data_row_key_stream_local_instructions: 0,
direct_data_row_row_read_local_instructions: 0,
direct_data_row_key_encode_local_instructions: 0,
direct_data_row_store_get_local_instructions: 0,
direct_data_row_order_window_local_instructions: 0,
direct_data_row_page_window_local_instructions: 0,
grouped_stream_local_instructions: 0,
grouped_fold_local_instructions: 0,
grouped_finalize_local_instructions: 0,
grouped_count: GroupedCountAttribution::none(),
}
}
#[cfg(feature = "diagnostics")]
const fn scalar_query_execute_phase_attribution(
phase: ScalarExecutePhaseAttribution,
) -> QueryExecutePhaseAttribution {
QueryExecutePhaseAttribution {
runtime_local_instructions: phase.runtime_local_instructions,
finalize_local_instructions: phase.finalize_local_instructions,
direct_data_row_scan_local_instructions: phase.direct_data_row_scan_local_instructions,
direct_data_row_key_stream_local_instructions: phase
.direct_data_row_key_stream_local_instructions,
direct_data_row_row_read_local_instructions: phase
.direct_data_row_row_read_local_instructions,
direct_data_row_key_encode_local_instructions: phase
.direct_data_row_key_encode_local_instructions,
direct_data_row_store_get_local_instructions: phase
.direct_data_row_store_get_local_instructions,
direct_data_row_order_window_local_instructions: phase
.direct_data_row_order_window_local_instructions,
direct_data_row_page_window_local_instructions: phase
.direct_data_row_page_window_local_instructions,
grouped_stream_local_instructions: 0,
grouped_fold_local_instructions: 0,
grouped_finalize_local_instructions: 0,
grouped_count: GroupedCountAttribution::none(),
}
}
#[cfg(feature = "diagnostics")]
const fn grouped_query_execute_phase_attribution(
phase: GroupedExecutePhaseAttribution,
) -> QueryExecutePhaseAttribution {
QueryExecutePhaseAttribution {
runtime_local_instructions: phase
.stream_local_instructions
.saturating_add(phase.fold_local_instructions),
finalize_local_instructions: phase.finalize_local_instructions,
direct_data_row_scan_local_instructions: 0,
direct_data_row_key_stream_local_instructions: 0,
direct_data_row_row_read_local_instructions: 0,
direct_data_row_key_encode_local_instructions: 0,
direct_data_row_store_get_local_instructions: 0,
direct_data_row_order_window_local_instructions: 0,
direct_data_row_page_window_local_instructions: 0,
grouped_stream_local_instructions: phase.stream_local_instructions,
grouped_fold_local_instructions: phase.fold_local_instructions,
grouped_finalize_local_instructions: phase.finalize_local_instructions,
grouped_count: phase.grouped_count,
}
}
fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
let scope_id = self.db.cache_scope_id();
QUERY_PLAN_CACHES.with(|caches| {
let mut caches = caches.borrow_mut();
let cache = caches.entry(scope_id).or_default();
f(cache)
})
}
const fn visible_indexes_for_model(
model: &'static EntityModel,
visibility: QueryPlanVisibility,
) -> VisibleIndexes<'static> {
match visibility {
QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
}
}
#[cfg(test)]
pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
self.with_query_plan_cache(|cache| cache.len())
}
#[cfg(test)]
pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
self.with_query_plan_cache(QueryPlanCache::clear);
}
pub(in crate::db) fn query_plan_visibility_for_store_path(
&self,
store_path: &'static str,
) -> Result<QueryPlanVisibility, QueryError> {
let store = self
.db
.recovered_store(store_path)
.map_err(QueryError::execute)?;
let visibility = if store.index_state() == crate::db::IndexState::Ready {
QueryPlanVisibility::StoreReady
} else {
QueryPlanVisibility::StoreNotReady
};
Ok(visibility)
}
pub(in crate::db) fn cached_shared_query_plan_for_authority(
&self,
authority: crate::db::executor::EntityAuthority,
schema_fingerprint: CommitSchemaFingerprint,
query: &StructuralQuery,
) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError> {
let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
let planning_state = query.prepare_scalar_planning_state()?;
let normalized_predicate_fingerprint = planning_state
.normalized_predicate()
.map(predicate_fingerprint_normalized);
let cache_key =
QueryPlanCacheKey::for_authority_with_normalized_predicate_fingerprint_and_method_version(
authority,
schema_fingerprint,
visibility,
query,
normalized_predicate_fingerprint,
SHARED_QUERY_PLAN_CACHE_METHOD_VERSION,
);
{
let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
if let Some(prepared_plan) = cached {
return Ok((prepared_plan, QueryPlanCacheAttribution::hit()));
}
}
let plan = query.build_plan_with_visible_indexes_from_scalar_planning_state(
&visible_indexes,
planning_state,
)?;
let prepared_plan = SharedPreparedExecutionPlan::from_plan(authority, plan);
self.with_query_plan_cache(|cache| {
cache.insert(cache_key, prepared_plan.clone());
});
Ok((prepared_plan, QueryPlanCacheAttribution::miss()))
}
#[cfg(test)]
pub(in crate::db) fn query_plan_cache_key_for_tests(
authority: crate::db::executor::EntityAuthority,
schema_fingerprint: CommitSchemaFingerprint,
visibility: QueryPlanVisibility,
query: &StructuralQuery,
cache_method_version: u8,
) -> QueryPlanCacheKey {
QueryPlanCacheKey::for_authority_with_method_version(
authority,
schema_fingerprint,
visibility,
query,
cache_method_version,
)
}
fn with_query_visible_indexes<E, T>(
&self,
query: &Query<E>,
op: impl FnOnce(
&Query<E>,
&crate::db::query::plan::VisibleIndexes<'static>,
) -> Result<T, QueryError>,
) -> Result<T, QueryError>
where
E: EntityKind<Canister = C>,
{
let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
op(query, &visible_indexes)
}
pub(in crate::db::session) fn cached_prepared_query_plan_for_entity<E>(
&self,
query: &Query<E>,
) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
where
E: EntityKind<Canister = C>,
{
let (prepared_plan, attribution) = self.cached_shared_query_plan_for_entity::<E>(query)?;
Ok((prepared_plan.typed_clone::<E>(), attribution))
}
fn cached_shared_query_plan_for_entity<E>(
&self,
query: &Query<E>,
) -> Result<(SharedPreparedExecutionPlan, QueryPlanCacheAttribution), QueryError>
where
E: EntityKind<Canister = C>,
{
self.cached_shared_query_plan_for_authority(
crate::db::executor::EntityAuthority::for_type::<E>(),
crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
query.structural(),
)
}
fn map_cached_shared_query_plan_for_entity<E, T>(
&self,
query: &Query<E>,
map: impl FnOnce(SharedPreparedExecutionPlan) -> T,
) -> Result<T, QueryError>
where
E: EntityKind<Canister = C>,
{
let (prepared_plan, _) = self.cached_shared_query_plan_for_entity::<E>(query)?;
Ok(map(prepared_plan))
}
pub(in crate::db) fn compile_query_with_visible_indexes<E>(
&self,
query: &Query<E>,
) -> Result<CompiledQuery<E>, QueryError>
where
E: EntityKind<Canister = C>,
{
self.map_cached_shared_query_plan_for_entity(query, CompiledQuery::<E>::from_prepared_plan)
}
pub(in crate::db) fn planned_query_with_visible_indexes<E>(
&self,
query: &Query<E>,
) -> Result<PlannedQuery<E>, QueryError>
where
E: EntityKind<Canister = C>,
{
self.map_cached_shared_query_plan_for_entity(query, PlannedQuery::<E>::from_prepared_plan)
}
pub(in crate::db) fn explain_query_with_visible_indexes<E>(
&self,
query: &Query<E>,
) -> Result<ExplainPlan, QueryError>
where
E: EntityKind<Canister = C>,
{
self.with_query_visible_indexes(query, Query::<E>::explain_with_visible_indexes)
}
pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
&self,
query: &Query<E>,
) -> Result<String, QueryError>
where
E: EntityKind<Canister = C>,
{
self.with_query_visible_indexes(query, Query::<E>::plan_hash_hex_with_visible_indexes)
}
pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
&self,
query: &Query<E>,
) -> Result<ExplainExecutionNodeDescriptor, QueryError>
where
E: EntityValue + EntityKind<Canister = C>,
{
self.with_query_visible_indexes(query, Query::<E>::explain_execution_with_visible_indexes)
}
pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
&self,
query: &Query<E>,
) -> Result<String, QueryError>
where
E: EntityValue + EntityKind<Canister = C>,
{
self.with_query_visible_indexes(
query,
Query::<E>::explain_execution_verbose_with_visible_indexes,
)
}
pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
&self,
query: &Query<E>,
strategy: &S,
) -> Result<ExplainAggregateTerminalPlan, QueryError>
where
E: EntityValue + EntityKind<Canister = C>,
S: PreparedFluentAggregateExplainStrategy,
{
self.with_query_visible_indexes(query, |query, visible_indexes| {
query
.explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
})
}
pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
&self,
query: &Query<E>,
target_field: &str,
) -> Result<ExplainExecutionNodeDescriptor, QueryError>
where
E: EntityValue + EntityKind<Canister = C>,
{
self.with_query_visible_indexes(query, |query, visible_indexes| {
query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
})
}
pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
&self,
query: &Query<E>,
strategy: &PreparedFluentProjectionStrategy,
) -> Result<ExplainExecutionNodeDescriptor, QueryError>
where
E: EntityValue + EntityKind<Canister = C>,
{
self.with_query_visible_indexes(query, |query, visible_indexes| {
query.explain_prepared_projection_terminal_with_visible_indexes(
visible_indexes,
strategy,
)
})
}
fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
match family {
ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
)),
ExecutionFamily::Ordered => Ok(()),
ExecutionFamily::Grouped => Err(QueryError::invariant(
"grouped queries execute via execute(), not page().execute()",
)),
}
}
fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
match family {
ExecutionFamily::Grouped => Ok(()),
ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
"grouped execution requires grouped logical plans",
)),
}
}
fn finalize_grouped_execution_page(
page: GroupedCursorPage,
trace: Option<ExecutionTrace>,
) -> Result<PagedGroupedExecutionWithTrace, QueryError> {
let next_cursor = page
.next_cursor
.map(|token| {
let Some(token) = token.as_grouped() else {
return Err(QueryError::grouped_paged_emitted_scalar_continuation());
};
token.encode().map_err(|err| {
QueryError::serialize_internal(format!(
"failed to serialize grouped continuation cursor: {err}"
))
})
})
.transpose()?;
Ok(PagedGroupedExecutionWithTrace::new(
page.rows,
next_cursor,
trace,
))
}
pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let mode = query.mode();
let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
self.execute_query_dyn(mode, plan)
}
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
#[expect(
clippy::too_many_lines,
reason = "the diagnostics-only attribution path keeps grouped and scalar execution on one explicit compile/execute accounting seam"
)]
pub fn execute_query_result_with_attribution<E>(
&self,
query: &Query<E>,
) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (compile_local_instructions, plan_and_cache) =
measure_query_stage(|| self.cached_prepared_query_plan_for_entity::<E>(query));
let (plan, cache_attribution) = plan_and_cache?;
let (execute_local_instructions, result) = measure_query_stage(
|| -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError> {
if query.has_grouping() {
let (page, trace, phase_attribution) =
self.execute_grouped_plan_with(plan, None, |executor, plan, cursor| {
executor
.execute_grouped_paged_with_cursor_traced_with_phase_attribution(
plan, cursor,
)
})?;
let grouped = Self::finalize_grouped_execution_page(page, trace)?;
Ok((
LoadQueryResult::Grouped(grouped),
Self::grouped_query_execute_phase_attribution(phase_attribution),
0,
))
} else {
match query.mode() {
QueryMode::Load(_) => {
let (rows, phase_attribution, response_decode_local_instructions) =
self.load_executor::<E>()
.execute_with_phase_attribution(plan)
.map_err(QueryError::execute)?;
Ok((
LoadQueryResult::Rows(rows),
Self::scalar_query_execute_phase_attribution(phase_attribution),
response_decode_local_instructions,
))
}
QueryMode::Delete(_) => {
let result = self.execute_query_dyn(query.mode(), plan)?;
Ok((
LoadQueryResult::Rows(result),
Self::empty_query_execute_phase_attribution(),
0,
))
}
}
}
},
);
let (result, execute_phase_attribution, response_decode_local_instructions) = result?;
let total_local_instructions =
compile_local_instructions.saturating_add(execute_local_instructions);
Ok((
result,
QueryExecutionAttribution {
compile_local_instructions,
runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
direct_data_row_scan_local_instructions: execute_phase_attribution
.direct_data_row_scan_local_instructions,
direct_data_row_key_stream_local_instructions: execute_phase_attribution
.direct_data_row_key_stream_local_instructions,
direct_data_row_row_read_local_instructions: execute_phase_attribution
.direct_data_row_row_read_local_instructions,
direct_data_row_key_encode_local_instructions: execute_phase_attribution
.direct_data_row_key_encode_local_instructions,
direct_data_row_store_get_local_instructions: execute_phase_attribution
.direct_data_row_store_get_local_instructions,
direct_data_row_order_window_local_instructions: execute_phase_attribution
.direct_data_row_order_window_local_instructions,
direct_data_row_page_window_local_instructions: execute_phase_attribution
.direct_data_row_page_window_local_instructions,
grouped_stream_local_instructions: execute_phase_attribution
.grouped_stream_local_instructions,
grouped_fold_local_instructions: execute_phase_attribution
.grouped_fold_local_instructions,
grouped_finalize_local_instructions: execute_phase_attribution
.grouped_finalize_local_instructions,
grouped_count_borrowed_hash_computations: execute_phase_attribution
.grouped_count
.borrowed_hash_computations,
grouped_count_bucket_candidate_checks: execute_phase_attribution
.grouped_count
.bucket_candidate_checks,
grouped_count_existing_group_hits: execute_phase_attribution
.grouped_count
.existing_group_hits,
grouped_count_new_group_inserts: execute_phase_attribution
.grouped_count
.new_group_inserts,
grouped_count_row_materialization_local_instructions: execute_phase_attribution
.grouped_count
.row_materialization_local_instructions,
grouped_count_group_lookup_local_instructions: execute_phase_attribution
.grouped_count
.group_lookup_local_instructions,
grouped_count_existing_group_update_local_instructions: execute_phase_attribution
.grouped_count
.existing_group_update_local_instructions,
grouped_count_new_group_insert_local_instructions: execute_phase_attribution
.grouped_count
.new_group_insert_local_instructions,
response_decode_local_instructions,
execute_local_instructions,
total_local_instructions,
shared_query_plan_cache_hits: cache_attribution.hits,
shared_query_plan_cache_misses: cache_attribution.misses,
},
))
}
#[doc(hidden)]
pub fn execute_query_result<E>(
&self,
query: &Query<E>,
) -> Result<LoadQueryResult<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
if query.has_grouping() {
return self
.execute_grouped(query, None)
.map(LoadQueryResult::Grouped);
}
self.execute_query(query).map(LoadQueryResult::Rows)
}
#[doc(hidden)]
pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
if !query.mode().is_delete() {
return Err(QueryError::unsupported_query(
"delete count execution requires delete query mode",
));
}
let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
.map_err(QueryError::execute)
}
pub(in crate::db) fn execute_query_dyn<E>(
&self,
mode: QueryMode,
plan: PreparedExecutionPlan<E>,
) -> Result<EntityResponse<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let result = match mode {
QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
};
result.map_err(QueryError::execute)
}
pub(in crate::db) fn execute_load_query_with<E, T>(
&self,
query: &Query<E>,
op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
) -> Result<T, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
self.with_metrics(|| op(self.load_executor::<E>(), plan))
.map_err(QueryError::execute)
}
pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
where
E: EntityKind<Canister = C>,
{
let (prepared_plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query)?;
let logical_plan = prepared_plan.logical_plan();
let explain = logical_plan.explain();
let plan_hash = logical_plan.fingerprint().to_string();
let access_strategy = AccessStrategy::from_plan(prepared_plan.access()).debug_summary();
let execution_family = match query.mode() {
QueryMode::Load(_) => Some(
prepared_plan
.execution_family()
.map_err(QueryError::execute)?,
),
QueryMode::Delete(_) => None,
};
Ok(QueryTracePlan::new(
plan_hash,
access_strategy,
execution_family,
explain,
))
}
pub(crate) fn execute_load_query_paged_with_trace<E>(
&self,
query: &Query<E>,
cursor_token: Option<&str>,
) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
Self::ensure_scalar_paged_execution_family(
plan.execution_family().map_err(QueryError::execute)?,
)?;
let cursor_bytes = decode_optional_cursor_token(cursor_token)
.map_err(QueryError::from_cursor_plan_error)?;
let cursor = plan
.prepare_cursor(cursor_bytes.as_deref())
.map_err(QueryError::from_executor_plan_error)?;
let (page, trace) = self
.with_metrics(|| {
self.load_executor::<E>()
.execute_paged_with_cursor_traced(plan, cursor)
})
.map_err(QueryError::execute)?;
let next_cursor = page
.next_cursor
.map(|token| {
let Some(token) = token.as_scalar() else {
return Err(QueryError::scalar_paged_emitted_grouped_continuation());
};
token.encode().map_err(|err| {
QueryError::serialize_internal(format!(
"failed to serialize continuation cursor: {err}"
))
})
})
.transpose()?;
Ok(PagedLoadExecutionWithTrace::new(
page.items,
next_cursor,
trace,
))
}
pub(in crate::db) fn execute_grouped<E>(
&self,
query: &Query<E>,
cursor_token: Option<&str>,
) -> Result<PagedGroupedExecutionWithTrace, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let plan = self.cached_prepared_query_plan_for_entity::<E>(query)?.0;
let (page, trace) = self.execute_grouped_plan_with_trace(plan, cursor_token)?;
Self::finalize_grouped_execution_page(page, trace)
}
fn execute_grouped_plan_with<E, T>(
&self,
plan: PreparedExecutionPlan<E>,
cursor_token: Option<&str>,
op: impl FnOnce(
LoadExecutor<E>,
PreparedExecutionPlan<E>,
crate::db::cursor::GroupedPlannedCursor,
) -> Result<T, InternalError>,
) -> Result<T, QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Self::ensure_grouped_execution_family(
plan.execution_family().map_err(QueryError::execute)?,
)?;
let cursor = decode_optional_grouped_cursor_token(cursor_token)
.map_err(QueryError::from_cursor_plan_error)?;
let cursor = plan
.prepare_grouped_cursor_token(cursor)
.map_err(QueryError::from_executor_plan_error)?;
self.with_metrics(|| op(self.load_executor::<E>(), plan, cursor))
.map_err(QueryError::execute)
}
fn execute_grouped_plan_with_trace<E>(
&self,
plan: PreparedExecutionPlan<E>,
cursor_token: Option<&str>,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
self.execute_grouped_plan_with(plan, cursor_token, |executor, plan, cursor| {
executor.execute_grouped_paged_with_cursor_traced(plan, cursor)
})
}
}
impl QueryPlanCacheKey {
const fn from_authority_parts(
authority: crate::db::executor::EntityAuthority,
schema_fingerprint: CommitSchemaFingerprint,
visibility: QueryPlanVisibility,
structural_query: crate::db::query::intent::StructuralQueryCacheKey,
cache_method_version: u8,
) -> Self {
Self {
cache_method_version,
entity_path: authority.entity_path(),
schema_fingerprint,
visibility,
structural_query,
}
}
#[cfg(test)]
fn for_authority_with_method_version(
authority: crate::db::executor::EntityAuthority,
schema_fingerprint: CommitSchemaFingerprint,
visibility: QueryPlanVisibility,
query: &StructuralQuery,
cache_method_version: u8,
) -> Self {
Self::from_authority_parts(
authority,
schema_fingerprint,
visibility,
query.structural_cache_key(),
cache_method_version,
)
}
fn for_authority_with_normalized_predicate_fingerprint_and_method_version(
authority: crate::db::executor::EntityAuthority,
schema_fingerprint: CommitSchemaFingerprint,
visibility: QueryPlanVisibility,
query: &StructuralQuery,
normalized_predicate_fingerprint: Option<[u8; 32]>,
cache_method_version: u8,
) -> Self {
Self::from_authority_parts(
authority,
schema_fingerprint,
visibility,
query.structural_cache_key_with_normalized_predicate_fingerprint(
normalized_predicate_fingerprint,
),
cache_method_version,
)
}
}