pub(in crate::db::executor) mod capability;
mod contracts;
mod distinct;
mod execution;
mod fast_path;
pub(in crate::db::executor) mod field;
mod field_extrema;
mod helpers;
mod materialized_distinct;
mod numeric;
mod projection;
pub(in crate::db::executor) mod runtime;
mod terminals;
use crate::{
db::{
data::DataRow,
executor::{
AccessScanContinuationInput, AccessStreamBindings, ExecutionKernel,
PreparedAggregatePlan,
pipeline::contracts::{
ExecutionInputs, ExecutionRuntimeAdapter, LoadExecutor,
PreparedExecutionProjection, ProjectionMaterializationMode,
},
plan_metrics::{record_plan_metrics, record_rows_scanned_for_path},
planning::route::{
aggregate_materialized_fold_direction,
build_execution_route_plan_for_aggregate_spec,
},
terminal::RowLayout,
validate_executor_plan_for_authority,
},
index::IndexCompilePolicy,
},
error::InternalError,
traits::{EntityKind, EntityValue},
};
pub(in crate::db::executor) use capability::{
AggregateExecutionPolicyInputs, derive_aggregate_execution_policy,
field_target_is_tie_free_probe_target,
};
pub(in crate::db::executor) use contracts::{
AggregateFoldMode, AggregateKind, ExecutionConfig, ExecutionContext, FoldControl, GroupError,
ScalarAggregateEngine, ScalarAggregateOutput, execute_scalar_aggregate,
execute_scalar_aggregate as execute_aggregate_engine,
};
pub(in crate::db::executor) use execution::{
AggregateExecutionDescriptor, AggregateFastPathInputs, PreparedAggregateExecutionState,
PreparedAggregateSpec, PreparedAggregateStreamingInputs, PreparedAggregateTargetField,
PreparedCoveringDistinctStrategy, PreparedFieldOrderSensitiveTerminalOp,
PreparedOrderSensitiveTerminalBoundary, PreparedOrderSensitiveTerminalOp,
PreparedScalarNumericAggregateStrategy, PreparedScalarNumericBoundary, PreparedScalarNumericOp,
PreparedScalarNumericPayload, PreparedScalarProjectionBoundary, PreparedScalarProjectionOp,
PreparedScalarProjectionStrategy, PreparedScalarTerminalBoundary, PreparedScalarTerminalOp,
PreparedScalarTerminalStrategy, ScalarProjectionWindow,
};
pub(in crate::db) use numeric::ScalarNumericFieldBoundaryRequest;
pub(in crate::db) use projection::{
ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
};
pub(in crate::db) use terminals::{ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest};
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Turns a prepared aggregate plan into streaming inputs bound to this executor.
    ///
    /// This is a thin forwarder to
    /// `ExecutionKernel::prepare_aggregate_streaming_inputs`.
    ///
    /// # Errors
    ///
    /// Returns whatever `InternalError` the kernel preparation step reports.
    pub(in crate::db::executor::aggregate) fn prepare_scalar_aggregate_boundary(
        &self,
        plan: PreparedAggregatePlan,
    ) -> Result<PreparedAggregateStreamingInputs<'_>, InternalError> {
        ExecutionKernel::prepare_aggregate_streaming_inputs(self, plan)
    }

    /// Runs the scalar materialized page stage and returns its rows together with
    /// the row layout taken from the prepared authority.
    ///
    /// The second component produced by the page's `into_parts()` is discarded.
    ///
    /// # Errors
    ///
    /// Propagates the `InternalError` returned by the materialized page stage.
    pub(in crate::db::executor::aggregate) fn load_materialized_aggregate_rows(
        &self,
        prepared: PreparedAggregateStreamingInputs<'_>,
    ) -> Result<(Vec<DataRow>, RowLayout), InternalError> {
        // The layout has to be read first: `prepared` is consumed by the page stage.
        let layout = prepared.authority.row_layout();
        let (rows, _) = self
            .execute_scalar_materialized_page_stage(prepared)?
            .into_parts();

        Ok((rows, layout))
    }
}
impl ExecutionKernel {
/// Pairs already-prepared streaming inputs with an aggregate spec and the
/// execution route plan derived for that spec.
///
/// The route plan is built from the prepared logical plan, the aggregate's
/// route shape, and the prepared execution preparation. The descriptor's
/// `direction` is taken from that route plan.
pub(in crate::db::executor::aggregate) fn prepare_aggregate_execution_state_from_prepared(
    prepared: PreparedAggregateStreamingInputs<'_>,
    aggregate: PreparedAggregateSpec,
) -> PreparedAggregateExecutionState<'_> {
    let route_plan = build_execution_route_plan_for_aggregate_spec(
        &prepared.logical_plan,
        aggregate.route_shape(),
        &prepared.execution_preparation,
    );

