mod grouped;
mod scalar;
#[cfg(feature = "diagnostics")]
use crate::db::diagnostics::measure_local_instruction_delta as measure_load_entry_phase;
#[cfg(feature = "diagnostics")]
use crate::db::executor::pipeline::entrypoints::scalar::execute_prepared_scalar_rows_for_canister_with_phase_attribution;
use crate::{
db::{
PersistedRow,
cursor::{GroupedPlannedCursor, PlannedCursor},
executor::{
CursorPage, ExecutionTrace, LoadCursorInput, PreparedExecutionPlan,
pipeline::{
contracts::{LoadExecutor, StructuralGroupedProjectionResult},
orchestrator::{LoadExecutionContext, LoadExecutionPayload, LoadPayloadState},
},
terminal::decode_data_rows_into_entity_response,
},
response::EntityResponse,
},
error::InternalError,
traits::EntityValue,
};
pub(in crate::db::executor) use crate::db::executor::pipeline::orchestrator::{
LoadSurfaceMode, LoadTracingMode,
};
#[cfg(feature = "diagnostics")]
pub(in crate::db) use grouped::{GroupedCountAttribution, GroupedExecutePhaseAttribution};
pub(in crate::db::executor) use grouped::{
PreparedGroupedRouteRuntime, execute_prepared_grouped_route_runtime,
prepare_grouped_route_runtime_for_load_plan,
};
#[cfg(feature = "diagnostics")]
pub(in crate::db) use scalar::ScalarExecutePhaseAttribution;
#[cfg(feature = "sql")]
pub(in crate::db::executor) use scalar::execute_initial_scalar_retained_slot_page_from_runtime_parts_for_canister;
#[cfg(feature = "sql")]
pub(in crate::db::executor) use scalar::execute_prepared_scalar_aggregate_kernel_row_sink_for_canister;
pub(in crate::db::executor) use scalar::{
PreparedScalarMaterializedBoundary, PreparedScalarRouteRuntime,
execute_prepared_scalar_route_runtime, execute_prepared_scalar_rows_for_canister,
};
#[expect(
clippy::large_enum_variant,
reason = "prepared runtimes stay inline so the entrypoint boundary owns the scalar/grouped split directly"
)]
pub(in crate::db::executor) enum PreparedLoadRouteRuntime {
Scalar(PreparedScalarRouteRuntime),
Grouped(PreparedGroupedRouteRuntime),
}
impl PreparedLoadRouteRuntime {
pub(in crate::db::executor::pipeline) const fn scalar(
prepared: PreparedScalarRouteRuntime,
) -> Self {
Self::Scalar(prepared)
}
pub(in crate::db::executor::pipeline) const fn grouped(
prepared: PreparedGroupedRouteRuntime,
) -> Self {
Self::Grouped(prepared)
}
fn execute_payload(
self,
) -> Result<(LoadExecutionPayload, Option<ExecutionTrace>), InternalError> {
match self {
Self::Scalar(prepared) => {
let (page, trace) = execute_prepared_scalar_route_runtime(prepared)?;
Ok((LoadExecutionPayload::scalar(page), trace))
}
Self::Grouped(prepared) => {
let (page, trace) = execute_prepared_grouped_route_runtime(prepared)?;
Ok((LoadExecutionPayload::grouped(page), trace))
}
}
}
pub(in crate::db::executor::pipeline) fn execute(
self,
context: LoadExecutionContext,
) -> Result<LoadPayloadState, InternalError> {
let (payload, trace) = self.execute_payload()?;
Ok(LoadPayloadState::new(context, payload, trace))
}
}
#[cfg(feature = "diagnostics")]
fn resolve_grouped_perf_cursor(
plan: &crate::db::executor::PreparedLoadPlan,
cursor: LoadCursorInput,
) -> Result<crate::db::executor::PreparedLoadCursor, InternalError> {
crate::db::executor::LoadCursorResolver::resolve_load_cursor_context(
plan,
cursor,
LoadSurfaceMode::grouped_paged(LoadTracingMode::Enabled),
)
}
impl<E> LoadExecutor<E>
where
E: PersistedRow + EntityValue,
{
pub(in crate::db) fn execute(
&self,
plan: PreparedExecutionPlan<E>,
) -> Result<EntityResponse<E>, InternalError> {
let page = execute_prepared_scalar_rows_for_canister(
&self.db,
self.debug,
plan.into_prepared_load_plan(),
)?;
let (data_rows, _) = page.into_parts();
decode_data_rows_into_entity_response::<E>(data_rows)
}
#[cfg(feature = "diagnostics")]
pub(in crate::db) fn execute_with_phase_attribution(
&self,
plan: PreparedExecutionPlan<E>,
) -> Result<(EntityResponse<E>, ScalarExecutePhaseAttribution, u64), InternalError> {
let (page, phase_attribution) =
execute_prepared_scalar_rows_for_canister_with_phase_attribution(
&self.db,
self.debug,
plan.into_prepared_load_plan(),
)?;
let (data_rows, _) = page.into_parts();
let (response_decode_local_instructions, response) =
measure_load_entry_phase(|| decode_data_rows_into_entity_response::<E>(data_rows));
let response = response?;
Ok((
response,
phase_attribution,
response_decode_local_instructions,
))
}
pub(in crate::db) fn execute_paged_with_cursor_traced(
&self,
plan: PreparedExecutionPlan<E>,
cursor: PlannedCursor,
) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
self.execute_load_scalar_page_with_trace(
plan.into_prepared_load_plan(),
LoadCursorInput::scalar(cursor),
)
}
#[cfg(test)]
pub(in crate::db) fn execute_paged_with_cursor(
&self,
plan: PreparedExecutionPlan<E>,
cursor: PlannedCursor,
) -> Result<CursorPage<E>, InternalError> {
let (page, _) = self.execute_paged_with_cursor_traced(plan, cursor)?;
Ok(page)
}
pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
&self,
plan: PreparedExecutionPlan<E>,
cursor: impl Into<GroupedPlannedCursor>,
) -> Result<(StructuralGroupedProjectionResult, Option<ExecutionTrace>), InternalError> {
let (page, trace) = self.execute_load_grouped_page_with_trace(
plan.into_prepared_load_plan(),
LoadCursorInput::grouped(cursor),
)?;
Ok((StructuralGroupedProjectionResult::from_page(page), trace))
}
#[cfg(feature = "diagnostics")]
pub(in crate::db) fn execute_grouped_paged_with_cursor_traced_with_phase_attribution(
&self,
plan: PreparedExecutionPlan<E>,
cursor: impl Into<GroupedPlannedCursor>,
) -> Result<
(
StructuralGroupedProjectionResult,
Option<ExecutionTrace>,
GroupedExecutePhaseAttribution,
),
InternalError,
> {
let (page, trace, phase_attribution) = self
.execute_load_grouped_page_with_trace_with_phase_attribution(
plan.into_prepared_load_plan(),
LoadCursorInput::grouped(cursor),
)?;
Ok((
StructuralGroupedProjectionResult::from_page(page),
trace,
phase_attribution,
))
}
}