pub(in crate::db::executor) mod capability;
mod contracts;
mod distinct;
mod execution;
mod fast_path;
pub(in crate::db::executor) mod field;
mod field_extrema;
mod helpers;
mod materialized_distinct;
mod numeric;
mod projection;
pub(in crate::db::executor) mod runtime;
mod terminals;
use crate::db::executor::aggregate::field::{
AggregateFieldValueError, resolve_orderable_aggregate_target_slot_with_model,
};
use crate::{
db::{
data::DataRow,
executor::{
AccessScanContinuationInput, AccessStreamBindings, ExecutionKernel,
PreparedAggregatePlan,
pipeline::contracts::{
ExecutionInputs, ExecutionRuntimeAdapter, LoadExecutor,
ProjectionMaterializationMode,
},
plan_metrics::{record_plan_metrics, record_rows_scanned_for_path},
route::{AggregateRouteShape, aggregate_materialized_fold_direction},
terminal::RowLayout,
validate_executor_plan_for_authority,
},
index::IndexCompilePolicy,
query::builder::AggregateExpr,
},
error::InternalError,
traits::{EntityKind, EntityValue},
};
pub(in crate::db::executor) use capability::{
AggregateExecutionPolicyInputs, derive_aggregate_execution_policy_for_model,
field_target_is_tie_free_probe_target_for_model,
};
pub(in crate::db::executor) use contracts::{
AggregateFoldMode, AggregateKind, ExecutionConfig, ExecutionContext, FoldControl, GroupError,
ScalarAggregateEngine, ScalarAggregateOutput, execute_scalar_aggregate,
execute_scalar_aggregate as execute_aggregate_engine,
};
pub(in crate::db::executor) use execution::{
AggregateExecutionDescriptor, AggregateFastPathInputs, PreparedAggregateExecutionState,
PreparedAggregateStreamingInputs, PreparedAggregateStreamingInputsCore,
PreparedCoveringDistinctStrategy, PreparedFieldOrderSensitiveTerminalOp,
PreparedOrderSensitiveTerminalBoundary, PreparedOrderSensitiveTerminalExecutionState,
PreparedScalarNumericAggregateStrategy, PreparedScalarNumericBoundary,
PreparedScalarNumericExecutionState, PreparedScalarNumericOp, PreparedScalarNumericPayload,
PreparedScalarProjectionBoundary, PreparedScalarProjectionExecutionState,
PreparedScalarProjectionOp, PreparedScalarProjectionStrategy, PreparedScalarTerminalBoundary,
PreparedScalarTerminalExecutionState, PreparedScalarTerminalOp, PreparedScalarTerminalStrategy,
ScalarProjectionWindow,
};
pub(in crate::db) use numeric::ScalarNumericFieldBoundaryRequest;
pub(in crate::db) use projection::{
ScalarProjectionBoundaryOutput, ScalarProjectionBoundaryRequest,
};
pub(in crate::db) use terminals::{ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest};
impl<E: EntityKind + EntityValue> LoadExecutor<E> {
    /// Prepares the streaming inputs for one scalar aggregate plan.
    ///
    /// This is a thin boundary method: all work is delegated to
    /// [`ExecutionKernel::prepare_aggregate_streaming_inputs`], with this
    /// executor passed as the executor argument.
    ///
    /// # Errors
    ///
    /// Returns whatever `InternalError` the kernel preparation step reports.
    pub(in crate::db::executor::aggregate) fn prepare_scalar_aggregate_boundary(
        &self,
        plan: PreparedAggregatePlan,
    ) -> Result<PreparedAggregateStreamingInputs<'_>, InternalError> {
        ExecutionKernel::prepare_aggregate_streaming_inputs::<E>(self, plan)
    }
}
impl ExecutionKernel {
/// Combines already-prepared streaming inputs with one aggregate expression
/// into a `PreparedAggregateExecutionState`.
///
/// The route plan is built from the authority's model, the logical plan, the
/// route shape derived from `aggregate`, and the execution preparation. The
/// descriptor's direction is read from that route plan.
pub(in crate::db::executor::aggregate) fn prepare_aggregate_execution_state_from_prepared(
    prepared: PreparedAggregateStreamingInputs<'_>,
    aggregate: AggregateExpr,
) -> PreparedAggregateExecutionState<'_> {
    let model = prepared.authority.model();
    let route_shape = AggregateRouteShape::from_aggregate_expr(&aggregate);
    let route_plan =
        crate::db::executor::route::build_execution_route_plan_for_aggregate_spec_with_model(
            model,
            &prepared.logical_plan,
            route_shape,
            &prepared.execution_preparation,
        );

