use crate::db::registry::StoreHandle;
use crate::{
db::executor::{
EntityAuthority, ExecutionTrace, LoadCursorInput, PreparedLoadPlan,
aggregate::runtime::{
GroupedOutputRuntimeObserverBindings, build_grouped_stream_with_runtime,
execute_group_fold_stage, finalize_grouped_output_with_observer,
},
pipeline::contracts::{
ExecutionRuntimeAdapter, GroupedCursorPage, GroupedFoldStage, GroupedRouteStage,
GroupedStreamStage, LoadExecutor, StructuralGroupedRowRuntime,
},
pipeline::entrypoints::{LoadExecutionMode, LoadTracingMode},
pipeline::orchestrator::LoadExecutionSurface,
pipeline::timing::{elapsed_execution_micros, start_execution_timer},
preparation::slot_map_for_model_plan,
stream::access::TraversalRuntime,
},
error::InternalError,
traits::{EntityKind, EntityValue},
};
struct GroupedPathRuntimeCore {
traversal_runtime: TraversalRuntime,
row_store: StoreHandle,
authority: EntityAuthority,
output_observer: GroupedOutputRuntimeObserverBindings,
}
pub(in crate::db::executor) struct PreparedGroupedRouteRuntime {
route: GroupedRouteStage,
runtime: GroupedPathRuntimeCore,
slot_map: Option<Vec<usize>>,
}
impl GroupedPathRuntimeCore {
fn build_grouped_stream<'a>(
&'a self,
route: &GroupedRouteStage,
slot_map: Option<Vec<usize>>,
) -> Result<GroupedStreamStage<'a>, InternalError> {
let runtime = ExecutionRuntimeAdapter::from_stream_runtime_parts(
&route.plan().access,
self.traversal_runtime,
self.authority.model(),
);
build_grouped_stream_with_runtime(
route,
&runtime,
self.authority.model(),
slot_map,
Box::new(StructuralGroupedRowRuntime::new(
self.row_store,
self.authority.model(),
)),
)
}
fn finalize_grouped_output(
&self,
route: GroupedRouteStage,
folded: GroupedFoldStage,
execution_time_micros: u64,
) -> (GroupedCursorPage, Option<ExecutionTrace>) {
finalize_grouped_output_with_observer(
&self.output_observer,
route,
folded,
execution_time_micros,
)
}
}
fn execute_grouped_route_path(
runtime: &GroupedPathRuntimeCore,
route: GroupedRouteStage,
slot_map: Option<Vec<usize>>,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
let execution_started_at = start_execution_timer();
let stream = runtime.build_grouped_stream(&route, slot_map)?;
let folded = execute_group_fold_stage(&route, stream)?;
let execution_time_micros = elapsed_execution_micros(execution_started_at);
Ok(runtime.finalize_grouped_output(route, folded, execution_time_micros))
}
pub(in crate::db::executor) fn execute_prepared_grouped_route_runtime(
prepared: PreparedGroupedRouteRuntime,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
let PreparedGroupedRouteRuntime {
route,
runtime,
slot_map,
} = prepared;
execute_grouped_route_path(&runtime, route, slot_map)
}
impl<E> LoadExecutor<E>
where
E: EntityKind + EntityValue,
{
fn grouped_path_runtime(&self) -> Result<GroupedPathRuntimeCore, InternalError> {
let authority = EntityAuthority::for_type::<E>();
let store = self.db.recovered_store(authority.store_path())?;
Ok(GroupedPathRuntimeCore {
traversal_runtime: TraversalRuntime::new(store, authority.entity_tag()),
row_store: store,
authority,
output_observer: GroupedOutputRuntimeObserverBindings::for_path(
authority.entity_path(),
),
})
}
pub(in crate::db::executor) fn prepare_grouped_route_runtime(
&self,
route: GroupedRouteStage,
) -> Result<PreparedGroupedRouteRuntime, InternalError> {
let authority = EntityAuthority::for_type::<E>();
let slot_map = slot_map_for_model_plan(authority.model(), route.plan());
Ok(PreparedGroupedRouteRuntime {
route,
runtime: self.grouped_path_runtime()?,
slot_map,
})
}
pub(in crate::db::executor) fn execute_load_grouped_page_with_trace(
&self,
plan: PreparedLoadPlan,
cursor: LoadCursorInput,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
let surface = self.execute_load_surface(
plan,
cursor,
LoadExecutionMode::grouped_paged(LoadTracingMode::Enabled),
)?;
Self::expect_grouped_traced_surface(surface)
}
fn expect_grouped_traced_surface(
surface: LoadExecutionSurface,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
match surface {
LoadExecutionSurface::GroupedPageWithTrace(page, trace) => Ok((page, trace)),
LoadExecutionSurface::ScalarPageWithTrace(..) => {
Err(InternalError::query_executor_invariant(
"grouped traced entrypoint must produce grouped traced page surface",
))
}
}
}
}