mod grouped;
mod scalar;
#[cfg(feature = "diagnostics")]
use crate::db::executor::pipeline::entrypoints::scalar::execute_prepared_scalar_rows_for_canister_with_phase_attribution;
use crate::{
db::{
PersistedRow,
cursor::{GroupedPlannedCursor, PlannedCursor},
data::decode_data_rows_into_entity_response,
executor::{
CursorPage, ExecutionTrace, LoadCursorInput, PreparedExecutionPlan,
pipeline::{
contracts::{GroupedCursorPage, LoadExecutor},
orchestrator::{LoadExecutionContext, LoadExecutionPayload, LoadPayloadState},
},
},
response::EntityResponse,
},
error::InternalError,
traits::EntityValue,
};
pub(in crate::db::executor) use crate::db::executor::pipeline::orchestrator::{
LoadSurfaceMode, LoadTracingMode,
};
#[cfg(test)]
pub(in crate::db::executor) use crate::db::executor::pipeline::orchestrator::{
load_execute_stage_order_guard, load_pipeline_state_optional_slot_count_guard,
};
#[cfg(feature = "sql")]
pub(in crate::db) use grouped::execute_initial_grouped_rows_for_canister;
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) use grouped::execute_initial_grouped_rows_for_canister_with_phase_attribution;
#[cfg(feature = "diagnostics")]
pub(in crate::db) use grouped::{GroupedCountAttribution, GroupedExecutePhaseAttribution};
pub(in crate::db::executor) use grouped::{
PreparedGroupedRouteRuntime, execute_prepared_grouped_route_runtime,
prepare_grouped_route_runtime_for_load_plan,
};
#[cfg(feature = "diagnostics")]
pub(in crate::db) use scalar::ScalarExecutePhaseAttribution;
#[cfg(feature = "sql")]
pub(in crate::db) use scalar::execute_initial_scalar_retained_slot_page_for_canister;
pub(in crate::db::executor) use scalar::{
PreparedScalarMaterializedBoundary, PreparedScalarRouteRuntime,
execute_prepared_scalar_route_runtime, execute_prepared_scalar_rows_for_canister,
};
/// Prepared load runtime for exactly one of the two load routes handled at
/// this entrypoint boundary: scalar or grouped.
///
/// Both prepared runtimes are stored inline (not boxed); the
/// `clippy::large_enum_variant` expectation below records that this is a
/// deliberate choice.
#[expect(
clippy::large_enum_variant,
reason = "prepared runtimes stay inline so the entrypoint boundary owns the scalar/grouped split directly"
)]
pub(in crate::db::executor) enum PreparedLoadRouteRuntime {
/// Prepared runtime for the scalar load route.
Scalar(PreparedScalarRouteRuntime),
/// Prepared runtime for the grouped load route.
Grouped(PreparedGroupedRouteRuntime),
}
impl PreparedLoadRouteRuntime {
    /// Wraps a prepared scalar route runtime in the `Scalar` variant.
    pub(in crate::db::executor::pipeline) const fn scalar(
        prepared: PreparedScalarRouteRuntime,
    ) -> Self {
        Self::Scalar(prepared)
    }

    /// Wraps a prepared grouped route runtime in the `Grouped` variant.
    pub(in crate::db::executor::pipeline) const fn grouped(
        prepared: PreparedGroupedRouteRuntime,
    ) -> Self {
        Self::Grouped(prepared)
    }

    /// Consumes the runtime, executes whichever route it holds, and lifts the
    /// resulting page into the matching `LoadExecutionPayload` constructor.
    ///
    /// The optional execution trace produced by the route is passed through
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying route execution.
    fn execute_payload(
        self,
    ) -> Result<(LoadExecutionPayload, Option<ExecutionTrace>), InternalError> {
        // Build the (payload, trace) pair per route, then wrap it once below.
        let outcome = match self {
            Self::Scalar(runtime) => {
                let (scalar_page, trace) = execute_prepared_scalar_route_runtime(runtime)?;
                (LoadExecutionPayload::scalar(scalar_page), trace)
            }
            Self::Grouped(runtime) => {
                let (grouped_page, trace) = execute_prepared_grouped_route_runtime(runtime)?;
                (LoadExecutionPayload::grouped(grouped_page), trace)
            }
        };

        Ok(outcome)
    }