    // `direction` is read from the route plan before the plan itself is moved
    // into the descriptor (struct fields are evaluated in written order).
    let descriptor = AggregateExecutionDescriptor {
        direction: route_plan.direction(),
        aggregate,
        route_plan,
    };

    PreparedAggregateExecutionState {
        descriptor,
        prepared,
    }
}
/// Splits a prepared aggregate plan into the inputs needed for execution.
///
/// Steps, in order:
/// 1. read the execution preparation from the plan, then consume the plan
///    into its streaming parts;
/// 2. validate the logical plan against the authority;
/// 3. obtain the recovered store for the authority's store path and the
///    database's store resolver;
/// 4. record plan metrics for the logical plan's access path.
///
/// # Errors
///
/// Propagates the `InternalError` from splitting the plan, from plan
/// validation, or from recovering the store.
pub(in crate::db::executor::aggregate) fn prepare_aggregate_streaming_inputs<E>(
    executor: &'_ LoadExecutor<E>,
    plan: PreparedAggregatePlan,
) -> Result<PreparedAggregateStreamingInputs<'_>, InternalError>
where
    E: EntityKind + EntityValue,
{
    // Must be read before `into_streaming_parts` consumes the plan.
    let preparation = plan.execution_preparation();
    let (authority, logical, prefix_specs, range_specs) = plan.into_streaming_parts()?;

    validate_executor_plan_for_authority(authority, &logical)?;

    let recovered_store = executor.db.recovered_store(authority.store_path())?;
    let resolver = executor.db.store_resolver();

    record_plan_metrics(&logical.access);

    let inputs = PreparedAggregateStreamingInputs {
        store_resolver: resolver,
        authority,
        store: recovered_store,
        logical_plan: logical,
        execution_preparation: preparation,
        index_prefix_specs: prefix_specs,
        index_range_specs: range_specs,
    };
    Ok(inputs)
}
/// Executes an aggregate by materializing the scalar page and folding its rows.
///
/// When the aggregate names a target field, the field slot is resolved and
/// the row layout is built *before* the page stage runs, so a target that
/// fails to resolve errors out without executing the page. The rows are then
/// handed to the field-extrema fold. Without a target field, the rows go to
/// the plain materialized fold.
///
/// # Errors
///
/// Propagates the converted field-resolution error, any error from the
/// materialized page stage, and any error from the selected fold.
fn execute_materialized_aggregate_spec<E>(
    executor: &LoadExecutor<E>,
    prepared: PreparedAggregateStreamingInputs<'_>,
    aggregate: &AggregateExpr,
) -> Result<ScalarAggregateOutput, InternalError>
where
    E: EntityKind + EntityValue,
{
    let kind = aggregate.kind();

    // Everything that reads `prepared` has to happen before the page stage
    // takes ownership of it.
    let field_target = match aggregate.target_field() {
        Some(target_field) => {
            let field_slot = resolve_orderable_aggregate_target_slot_with_model(
                prepared.authority.model(),
                target_field,
            )
            .map_err(AggregateFieldValueError::into_internal_error)?;
            let row_layout = RowLayout::from_model(prepared.authority.model());
            Some((target_field, field_slot, row_layout))
        }
        None => None,
    };

    // Only the rows are needed; the second part of the page is discarded.
    let (rows, _) = executor
        .execute_scalar_materialized_page_stage(prepared)?
        .into_parts();

    match field_target {
        Some((target_field, field_slot, row_layout)) => {
            Self::aggregate_field_extrema_from_materialized(
                rows,
                &row_layout,
                kind,
                target_field,
                field_slot,
            )
        }
        None => Self::aggregate_from_materialized(rows, kind),
    }
}
/// Folds already-materialized rows through a scalar aggregate engine.
///
/// The engine is created for `kind` with the fold direction returned by
/// `aggregate_materialized_fold_direction(kind)`. Only the first element of
/// each row (the data key) is ingested. Ingestion stops as soon as the engine
/// answers `FoldControl::Break`.
///
/// # Errors
///
/// Propagates any `InternalError` raised while ingesting a key or while the
/// engine is driven by `execute_aggregate_engine`.
fn aggregate_from_materialized(
    rows: Vec<DataRow>,
    kind: AggregateKind,
) -> Result<ScalarAggregateOutput, InternalError> {
    let engine =
        ScalarAggregateEngine::new_scalar(kind, aggregate_materialized_fold_direction(kind));

    let mut feed_keys = |sink: &mut ScalarAggregateEngine| -> Result<(), InternalError> {
        for key in rows.iter().map(|(key, _)| key) {
            if let FoldControl::Break = sink.ingest(key)? {
                break;
            }
        }
        Ok(())
    };

    execute_aggregate_engine(engine, &mut feed_keys)
}
/// Executes one prepared aggregate state and returns its scalar output.
///
/// Dispatch order:
/// 1. If the route plan's shape reports `is_materialized()`, delegate to
///    `execute_materialized_aggregate_spec`.
/// 2. Otherwise, if the aggregate has a target field, delegate to
///    `execute_field_target_extrema_aggregate`.
/// 3. Otherwise try `try_fast_path_aggregate`; when it yields a result,
///    record the scanned-row count and return.
/// 4. Otherwise resolve the execution key stream and run the streaming
///    aggregate reducer, then record the scanned-row count.
///
/// # Errors
///
/// Propagates the `InternalError` of whichever step fails.
pub(in crate::db::executor::aggregate) fn execute_prepared_aggregate_state<E>(
executor: &LoadExecutor<E>,
state: PreparedAggregateExecutionState<'_>,
) -> Result<ScalarAggregateOutput, InternalError>
where
E: EntityKind + EntityValue,
{
// Read the aggregate kind first, then split the state into its two parts.
let kind = state.descriptor.aggregate.kind();
let descriptor = state.descriptor;
let prepared = state.prepared;
// Materialized routes consume `prepared` and fold the materialized rows.
if descriptor.route_plan.shape().is_materialized() {
return Self::execute_materialized_aggregate_spec(
executor,
prepared,
&descriptor.aggregate,
);
}
// Field-targeted aggregates on non-materialized routes use the dedicated
// field-extrema path.
if let Some(target_field) = descriptor.aggregate.target_field() {
return Self::execute_field_target_extrema_aggregate(
&prepared,
kind,
target_field,
descriptor.direction,
&descriptor.route_plan,
);
}
let fold_mode = descriptor.route_plan.aggregate_fold_mode;
let physical_fetch_hint = descriptor.route_plan.scan_hints.physical_fetch_hint;
// Bundle the borrowed plan, store, and route data for the fast-path probe.
let fast_path_inputs = AggregateFastPathInputs {
logical_plan: &prepared.logical_plan,
authority: prepared.authority,
store: prepared.store,
route_plan: &descriptor.route_plan,
index_prefix_specs: prepared.index_prefix_specs.as_slice(),
index_range_specs: prepared.index_range_specs.as_slice(),
// NOTE(review): the field is named `index_predicate_program` but is fed
// from `strict_mode()` — confirm that accessor returns the predicate program.
index_predicate_program: prepared.execution_preparation.strict_mode(),
direction: descriptor.direction,
physical_fetch_hint,
kind,
fold_mode,
};
// A `Some` result carries both the output and the scanned-row count.
if let Some((aggregate_output, rows_scanned)) =
Self::try_fast_path_aggregate(&fast_path_inputs)?
{
record_rows_scanned_for_path(prepared.authority.entity_path(), rows_scanned);
return Ok(aggregate_output);
}
// No fast-path result: fall back to the streaming path over the key stream.
let runtime = ExecutionRuntimeAdapter::from_runtime_parts(
&prepared.logical_plan.access,
crate::db::executor::TraversalRuntime::new(
prepared.store,
prepared.authority.entity_tag(),
),
prepared.store,
prepared.authority.model(),
);
let execution_inputs = ExecutionInputs::new(
prepared.authority.model(),
&runtime,
&prepared.logical_plan,
AccessStreamBindings {
index_prefix_specs: prepared.index_prefix_specs.as_slice(),
index_range_specs: prepared.index_range_specs.as_slice(),
// `None` is passed as the first continuation argument — presumably
// "no resume anchor"; confirm against `AccessScanContinuationInput::new`.
continuation: AccessScanContinuationInput::new(None, descriptor.direction),
},
&prepared.execution_preparation,
ProjectionMaterializationMode::SharedValidation,
// NOTE(review): the meaning of this trailing flag is defined by
// `ExecutionInputs::new`, which is not visible here.
true,
)?;
let mut resolved = Self::resolve_execution_key_stream(
&execution_inputs,
&descriptor.route_plan,
IndexCompilePolicy::StrictAllOrNone,
)?;
let (aggregate_output, keys_scanned) = Self::run_streaming_aggregate_reducer(
prepared.store,
&prepared.logical_plan,
kind,
descriptor.direction,
fold_mode,
resolved.key_stream_mut(),
)?;
// Prefer the resolved stream's row-count override when it provides one;
// otherwise report the number of keys the reducer scanned.
let rows_scanned = resolved.rows_scanned_override().unwrap_or(keys_scanned);
record_rows_scanned_for_path(prepared.authority.entity_path(), rows_scanned);
Ok(aggregate_output)
}
}