use crate::{
db::query::{
builder::AggregateExpr,
plan::{
AccessPlannedQuery, AggregateKind, FieldSlot, GroupAggregateSpec,
GroupDistinctAdmissibility, GroupDistinctPolicyReason, GroupHavingSpec,
GroupedExecutionConfig, GroupedPlanStrategyHint,
expr::{Expr, ProjectionField, ProjectionSpec},
grouped_distinct_admissibility, grouped_plan_strategy_hint,
resolve_global_distinct_field_aggregate, validate_grouped_projection_layout,
},
},
error::InternalError,
};
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct PlannedProjectionLayout {
pub(in crate::db) group_field_positions: Vec<usize>,
pub(in crate::db) aggregate_positions: Vec<usize>,
}
impl PlannedProjectionLayout {
#[must_use]
pub(in crate::db) const fn group_field_positions(&self) -> &[usize] {
self.group_field_positions.as_slice()
}
#[must_use]
pub(in crate::db) const fn aggregate_positions(&self) -> &[usize] {
self.aggregate_positions.as_slice()
}
pub(in crate::db) fn group_field_count_mismatch(
layout_count: usize,
handoff_count: usize,
) -> InternalError {
InternalError::planner_executor_invariant(format!(
"grouped projection layout group-field count mismatch: layout={layout_count}, handoff={handoff_count}",
))
}
pub(in crate::db) fn aggregate_count_mismatch(
layout_count: usize,
handoff_count: usize,
) -> InternalError {
InternalError::planner_executor_invariant(format!(
"grouped projection layout aggregate count mismatch: layout={layout_count}, handoff={handoff_count}",
))
}
pub(in crate::db) fn group_field_positions_not_strictly_increasing() -> InternalError {
InternalError::planner_executor_invariant(
"grouped projection layout group-field positions must be strictly increasing",
)
}
pub(in crate::db) fn aggregate_positions_not_strictly_increasing() -> InternalError {
InternalError::planner_executor_invariant(
"grouped projection layout aggregate positions must be strictly increasing",
)
}
pub(in crate::db) fn group_fields_must_precede_aggregates() -> InternalError {
InternalError::planner_executor_invariant(
"grouped projection layout must keep group fields before aggregate terminals",
)
}
pub(in crate::db) fn projected_position_out_of_bounds(
position_kind: &str,
position: usize,
projected_len: usize,
) -> InternalError {
InternalError::query_executor_invariant(format!(
"grouped projection layout {position_kind} position out of bounds: position={position}, projected_len={projected_len}",
))
}
}
#[derive(Clone)]
pub(in crate::db) struct GroupedExecutorHandoff<'a> {
base: &'a AccessPlannedQuery,
group_fields: &'a [FieldSlot],
aggregate_exprs: Vec<AggregateExpr>,
projection_layout: PlannedProjectionLayout,
projection_layout_valid: bool,
grouped_plan_strategy_hint: GroupedPlanStrategyHint,
grouped_distinct_policy_contract: GroupedDistinctPolicyContract,
having: Option<&'a GroupHavingSpec>,
execution: GroupedExecutionConfig,
}
impl<'a> GroupedExecutorHandoff<'a> {
#[must_use]
pub(in crate::db) const fn base(&self) -> &'a AccessPlannedQuery {
self.base
}
#[must_use]
pub(in crate::db) const fn group_fields(&self) -> &'a [FieldSlot] {
self.group_fields
}
#[must_use]
pub(in crate::db) const fn aggregate_exprs(&self) -> &[AggregateExpr] {
self.aggregate_exprs.as_slice()
}
#[must_use]
pub(in crate::db) const fn projection_layout(&self) -> &PlannedProjectionLayout {
&self.projection_layout
}
#[must_use]
pub(in crate::db) const fn projection_layout_valid(&self) -> bool {
self.projection_layout_valid
}
#[must_use]
pub(in crate::db) const fn grouped_plan_strategy_hint(&self) -> GroupedPlanStrategyHint {
self.grouped_plan_strategy_hint
}
#[must_use]
pub(in crate::db) const fn distinct_execution_strategy(
&self,
) -> &GroupedDistinctExecutionStrategy {
self.grouped_distinct_policy_contract.execution_strategy()
}
#[must_use]
pub(in crate::db) const fn distinct_policy_violation_for_executor(
&self,
) -> Option<GroupDistinctPolicyReason> {
self.grouped_distinct_policy_contract
.violation_for_executor()
}
#[must_use]
pub(in crate::db) const fn having(&self) -> Option<&'a GroupHavingSpec> {
self.having
}
#[must_use]
pub(in crate::db) const fn execution(&self) -> GroupedExecutionConfig {
self.execution
}
}
pub(in crate::db) fn grouped_executor_handoff(
plan: &AccessPlannedQuery,
) -> Result<GroupedExecutorHandoff<'_>, InternalError> {
let Some(grouped) = plan.grouped_plan() else {
return Err(InternalError::planner_executor_invariant(
"grouped executor handoff requires grouped logical plans",
));
};
let projection_spec = plan.projection_spec_for_identity();
let (projection_layout, aggregate_exprs) =
planned_projection_layout_and_aggregate_exprs_from_spec(&projection_spec)?;
let projection_layout_valid = validate_grouped_projection_layout(
&projection_layout,
grouped.group.group_fields.len(),
aggregate_exprs.len(),
)
.map(|()| true)?;
let grouped_plan_strategy_hint = grouped_plan_strategy_hint(plan).ok_or_else(|| {
InternalError::planner_executor_invariant(
"grouped executor handoff must carry grouped strategy hint for grouped plans",
)
})?;
let grouped_distinct_policy_contract = grouped_distinct_policy_contract(
grouped.scalar.distinct,
grouped.having.is_some(),
grouped.group.group_fields.as_slice(),
grouped.group.aggregates.as_slice(),
grouped.having.as_ref(),
)?;
Ok(GroupedExecutorHandoff {
base: plan,
group_fields: grouped.group.group_fields.as_slice(),
aggregate_exprs,
projection_layout,
projection_layout_valid,
grouped_plan_strategy_hint,
grouped_distinct_policy_contract,
having: grouped.having.as_ref(),
execution: grouped.group.execution,
})
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct GroupedDistinctPolicyContract {
violation_for_executor: Option<GroupDistinctPolicyReason>,
execution_strategy: GroupedDistinctExecutionStrategy,
}
impl GroupedDistinctPolicyContract {
#[must_use]
const fn new(
violation_for_executor: Option<GroupDistinctPolicyReason>,
execution_strategy: GroupedDistinctExecutionStrategy,
) -> Self {
Self {
violation_for_executor,
execution_strategy,
}
}
#[must_use]
pub(in crate::db) const fn execution_strategy(&self) -> &GroupedDistinctExecutionStrategy {
&self.execution_strategy
}
#[must_use]
pub(in crate::db) const fn violation_for_executor(&self) -> Option<GroupDistinctPolicyReason> {
self.violation_for_executor
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum GroupedDistinctExecutionStrategy {
None,
GlobalDistinctFieldCount { target_field: String },
GlobalDistinctFieldSum { target_field: String },
GlobalDistinctFieldAvg { target_field: String },
}
fn grouped_distinct_execution_strategy(
group_fields: &[FieldSlot],
aggregates: &[GroupAggregateSpec],
having: Option<&GroupHavingSpec>,
) -> Result<GroupedDistinctExecutionStrategy, InternalError> {
match resolve_global_distinct_field_aggregate(group_fields, aggregates, having) {
Ok(Some(aggregate)) => match aggregate.kind() {
AggregateKind::Count => {
Ok(GroupedDistinctExecutionStrategy::GlobalDistinctFieldCount {
target_field: aggregate.target_field().to_string(),
})
}
AggregateKind::Sum => Ok(GroupedDistinctExecutionStrategy::GlobalDistinctFieldSum {
target_field: aggregate.target_field().to_string(),
}),
AggregateKind::Avg => Ok(GroupedDistinctExecutionStrategy::GlobalDistinctFieldAvg {
target_field: aggregate.target_field().to_string(),
}),
AggregateKind::Exists
| AggregateKind::Min
| AggregateKind::Max
| AggregateKind::First
| AggregateKind::Last => Err(InternalError::planner_executor_invariant(
"planner grouped DISTINCT strategy handoff must lower only COUNT/SUM/AVG field-target aggregates",
)),
},
Ok(None) => Ok(GroupedDistinctExecutionStrategy::None),
Err(reason) => Err(reason.into_planner_handoff_internal_error()),
}
}
fn grouped_distinct_policy_contract(
scalar_distinct: bool,
has_having: bool,
group_fields: &[FieldSlot],
aggregates: &[GroupAggregateSpec],
having: Option<&GroupHavingSpec>,
) -> Result<GroupedDistinctPolicyContract, InternalError> {
let violation_for_executor = match grouped_distinct_admissibility(scalar_distinct, has_having) {
GroupDistinctAdmissibility::Allowed => None,
GroupDistinctAdmissibility::Disallowed(reason) => Some(reason),
};
let execution_strategy = grouped_distinct_execution_strategy(group_fields, aggregates, having)?;
Ok(GroupedDistinctPolicyContract::new(
violation_for_executor,
execution_strategy,
))
}
fn planned_projection_layout_and_aggregate_exprs_from_spec(
projection_spec: &ProjectionSpec,
) -> Result<(PlannedProjectionLayout, Vec<AggregateExpr>), InternalError> {
let mut group_field_positions = Vec::new();
let mut aggregate_positions = Vec::new();
let mut aggregate_exprs = Vec::new();
for (index, field) in projection_spec.fields().enumerate() {
match field {
ProjectionField::Scalar { expr, .. } => {
let root_expr = expression_without_alias(expr);
match root_expr {
Expr::Field(_) => group_field_positions.push(index),
Expr::Aggregate(aggregate_expr) => {
aggregate_positions.push(index);
aggregate_exprs.push(aggregate_expr.clone());
}
Expr::Literal(_) | Expr::Unary { .. } | Expr::Binary { .. } => {
return Err(InternalError::planner_executor_invariant(format!(
"grouped projection layout expects only field/aggregate expressions; found non-grouped projection expression at index={index}",
)));
}
Expr::Alias { .. } => {
return Err(InternalError::planner_executor_invariant(
"grouped projection layout alias normalization must remove alias wrappers",
));
}
}
}
}
}
Ok((
PlannedProjectionLayout {
group_field_positions,
aggregate_positions,
},
aggregate_exprs,
))
}
fn expression_without_alias(mut expr: &Expr) -> &Expr {
while let Expr::Alias { expr: inner, .. } = expr {
expr = inner.as_ref();
}
expr
}