use crate::{
db::{
DbSession, LoadQueryResult, PersistedRow, Query, QueryError,
diagnostics::{
StoreCounterSnapshot, measure_local_instruction_delta as measure_query_stage,
},
executor::{
DirectDataRowPhaseAttribution,
GroupedCountAttribution as ExecutorGroupedCountAttribution,
GroupedExecutePhaseAttribution, KernelRowPhaseAttribution,
ScalarAggregateTerminalAttribution, ScalarExecutePhaseAttribution,
},
session::finalize_structural_grouped_projection_result,
session::query::{
PreparedQueryExecutionOutcome, PreparedQueryExecutionOutput, QueryPlanCacheAttribution,
QueryPlanCompilePhaseAttribution,
},
},
traits::{CanisterKind, EntityValue},
};
use candid::CandidType;
use serde::Deserialize;
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct DirectDataRowAttribution {
pub scan_local_instructions: u64,
pub key_stream_local_instructions: u64,
pub row_read_local_instructions: u64,
pub key_encode_local_instructions: u64,
pub store_get_local_instructions: u64,
pub order_window_local_instructions: u64,
pub page_window_local_instructions: u64,
}
impl DirectDataRowAttribution {
#[cfg(any(test, feature = "sql"))]
const fn from_direct_phase(phase: DirectDataRowPhaseAttribution) -> Option<Self> {
if phase.has_work() {
Some(Self::from_phase_unchecked(phase))
} else {
None
}
}
pub(in crate::db) const fn from_scalar_phase(phase: ScalarExecutePhaseAttribution) -> Self {
Self::from_phase_unchecked(DirectDataRowPhaseAttribution {
scan_local_instructions: phase.direct_data_row_scan_local_instructions,
key_stream_local_instructions: phase.direct_data_row_key_stream_local_instructions,
row_read_local_instructions: phase.direct_data_row_row_read_local_instructions,
key_encode_local_instructions: phase.direct_data_row_key_encode_local_instructions,
store_get_local_instructions: phase.direct_data_row_store_get_local_instructions,
order_window_local_instructions: phase.direct_data_row_order_window_local_instructions,
page_window_local_instructions: phase.direct_data_row_page_window_local_instructions,
})
}
#[cfg(any(test, feature = "sql"))]
pub(in crate::db) const fn from_captured_phase(
phase: DirectDataRowPhaseAttribution,
) -> Option<Self> {
Self::from_direct_phase(phase)
}
const fn from_phase_unchecked(phase: DirectDataRowPhaseAttribution) -> Self {
Self {
scan_local_instructions: phase.scan_local_instructions,
key_stream_local_instructions: phase.key_stream_local_instructions,
row_read_local_instructions: phase.row_read_local_instructions,
key_encode_local_instructions: phase.key_encode_local_instructions,
store_get_local_instructions: phase.store_get_local_instructions,
order_window_local_instructions: phase.order_window_local_instructions,
page_window_local_instructions: phase.page_window_local_instructions,
}
}
}
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct KernelRowAttribution {
pub scan_local_instructions: u64,
pub key_stream_local_instructions: u64,
pub row_read_local_instructions: u64,
pub order_window_local_instructions: u64,
pub page_window_local_instructions: u64,
pub retained_layout_hits: u64,
pub retained_slot_values: u64,
pub retained_octet_length_values: u64,
}
impl KernelRowAttribution {
pub(in crate::db) const fn from_scalar_phase(
phase: ScalarExecutePhaseAttribution,
) -> Option<Self> {
Self::from_kernel_phase(KernelRowPhaseAttribution {
scan_local_instructions: phase.kernel_row_scan_local_instructions,
key_stream_local_instructions: phase.kernel_row_key_stream_local_instructions,
row_read_local_instructions: phase.kernel_row_row_read_local_instructions,
order_window_local_instructions: phase.kernel_row_order_window_local_instructions,
page_window_local_instructions: phase.kernel_row_page_window_local_instructions,
retained_layout_hits: phase.kernel_row_retained_layout_hits,
retained_slot_values: phase.kernel_row_retained_slot_values,
retained_octet_length_values: phase.kernel_row_retained_octet_length_values,
})
}
#[cfg(any(test, feature = "sql"))]
pub(in crate::db) const fn from_captured_phase(
phase: KernelRowPhaseAttribution,
) -> Option<Self> {
Self::from_kernel_phase(phase)
}
const fn from_kernel_phase(phase: KernelRowPhaseAttribution) -> Option<Self> {
if phase.has_work() {
Some(Self {
scan_local_instructions: phase.scan_local_instructions,
key_stream_local_instructions: phase.key_stream_local_instructions,
row_read_local_instructions: phase.row_read_local_instructions,
order_window_local_instructions: phase.order_window_local_instructions,
page_window_local_instructions: phase.page_window_local_instructions,
retained_layout_hits: phase.retained_layout_hits,
retained_slot_values: phase.retained_slot_values,
retained_octet_length_values: phase.retained_octet_length_values,
})
} else {
None
}
}
}
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct GroupedCountAttribution {
pub borrowed_hash_computations: u64,
pub bucket_candidate_checks: u64,
pub existing_group_hits: u64,
pub new_group_inserts: u64,
pub row_materialization_local_instructions: u64,
pub group_lookup_local_instructions: u64,
pub existing_group_update_local_instructions: u64,
pub new_group_insert_local_instructions: u64,
}
impl GroupedCountAttribution {
pub(in crate::db) const fn from_executor(count: ExecutorGroupedCountAttribution) -> Self {
Self {
borrowed_hash_computations: count.borrowed_hash_computations,
bucket_candidate_checks: count.bucket_candidate_checks,
existing_group_hits: count.existing_group_hits,
new_group_inserts: count.new_group_inserts,
row_materialization_local_instructions: count.row_materialization_local_instructions,
group_lookup_local_instructions: count.group_lookup_local_instructions,
existing_group_update_local_instructions: count
.existing_group_update_local_instructions,
new_group_insert_local_instructions: count.new_group_insert_local_instructions,
}
}
}
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct GroupedExecutionAttribution {
pub stream_local_instructions: u64,
pub fold_local_instructions: u64,
pub finalize_local_instructions: u64,
pub count: GroupedCountAttribution,
}
impl GroupedExecutionAttribution {
pub(in crate::db) const fn from_executor_phase(phase: GroupedExecutePhaseAttribution) -> Self {
Self::from_executor_parts(
phase.stream_local_instructions,
phase.fold_local_instructions,
phase.finalize_local_instructions,
phase.grouped_count,
)
}
pub(in crate::db) const fn from_executor_parts(
stream_local_instructions: u64,
fold_local_instructions: u64,
finalize_local_instructions: u64,
count: ExecutorGroupedCountAttribution,
) -> Self {
Self {
stream_local_instructions,
fold_local_instructions,
finalize_local_instructions,
count: GroupedCountAttribution::from_executor(count),
}
}
}
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct ScalarAggregateAttribution {
pub base_row_local_instructions: u64,
pub reducer_fold_local_instructions: u64,
pub expression_evaluations: u64,
pub filter_evaluations: u64,
pub rows_ingested: u64,
pub terminal_count: u64,
pub unique_input_expr_count: u64,
pub unique_filter_expr_count: u64,
pub sink_mode: Option<String>,
}
impl ScalarAggregateAttribution {
pub(in crate::db) fn from_executor(
terminal: ScalarAggregateTerminalAttribution,
) -> Option<Self> {
if terminal.has_work() {
Some(Self {
base_row_local_instructions: terminal.base_row_local_instructions,
reducer_fold_local_instructions: terminal.reducer_fold_local_instructions,
expression_evaluations: terminal.expression_evaluations,
filter_evaluations: terminal.filter_evaluations,
rows_ingested: terminal.rows_ingested,
terminal_count: terminal.terminal_count,
unique_input_expr_count: terminal.unique_input_expr_count,
unique_filter_expr_count: terminal.unique_filter_expr_count,
sink_mode: terminal.sink_mode.label().map(str::to_string),
})
} else {
None
}
}
}
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct FluentTerminalExecutionAttribution {
pub compile_local_instructions: u64,
pub compile_schema_catalog_local_instructions: u64,
pub compile_schema_info_local_instructions: u64,
pub compile_prepare_local_instructions: u64,
pub compile_cache_key_local_instructions: u64,
pub compile_cache_lookup_local_instructions: u64,
pub compile_plan_build_local_instructions: u64,
pub compile_cache_insert_local_instructions: u64,
pub plan_lookup_local_instructions: u64,
pub executor_invocation_local_instructions: u64,
pub execute_local_instructions: u64,
pub total_local_instructions: u64,
pub store_get_calls: u64,
pub index_store_get_calls: u64,
pub index_store_range_scan_calls: u64,
pub index_store_entry_reads: u64,
pub scalar_aggregate: Option<ScalarAggregateAttribution>,
pub shared_query_plan_cache_hits: u64,
pub shared_query_plan_cache_misses: u64,
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db::session::query) struct QueryAttributionCommon {
compile_phase_attribution: QueryPlanCompilePhaseAttribution,
plan_lookup_local_instructions: u64,
store_counters: StoreCounterSnapshot,
cache_attribution: QueryPlanCacheAttribution,
}
impl QueryAttributionCommon {
#[must_use]
pub(in crate::db::session::query) const fn new(
plan_lookup_local_instructions: u64,
compile_phase_attribution: QueryPlanCompilePhaseAttribution,
cache_attribution: QueryPlanCacheAttribution,
store_counters: StoreCounterSnapshot,
) -> Self {
Self {
compile_phase_attribution,
plan_lookup_local_instructions,
store_counters,
cache_attribution,
}
}
const fn compile_local_instructions(self) -> u64 {
self.plan_lookup_local_instructions
}
const fn total_local_instructions(self, execute_local_instructions: u64) -> u64 {
self.compile_local_instructions()
.saturating_add(execute_local_instructions)
}
}
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
pub compile_local_instructions: u64,
pub compile_schema_catalog_local_instructions: u64,
pub compile_schema_info_local_instructions: u64,
pub compile_prepare_local_instructions: u64,
pub compile_cache_key_local_instructions: u64,
pub compile_cache_lookup_local_instructions: u64,
pub compile_plan_build_local_instructions: u64,
pub compile_cache_insert_local_instructions: u64,
pub plan_lookup_local_instructions: u64,
pub executor_invocation_local_instructions: u64,
pub response_finalization_local_instructions: u64,
pub load_plan_local_instructions: u64,
pub row_layout_local_instructions: u64,
pub continuation_signature_local_instructions: u64,
pub scalar_runtime_handoff_local_instructions: u64,
pub route_plan_local_instructions: u64,
pub runtime_prepare_local_instructions: u64,
pub runtime_local_instructions: u64,
pub finalize_local_instructions: u64,
pub direct_data_row: Option<DirectDataRowAttribution>,
pub kernel_row: Option<KernelRowAttribution>,
pub grouped: Option<GroupedExecutionAttribution>,
pub response_decode_local_instructions: u64,
pub execute_local_instructions: u64,
pub total_local_instructions: u64,
pub store_get_calls: u64,
pub index_store_get_calls: u64,
pub index_store_range_scan_calls: u64,
pub index_store_entry_reads: u64,
pub shared_query_plan_cache_hits: u64,
pub shared_query_plan_cache_misses: u64,
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct QueryExecutePhaseAttribution {
executor_invocation_local_instructions: u64,
response_finalization_local_instructions: u64,
runtime_local_instructions: u64,
finalize_local_instructions: u64,
load_plan_local_instructions: u64,
row_layout_local_instructions: u64,
continuation_signature_local_instructions: u64,
scalar_runtime_handoff_local_instructions: u64,
route_plan_local_instructions: u64,
runtime_prepare_local_instructions: u64,
direct_data_row: Option<DirectDataRowAttribution>,
kernel_row: Option<KernelRowAttribution>,
grouped: Option<GroupedExecutionAttribution>,
}
impl FluentTerminalExecutionAttribution {
pub(in crate::db::session::query) const fn from_common(
common: QueryAttributionCommon,
executor_invocation_local_instructions: u64,
scalar_aggregate: Option<ScalarAggregateAttribution>,
) -> Self {
let execute_local_instructions = executor_invocation_local_instructions;
Self {
compile_local_instructions: common.compile_local_instructions(),
compile_schema_catalog_local_instructions: common
.compile_phase_attribution
.schema_catalog,
compile_schema_info_local_instructions: common.compile_phase_attribution.schema_info,
compile_prepare_local_instructions: common.compile_phase_attribution.prepare,
compile_cache_key_local_instructions: common.compile_phase_attribution.cache_key,
compile_cache_lookup_local_instructions: common.compile_phase_attribution.cache_lookup,
compile_plan_build_local_instructions: common.compile_phase_attribution.plan_build,
compile_cache_insert_local_instructions: common.compile_phase_attribution.cache_insert,
plan_lookup_local_instructions: common.plan_lookup_local_instructions,
executor_invocation_local_instructions,
execute_local_instructions,
total_local_instructions: common.total_local_instructions(execute_local_instructions),
store_get_calls: common.store_counters.data_store_get_calls,
index_store_get_calls: common.store_counters.index_store_get_calls,
index_store_range_scan_calls: common.store_counters.index_store_range_scan_calls,
index_store_entry_reads: common.store_counters.index_store_entry_reads,
scalar_aggregate,
shared_query_plan_cache_hits: common.cache_attribution.hits,
shared_query_plan_cache_misses: common.cache_attribution.misses,
}
}
}
impl QueryExecutionAttribution {
const fn from_common(
common: QueryAttributionCommon,
execute_phase_attribution: &QueryExecutePhaseAttribution,
response_decode_local_instructions: u64,
) -> Self {
let execute_local_instructions = execute_phase_attribution
.executor_invocation_local_instructions
.saturating_add(execute_phase_attribution.response_finalization_local_instructions);
Self {
compile_local_instructions: common.compile_local_instructions(),
compile_schema_catalog_local_instructions: common
.compile_phase_attribution
.schema_catalog,
compile_schema_info_local_instructions: common.compile_phase_attribution.schema_info,
compile_prepare_local_instructions: common.compile_phase_attribution.prepare,
compile_cache_key_local_instructions: common.compile_phase_attribution.cache_key,
compile_cache_lookup_local_instructions: common.compile_phase_attribution.cache_lookup,
compile_plan_build_local_instructions: common.compile_phase_attribution.plan_build,
compile_cache_insert_local_instructions: common.compile_phase_attribution.cache_insert,
plan_lookup_local_instructions: common.plan_lookup_local_instructions,
executor_invocation_local_instructions: execute_phase_attribution
.executor_invocation_local_instructions,
response_finalization_local_instructions: execute_phase_attribution
.response_finalization_local_instructions,
load_plan_local_instructions: execute_phase_attribution.load_plan_local_instructions,
row_layout_local_instructions: execute_phase_attribution.row_layout_local_instructions,
continuation_signature_local_instructions: execute_phase_attribution
.continuation_signature_local_instructions,
scalar_runtime_handoff_local_instructions: execute_phase_attribution
.scalar_runtime_handoff_local_instructions,
route_plan_local_instructions: execute_phase_attribution.route_plan_local_instructions,
runtime_prepare_local_instructions: execute_phase_attribution
.runtime_prepare_local_instructions,
runtime_local_instructions: execute_phase_attribution.runtime_local_instructions,
finalize_local_instructions: execute_phase_attribution.finalize_local_instructions,
direct_data_row: execute_phase_attribution.direct_data_row,
kernel_row: execute_phase_attribution.kernel_row,
grouped: execute_phase_attribution.grouped,
response_decode_local_instructions,
execute_local_instructions,
total_local_instructions: common.total_local_instructions(execute_local_instructions),
store_get_calls: common.store_counters.data_store_get_calls,
index_store_get_calls: common.store_counters.index_store_get_calls,
index_store_range_scan_calls: common.store_counters.index_store_range_scan_calls,
index_store_entry_reads: common.store_counters.index_store_entry_reads,
shared_query_plan_cache_hits: common.cache_attribution.hits,
shared_query_plan_cache_misses: common.cache_attribution.misses,
}
}
}
impl<C: CanisterKind> DbSession<C> {
const fn empty_query_execute_phase_attribution() -> QueryExecutePhaseAttribution {
QueryExecutePhaseAttribution {
executor_invocation_local_instructions: 0,
response_finalization_local_instructions: 0,
load_plan_local_instructions: 0,
row_layout_local_instructions: 0,
continuation_signature_local_instructions: 0,
scalar_runtime_handoff_local_instructions: 0,
route_plan_local_instructions: 0,
runtime_prepare_local_instructions: 0,
runtime_local_instructions: 0,
finalize_local_instructions: 0,
direct_data_row: None,
kernel_row: None,
grouped: None,
}
}
const fn scalar_query_execute_phase_attribution(
phase: ScalarExecutePhaseAttribution,
executor_invocation_local_instructions: u64,
) -> QueryExecutePhaseAttribution {
QueryExecutePhaseAttribution {
executor_invocation_local_instructions,
response_finalization_local_instructions: 0,
load_plan_local_instructions: phase.load_plan_local_instructions,
row_layout_local_instructions: phase.row_layout_local_instructions,
continuation_signature_local_instructions: phase
.continuation_signature_local_instructions,
scalar_runtime_handoff_local_instructions: phase
.scalar_runtime_handoff_local_instructions,
route_plan_local_instructions: phase.route_plan_local_instructions,
runtime_prepare_local_instructions: phase.runtime_prepare_local_instructions,
runtime_local_instructions: phase.runtime_local_instructions,
finalize_local_instructions: phase.finalize_local_instructions,
direct_data_row: Some(DirectDataRowAttribution::from_scalar_phase(phase)),
kernel_row: KernelRowAttribution::from_scalar_phase(phase),
grouped: None,
}
}
const fn grouped_query_execute_phase_attribution(
phase: GroupedExecutePhaseAttribution,
executor_invocation_local_instructions: u64,
response_finalization_local_instructions: u64,
) -> QueryExecutePhaseAttribution {
QueryExecutePhaseAttribution {
executor_invocation_local_instructions,
response_finalization_local_instructions,
load_plan_local_instructions: 0,
row_layout_local_instructions: 0,
continuation_signature_local_instructions: 0,
scalar_runtime_handoff_local_instructions: 0,
route_plan_local_instructions: 0,
runtime_prepare_local_instructions: 0,
runtime_local_instructions: phase
.stream_local_instructions
.saturating_add(phase.fold_local_instructions),
finalize_local_instructions: phase.finalize_local_instructions,
direct_data_row: None,
kernel_row: None,
grouped: Some(GroupedExecutionAttribution::from_executor_phase(phase)),
}
}
#[doc(hidden)]
pub fn execute_query_result_with_attribution<E>(
&self,
query: &Query<E>,
) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (plan_lookup_local_instructions, plan_and_cache) = measure_query_stage(|| {
self.cached_prepared_query_plan_for_entity_with_compile_phase_attribution::<E>(query)
});
let (plan, cache_attribution, compile_phase_attribution) = plan_and_cache?;
let store_counters_before = StoreCounterSnapshot::capture();
let (executor_invocation_local_instructions, outcome) = measure_query_stage(|| {
self.execute_prepared(plan, true, PreparedQueryExecutionOutput::Rows)
});
let outcome = outcome?;
let store_counters = store_counters_before.delta_since();
let (result, execute_phase_attribution, response_decode_local_instructions) =
Self::query_execution_attribution_from_outcome(
outcome,
executor_invocation_local_instructions,
)?;
let common_attribution = QueryAttributionCommon::new(
plan_lookup_local_instructions,
compile_phase_attribution,
cache_attribution,
store_counters,
);
Ok((
result,
QueryExecutionAttribution::from_common(
common_attribution,
&execute_phase_attribution,
response_decode_local_instructions,
),
))
}
fn query_execution_attribution_from_outcome<E>(
outcome: PreparedQueryExecutionOutcome<E>,
executor_invocation_local_instructions: u64,
) -> Result<(LoadQueryResult<E>, QueryExecutePhaseAttribution, u64), QueryError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
match outcome {
PreparedQueryExecutionOutcome::Scalar {
rows,
phase: Some(phase_attribution),
response_decode_local_instructions,
} => Ok((
LoadQueryResult::Rows(rows),
Self::scalar_query_execute_phase_attribution(
phase_attribution,
executor_invocation_local_instructions,
),
response_decode_local_instructions,
)),
PreparedQueryExecutionOutcome::Grouped {
result,
trace,
phase: Some(phase_attribution),
} => {
let (response_finalization_local_instructions, grouped) =
measure_query_stage(|| {
finalize_structural_grouped_projection_result(result, trace)
});
let grouped = grouped?;
Ok((
LoadQueryResult::Grouped(grouped),
Self::grouped_query_execute_phase_attribution(
phase_attribution,
executor_invocation_local_instructions,
response_finalization_local_instructions,
),
0,
))
}
PreparedQueryExecutionOutcome::Delete { rows } => Ok((
LoadQueryResult::Rows(rows),
QueryExecutePhaseAttribution {
executor_invocation_local_instructions,
..Self::empty_query_execute_phase_attribution()
},
0,
)),
PreparedQueryExecutionOutcome::DeleteCount { .. } => Err(QueryError::invariant()),
PreparedQueryExecutionOutcome::Scalar { phase: None, .. }
| PreparedQueryExecutionOutcome::Grouped { phase: None, .. } => {
Err(QueryError::invariant())
}
}
}
}