    /// Executes the prepared route and bundles the payload and trace together
    /// with `context` into a `LoadPayloadState`.
    ///
    /// # Errors
    ///
    /// Returns the error from route execution; `context` is dropped unused in
    /// that case.
    pub(in crate::db::executor::pipeline) fn execute(
        self,
        context: LoadExecutionContext,
    ) -> Result<LoadPayloadState, InternalError> {
        self.execute_payload()
            .map(|(payload, trace)| LoadPayloadState::new(context, payload, trace))
    }
}
/// Reads the instruction counter used for load-entry phase attribution.
///
/// On `wasm32` targets this returns `canic_cdk::api::performance_counter(1)`;
/// on every other target it returns `0`.
///
/// NOTE(review): counter id `1` is presumably the call-context instruction
/// counter of the host runtime — confirm against the `canic_cdk` docs.
#[cfg(feature = "diagnostics")]
#[expect(
clippy::missing_const_for_fn,
reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_load_entry_local_instruction_counter() -> u64 {
// Exactly one of the two cfg-gated blocks below survives compilation and
// becomes the function's tail expression.
#[cfg(target_arch = "wasm32")]
{
canic_cdk::api::performance_counter(1)
}
// Non-wasm builds have no counter to read, so they report a constant zero.
#[cfg(not(target_arch = "wasm32"))]
{
0
}
}
/// Runs `run` exactly once and reports how far the load-entry instruction
/// counter advanced while it executed.
///
/// Returns `(delta, result)`, where `result` is whatever `run` returned
/// (success or failure) and `delta` is the counter reading after the call
/// minus the reading before it, saturating at zero.
#[cfg(feature = "diagnostics")]
fn measure_load_entry_phase<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_load_entry_local_instruction_counter();
    let outcome = run();
    let after = read_load_entry_local_instruction_counter();

    // Saturating subtraction keeps the delta at zero if the counter ever
    // reads lower after the call than before it.
    (after.saturating_sub(before), outcome)
}
/// Resolves `cursor` against `plan` through
/// `LoadCursorResolver::resolve_load_cursor_context`, using the grouped-paged
/// surface mode with tracing enabled.
///
/// # Errors
///
/// Returns whatever error the cursor resolver reports.
#[cfg(feature = "diagnostics")]
fn resolve_grouped_perf_cursor(
    plan: &crate::db::executor::PreparedLoadPlan,
    cursor: LoadCursorInput,
) -> Result<crate::db::executor::PreparedLoadCursor, InternalError> {
    // This helper always resolves for the grouped paged surface, traced.
    let surface_mode = LoadSurfaceMode::grouped_paged(LoadTracingMode::Enabled);

    crate::db::executor::LoadCursorResolver::resolve_load_cursor_context(plan, cursor, surface_mode)
}
impl<E> LoadExecutor<E>
where
    E: PersistedRow + EntityValue,
{
    /// Executes `plan` through the prepared scalar rows path and decodes the
    /// resulting data rows into an `EntityResponse<E>`.
    ///
    /// The second component returned by the page's `into_parts` is discarded.
    ///
    /// # Errors
    ///
    /// Returns an error if scalar execution or row decoding fails.
    pub(crate) fn execute(
        &self,
        plan: PreparedExecutionPlan<E>,
    ) -> Result<EntityResponse<E>, InternalError> {
        let prepared = plan.into_prepared_load_plan();
        let page = execute_prepared_scalar_rows_for_canister(&self.db, self.debug, prepared)?;
        let (data_rows, _) = page.into_parts();

        decode_data_rows_into_entity_response::<E>(data_rows)
    }

    /// Same as [`Self::execute`], but also returns the scalar phase
    /// attribution reported by the execution path and the instruction-counter
    /// delta measured around response decoding.
    ///
    /// # Errors
    ///
    /// Returns an error if scalar execution or row decoding fails.
    #[cfg(feature = "diagnostics")]
    pub(in crate::db) fn execute_with_phase_attribution(
        &self,
        plan: PreparedExecutionPlan<E>,
    ) -> Result<(EntityResponse<E>, ScalarExecutePhaseAttribution, u64), InternalError> {
        let prepared = plan.into_prepared_load_plan();
        let (page, phase_attribution) =
            execute_prepared_scalar_rows_for_canister_with_phase_attribution(
                &self.db,
                self.debug,
                prepared,
            )?;
        let (data_rows, _) = page.into_parts();

        // Only the decode step is wrapped by the measurement helper here.
        let (decode_instructions, decoded) =
            measure_load_entry_phase(|| decode_data_rows_into_entity_response::<E>(data_rows));

        decoded.map(|response| (response, phase_attribution, decode_instructions))
    }

    /// Executes `plan` as a scalar page starting from `cursor`, returning the
    /// page together with the optional execution trace.
    ///
    /// # Errors
    ///
    /// Propagates any error from the scalar page execution.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: PreparedExecutionPlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        let prepared = plan.into_prepared_load_plan();
        let cursor_input = LoadCursorInput::scalar(cursor);

        self.execute_load_scalar_page_with_trace(prepared, cursor_input)
    }

    /// Test-only convenience over [`Self::execute_paged_with_cursor_traced`]
    /// that drops the execution trace and returns just the page.
    ///
    /// # Errors
    ///
    /// Propagates any error from the traced variant.
    #[cfg(test)]
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: PreparedExecutionPlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Executes `plan` as a grouped page starting from `cursor`, returning the
    /// grouped page together with the optional execution trace.
    ///
    /// # Errors
    ///
    /// Propagates any error from the grouped page execution.
    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
        &self,
        plan: PreparedExecutionPlan<E>,
        cursor: impl Into<GroupedPlannedCursor>,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        let prepared = plan.into_prepared_load_plan();
        let cursor_input = LoadCursorInput::grouped(cursor);

        self.execute_load_grouped_page_with_trace(prepared, cursor_input)
    }

    /// Diagnostics variant of
    /// [`Self::execute_grouped_paged_with_cursor_traced`] that additionally
    /// returns the grouped phase attribution.
    ///
    /// # Errors
    ///
    /// Propagates any error from the grouped page execution.
    #[cfg(feature = "diagnostics")]
    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced_with_phase_attribution(
        &self,
        plan: PreparedExecutionPlan<E>,
        cursor: impl Into<GroupedPlannedCursor>,
    ) -> Result<
        (
            GroupedCursorPage,
            Option<ExecutionTrace>,
            GroupedExecutePhaseAttribution,
        ),
        InternalError,
    > {
        let prepared = plan.into_prepared_load_plan();
        let cursor_input = LoadCursorInput::grouped(cursor);

        self.execute_load_grouped_page_with_trace_with_phase_attribution(prepared, cursor_input)
    }
}