    // Field initializers run in written order, so `direction` is read from the
    // route plan before the plan itself is moved into the descriptor.
    let descriptor = AggregateExecutionDescriptor {
        direction: route_plan.direction(),
        aggregate,
        route_plan,
    };

    PreparedAggregateExecutionState {
        descriptor,
        prepared,
    }
}
/// Decomposes a prepared aggregate plan into the inputs needed for streaming
/// aggregate execution.
///
/// Steps, in order:
/// 1. read the execution preparation from the plan, then split the plan into
///    its streaming parts;
/// 2. validate the logical plan against the authority;
/// 3. obtain the recovered store for the authority's store path and the
///    executor's store resolver;
/// 4. record plan metrics for the logical plan's access.
///
/// # Errors
///
/// Returns the `InternalError` from splitting the plan, from plan validation,
/// or from obtaining the recovered store. Plan metrics are recorded only once
/// all of those steps have succeeded.
pub(in crate::db::executor::aggregate) fn prepare_aggregate_streaming_inputs<E>(
    executor: &'_ LoadExecutor<E>,
    plan: PreparedAggregatePlan,
) -> Result<PreparedAggregateStreamingInputs<'_>, InternalError>
where
    E: EntityKind + EntityValue,
{
    // Must be read before `into_streaming_parts` consumes the plan.
    let preparation = plan.execution_preparation();
    let (authority, logical_plan, prefix_specs, range_specs) = plan.into_streaming_parts()?;

    validate_executor_plan_for_authority(authority, &logical_plan)?;

    let db = &executor.db;
    let store = db.recovered_store(authority.store_path())?;
    let store_resolver = db.store_resolver();

    record_plan_metrics(&logical_plan.access);

    let inputs = PreparedAggregateStreamingInputs {
        store_resolver,
        authority,
        store,
        logical_plan,
        execution_preparation: preparation,
        index_prefix_specs: prefix_specs,
        index_range_specs: range_specs,
    };
    Ok(inputs)
}
/// Executes an aggregate over fully materialized rows.
///
/// Rows are loaded through `LoadExecutor::load_materialized_aggregate_rows`.
/// When the aggregate names a target field, the rows are handed to
/// `aggregate_field_extrema_from_materialized` along with the row layout;
/// otherwise they go to `aggregate_from_materialized`.
///
/// # Errors
///
/// Propagates the `InternalError` from row loading or from the chosen
/// aggregation routine.
fn execute_materialized_aggregate_spec<E>(
    executor: &LoadExecutor<E>,
    prepared: PreparedAggregateStreamingInputs<'_>,
    aggregate: &PreparedAggregateSpec,
) -> Result<ScalarAggregateOutput, InternalError>
where
    E: EntityKind + EntityValue,
{
    let kind = aggregate.kind();
    let target = aggregate.target_field();

    // Both branches need the same materialized rows, so load them once.
    let (rows, row_layout) = executor.load_materialized_aggregate_rows(prepared)?;

    match target {
        Some(target_field) => Self::aggregate_field_extrema_from_materialized(
            rows,
            &row_layout,
            kind,
            target_field.target_field_name(),
            target_field.field_slot(),
        ),
        None => Self::aggregate_from_materialized(rows, kind),
    }
}
/// Folds the keys of already-materialized rows through a scalar aggregate engine.
///
/// The engine is created for `kind` with the direction returned by
/// `aggregate_materialized_fold_direction(kind)`. Row keys are ingested in
/// vector order, and ingestion stops as soon as the engine reports
/// `FoldControl::Break`.
///
/// # Errors
///
/// Propagates the `InternalError` from `ingest` or from the engine driver.
fn aggregate_from_materialized(
    rows: Vec<DataRow>,
    kind: AggregateKind,
) -> Result<ScalarAggregateOutput, InternalError> {
    let fold_direction = aggregate_materialized_fold_direction(kind);
    let engine = ScalarAggregateEngine::new_scalar(kind, fold_direction);

    execute_aggregate_engine(
        engine,
        |engine: &mut ScalarAggregateEngine| -> Result<(), InternalError> {
            for (data_key, _) in &rows {
                match engine.ingest(data_key)? {
                    FoldControl::Break => break,
                    _ => {}
                }
            }
            Ok(())
        },
    )
}
pub(in crate::db::executor::aggregate) fn execute_prepared_aggregate_state<E>(
executor: &LoadExecutor<E>,
state: PreparedAggregateExecutionState<'_>,
) -> Result<ScalarAggregateOutput, InternalError>
where
E: EntityKind + EntityValue,
{
let kind = state.descriptor.aggregate.kind();
let descriptor = state.descriptor;
let prepared = state.prepared;
if descriptor.route_plan.is_materialized() {
return Self::execute_materialized_aggregate_spec(
executor,
prepared,
&descriptor.aggregate,
);
}
if let Some(target_field) = descriptor.aggregate.target_field() {
return Self::execute_field_target_extrema_aggregate(
&prepared,
kind,
target_field.target_field_name(),
target_field.field_slot(),
descriptor.direction,
&descriptor.route_plan,
);
}
let fold_mode = descriptor.route_plan.aggregate_fold_mode;
let physical_fetch_hint = descriptor.route_plan.scan_hints.physical_fetch_hint;
let fast_path_inputs = AggregateFastPathInputs {
logical_plan: &prepared.logical_plan,
authority: prepared.authority,
store: prepared.store,
route_plan: &descriptor.route_plan,
index_prefix_specs: prepared.index_prefix_specs.as_slice(),
index_range_specs: prepared.index_range_specs.as_slice(),
index_predicate_program: prepared.execution_preparation.strict_mode(),
direction: descriptor.direction,
physical_fetch_hint,
kind,
fold_mode,
};
if let Some((aggregate_output, rows_scanned)) =
Self::try_fast_path_aggregate(&fast_path_inputs)?
{
record_rows_scanned_for_path(prepared.authority.entity_path(), rows_scanned);
return Ok(aggregate_output);
}
let runtime = ExecutionRuntimeAdapter::from_stream_runtime_parts(
&prepared.logical_plan.access,
crate::db::executor::TraversalRuntime::new(
prepared.store,
prepared.authority.entity_tag(),
),
);
let execution_inputs = ExecutionInputs::new_prepared(
&runtime,
&prepared.logical_plan,
AccessStreamBindings {
index_prefix_specs: prepared.index_prefix_specs.as_slice(),
index_range_specs: prepared.index_range_specs.as_slice(),
continuation: AccessScanContinuationInput::new(None, descriptor.direction),
},
&prepared.execution_preparation,
ProjectionMaterializationMode::SharedValidation,
PreparedExecutionProjection::empty(),
false,
);
let mut resolved = execution_inputs.resolve_execution_key_stream(
&descriptor.route_plan,
IndexCompilePolicy::StrictAllOrNone,
)?;
let (aggregate_output, keys_scanned) = Self::run_streaming_aggregate_reducer(
prepared.store,
&prepared.logical_plan,
kind,
descriptor.direction,
fold_mode,
resolved.key_stream_mut(),
)?;
let rows_scanned = resolved.rows_scanned_override().unwrap_or(keys_scanned);
record_rows_scanned_for_path(prepared.authority.entity_path(), rows_scanned);
Ok(aggregate_output)
}
}