use crate::db::registry::StoreHandle;
use crate::{
db::{
executor::{
EntityAuthority, ExecutionPreparation, ExecutionTrace, LoadCursorInput,
PreparedLoadPlan, RetainedSlotLayout,
aggregate::runtime::{
GroupedOutputRuntimeObserverBindings, build_grouped_stream_with_runtime,
execute_group_fold_stage, finalize_grouped_output_with_observer,
},
pipeline::contracts::{
ExecutionRuntimeAdapter, GroupedCursorPage, GroupedFoldStage, GroupedRouteStage,
GroupedStreamStage, LoadExecutor, StructuralGroupedRowRuntime,
},
pipeline::entrypoints::{LoadSurfaceMode, LoadTracingMode},
pipeline::grouped_runtime::resolve_grouped_route_for_plan,
pipeline::orchestrator::LoadExecutionSurface,
pipeline::timing::{elapsed_execution_micros, start_execution_timer},
stream::access::TraversalRuntime,
terminal::RowLayout,
},
query::plan::AccessPlannedQuery,
},
error::InternalError,
traits::{CanisterKind, EntityKind, EntityValue},
};
/// Non-generic runtime state shared by the grouped execution path.
///
/// Holds everything `build_grouped_stream` and `finalize_grouped_output` need
/// besides the route and the execution preparation, which are supplied per call.
struct GroupedPathRuntimeCore {
/// Traversal runtime handed to `ExecutionRuntimeAdapter::from_stream_runtime_parts`.
traversal_runtime: TraversalRuntime,
/// Store handle passed to `StructuralGroupedRowRuntime::new` when building the stream.
row_store: StoreHandle,
/// Entity authority; supplies the row layout used to compile the retained slot layout.
authority: EntityAuthority,
/// Observer bindings forwarded to `finalize_grouped_output_with_observer`.
output_observer: GroupedOutputRuntimeObserverBindings,
}
/// Fully prepared grouped execution bundle.
///
/// Pairs a resolved grouped route with the runtime core and the execution
/// preparation derived from the route's plan. Consumed by value in
/// `execute_prepared_grouped_route_runtime`.
pub(in crate::db::executor) struct PreparedGroupedRouteRuntime {
/// Resolved grouped route stage to execute.
route: GroupedRouteStage,
/// Runtime core (store, traversal, authority, observer) used to run the route.
runtime: GroupedPathRuntimeCore,
/// Execution preparation built from `route.plan()` and its slot map.
execution_preparation: ExecutionPreparation,
}
impl GroupedPathRuntimeCore {
    /// Builds the grouped stream stage for `route`.
    ///
    /// Wires the traversal runtime to the route's access plan, compiles the
    /// retained slot layout for grouped rows, and hands both (together with
    /// the owned `execution_preparation`) to `build_grouped_stream_with_runtime`.
    ///
    /// # Errors
    ///
    /// Propagates any `InternalError` returned by `build_grouped_stream_with_runtime`.
    fn build_grouped_stream(
        &self,
        route: &GroupedRouteStage,
        execution_preparation: ExecutionPreparation,
    ) -> Result<GroupedStreamStage, InternalError> {
        // Adapter over the access plan; borrowed by the stream builder below.
        let stream_adapter = ExecutionRuntimeAdapter::from_stream_runtime_parts(
            &route.plan().access,
            self.traversal_runtime,
        );

        // The slot layout only needs to inspect the preparation, so it is
        // compiled before the preparation is moved into the stream builder.
        let retained_slots = compile_grouped_row_slot_layout(
            self.authority.row_layout(),
            route,
            &execution_preparation,
        );
        let row_runtime = StructuralGroupedRowRuntime::new(
            self.row_store,
            self.authority.row_layout(),
            retained_slots,
        );

        build_grouped_stream_with_runtime(
            route,
            &stream_adapter,
            execution_preparation,
            row_runtime,
        )
    }

    /// Finalizes the folded grouped output into a cursor page plus optional trace.
    ///
    /// Delegates to `finalize_grouped_output_with_observer` using this
    /// runtime's observer bindings.
    fn finalize_grouped_output(
        &self,
        route: GroupedRouteStage,
        folded: GroupedFoldStage,
        execution_time_micros: u64,
    ) -> (GroupedCursorPage, Option<ExecutionTrace>) {
        let observer = &self.output_observer;

        finalize_grouped_output_with_observer(observer, route, folded, execution_time_micros)
    }
}
/// Compiles the retained slot layout for grouped row decoding.
///
/// A slot is retained when it is referenced by any of:
/// - a group field of the route,
/// - the compiled predicate of the execution preparation (if present),
/// - the target field of a grouped aggregate,
/// - the global DISTINCT target slot of the grouped distinct strategy.
///
/// Field indexes outside `0..field_count` are ignored rather than reported.
fn compile_grouped_row_slot_layout(
    row_layout: RowLayout,
    route: &GroupedRouteStage,
    execution_preparation: &ExecutionPreparation,
) -> RetainedSlotLayout {
    /// Flags `index` as required; out-of-range indexes are a no-op.
    fn mark_slot(slots: &mut [bool], index: usize) {
        if let Some(flag) = slots.get_mut(index) {
            *flag = true;
        }
    }

    let field_count = row_layout.field_count();
    let mut required = vec![false; field_count];

    // Grouping keys.
    for group_field in route.group_fields() {
        mark_slot(&mut required, group_field.index());
    }

    // Slots read by the residual predicate, when one was compiled.
    if let Some(predicate) = execution_preparation.compiled_predicate() {
        predicate.mark_referenced_slots(&mut required);
    }

    // Aggregate inputs; aggregates without a target field contribute nothing.
    for aggregate in route.grouped_aggregate_execution_specs() {
        if let Some(target) = aggregate.target_field() {
            mark_slot(&mut required, target.index());
        }
    }

    // Global DISTINCT target, when the strategy exposes one.
    if let Some(distinct_target) = route
        .grouped_distinct_execution_strategy()
        .global_distinct_target_slot()
    {
        mark_slot(&mut required, distinct_target.index());
    }

    // Flatten the flag vector into the ascending list of retained slot indexes.
    let retained = required
        .iter()
        .enumerate()
        .filter(|&(_, &is_required)| is_required)
        .map(|(slot, _)| slot)
        .collect();

    RetainedSlotLayout::compile(field_count, retained)
}
/// Runs one grouped route end to end: build stream, fold groups, finalize output.
///
/// The measured execution time spans stream construction and the fold stage;
/// finalization happens after the timer is read.
///
/// # Errors
///
/// Propagates failures from stream construction or the group fold stage.
fn execute_grouped_route_path(
    runtime: &GroupedPathRuntimeCore,
    route: GroupedRouteStage,
    execution_preparation: ExecutionPreparation,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
    let timer = start_execution_timer();

    // Timed section: the stream is consumed by the fold stage.
    let folded = {
        let grouped_stream = runtime.build_grouped_stream(&route, execution_preparation)?;
        execute_group_fold_stage(&route, grouped_stream)?
    };

    let elapsed_micros = elapsed_execution_micros(timer);
    let page_and_trace = runtime.finalize_grouped_output(route, folded, elapsed_micros);

    Ok(page_and_trace)
}
/// Executes a previously prepared grouped route bundle.
///
/// Consumes `prepared`, lending its runtime core to the shared grouped route
/// path while moving the route and execution preparation into it.
///
/// # Errors
///
/// Propagates any `InternalError` from `execute_grouped_route_path`.
pub(in crate::db::executor) fn execute_prepared_grouped_route_runtime(
    prepared: PreparedGroupedRouteRuntime,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
    // Disjoint field access: the runtime is borrowed, the other two fields are moved.
    execute_grouped_route_path(
        &prepared.runtime,
        prepared.route,
        prepared.execution_preparation,
    )
}
/// Executes the first grouped page for a plan, without a continuation cursor.
///
/// Resolves the grouped route with `GroupedPlannedCursor::none()`, recovers the
/// store named by `authority`, assembles the grouped runtime, and runs it.
/// Only the page is returned; the optional execution trace is discarded.
///
/// # Errors
///
/// Propagates failures from route resolution, store recovery, or grouped execution.
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_initial_grouped_rows_for_canister<C>(
    db: &crate::db::Db<C>,
    debug: bool,
    authority: EntityAuthority,
    plan: AccessPlannedQuery,
) -> Result<GroupedCursorPage, InternalError>
where
    C: CanisterKind,
{
    // Route resolution for an initial (cursor-less) grouped request.
    let prepared_plan = PreparedLoadPlan::from_plan(authority, plan);
    let initial_cursor = crate::db::cursor::GroupedPlannedCursor::none();
    let route = resolve_grouped_route_for_plan(prepared_plan, initial_cursor, debug)?;

    let execution_preparation = ExecutionPreparation::from_runtime_plan(
        route.plan(),
        route.plan().slot_map().map(|slots| slots.to_vec()),
    );

    // Runtime core bound to the store owned by this entity authority.
    let row_store = db.recovered_store(authority.store_path())?;
    let traversal_runtime = TraversalRuntime::new(row_store, authority.entity_tag());
    let output_observer =
        GroupedOutputRuntimeObserverBindings::for_path(authority.entity_path());
    let runtime = GroupedPathRuntimeCore {
        traversal_runtime,
        row_store,
        authority,
        output_observer,
    };

    let prepared = PreparedGroupedRouteRuntime {
        route,
        runtime,
        execution_preparation,
    };

    execute_prepared_grouped_route_runtime(prepared).map(|(page, _trace)| page)
}
impl<E> LoadExecutor<E>
where
E: EntityKind + EntityValue,
{
fn grouped_path_runtime(&self) -> Result<GroupedPathRuntimeCore, InternalError> {
let authority = EntityAuthority::for_type::<E>();
let store = self.db.recovered_store(authority.store_path())?;
Ok(GroupedPathRuntimeCore {
traversal_runtime: TraversalRuntime::new(store, authority.entity_tag()),
row_store: store,
authority,
output_observer: GroupedOutputRuntimeObserverBindings::for_path(
authority.entity_path(),
),
})
}
pub(in crate::db::executor) fn prepare_grouped_route_runtime(
&self,
route: GroupedRouteStage,
) -> Result<PreparedGroupedRouteRuntime, InternalError> {
let execution_preparation = ExecutionPreparation::from_runtime_plan(
route.plan(),
route.plan().slot_map().map(<[usize]>::to_vec),
);
Ok(PreparedGroupedRouteRuntime {
route,
runtime: self.grouped_path_runtime()?,
execution_preparation,
})
}
pub(in crate::db::executor) fn execute_load_grouped_page_with_trace(
&self,
plan: PreparedLoadPlan,
cursor: LoadCursorInput,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
let surface = self.execute_load_surface(
plan,
cursor,
LoadSurfaceMode::grouped_paged(LoadTracingMode::Enabled),
)?;
Self::expect_grouped_traced_surface(surface)
}
fn expect_grouped_traced_surface(
surface: LoadExecutionSurface,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
match surface {
LoadExecutionSurface::GroupedPageWithTrace(page, trace) => Ok((page, trace)),
LoadExecutionSurface::ScalarPageWithTrace(..) => {
Err(InternalError::query_executor_invariant(
"grouped traced entrypoint must produce grouped traced page surface",
))
}
}
}
}