use crate::{
db::query::{
builder::AggregateExpr,
plan::{
AccessPlannedQuery, AggregateKind, FieldSlot, GroupAggregateSpec,
GroupDistinctAdmissibility, GroupDistinctPolicyReason, GroupedExecutionConfig,
GroupedPlanStrategy,
expr::{
Expr, ProjectionSpec, ScalarProjectionExpr, compile_scalar_projection_expr,
expr_references_only_fields, projection_field_expr,
},
grouped_distinct_admissibility, grouped_plan_strategy,
resolve_aggregate_target_field_slot, resolve_global_distinct_field_aggregate,
validate_grouped_projection_layout,
},
},
error::InternalError,
model::entity::EntityModel,
};
/// Planner-computed layout of one grouped projection.
///
/// Each vector holds indexes into the projection's field list, split by
/// whether the projected expression was classified as a group-field
/// expression or as an aggregate-bearing expression.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct PlannedProjectionLayout {
/// Projection indexes classified as group-field expressions.
pub(in crate::db) group_field_positions: Vec<usize>,
/// Projection indexes classified as aggregate expressions.
pub(in crate::db) aggregate_positions: Vec<usize>,
}
/// Execution-facing description of one grouped aggregate terminal.
///
/// A spec is first built from a planner aggregate expression with the
/// model-dependent parts (`target_slot` and the compiled expressions) left
/// as `None`; `resolve_for_model` returns a copy with those parts filled in.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct GroupedAggregateExecutionSpec {
// Aggregate function kind.
kind: AggregateKind,
// Resolved slot of the target field; set only when the input expression is
// a bare field reference and the spec has been resolved against a model.
target_slot: Option<FieldSlot>,
// Planner-level aggregate input expression, if any.
input_expr: Option<Expr>,
// Planner-level aggregate filter expression, if any.
filter_expr: Option<Expr>,
// Scalar-compiled form of `input_expr`, populated by model resolution.
compiled_input_expr: Option<ScalarProjectionExpr>,
// Scalar-compiled form of `filter_expr`, populated by model resolution.
compiled_filter_expr: Option<ScalarProjectionExpr>,
// Whether the aggregate carries the DISTINCT modifier.
distinct: bool,
}
/// Result of scanning one projection expression tree for aggregate leaves.
///
/// Tracks whether any aggregate was seen at all and how many of the seen
/// aggregates were new (not already present in the collected spec list).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct GroupedProjectionAggregateScan {
    /// `true` once at least one aggregate leaf has been visited.
    contains_aggregate: bool,
    /// Number of aggregate specs newly introduced by the scanned subtree.
    introduced_aggregate_count: usize,
}

impl GroupedProjectionAggregateScan {
    /// Scan result for a subtree without any aggregate leaf.
    #[must_use]
    const fn none() -> Self {
        Self {
            introduced_aggregate_count: 0,
            contains_aggregate: false,
        }
    }

    /// Scan result for one visited aggregate leaf that introduced
    /// `introduced_aggregate_count` new specs.
    #[must_use]
    const fn found_aggregate(introduced_aggregate_count: usize) -> Self {
        Self {
            introduced_aggregate_count,
            contains_aggregate: true,
        }
    }

    /// Merges two scan results: presence flags are OR-ed and the introduced
    /// counts are summed with saturation.
    #[must_use]
    const fn combine(self, other: Self) -> Self {
        let contains_aggregate = self.contains_aggregate || other.contains_aggregate;
        let introduced_aggregate_count = self
            .introduced_aggregate_count
            .saturating_add(other.introduced_aggregate_count);

        Self {
            contains_aggregate,
            introduced_aggregate_count,
        }
    }

    /// Whether the scanned subtree contained at least one aggregate.
    #[must_use]
    const fn contains_aggregate(self) -> bool {
        let Self {
            contains_aggregate, ..
        } = self;
        contains_aggregate
    }

    /// Number of aggregate specs newly introduced by the scanned subtree.
    #[must_use]
    const fn introduced_aggregate_count(self) -> usize {
        let Self {
            introduced_aggregate_count,
            ..
        } = self;
        introduced_aggregate_count
    }
}
impl GroupedAggregateExecutionSpec {
    /// Builds an unresolved spec from one planner aggregate expression.
    ///
    /// The target slot and both compiled expressions start out as `None`;
    /// [`Self::resolve_for_model`] produces the resolved form.
    #[must_use]
    pub(in crate::db) fn from_aggregate_expr(aggregate_expr: &AggregateExpr) -> Self {
        let kind = aggregate_expr.kind();
        let input_expr = aggregate_expr.input_expr().cloned();
        let filter_expr = aggregate_expr.filter_expr().cloned();
        let distinct = aggregate_expr.is_distinct();

        Self {
            kind,
            target_slot: None,
            input_expr,
            filter_expr,
            compiled_input_expr: None,
            compiled_filter_expr: None,
            distinct,
        }
    }

    /// Returns a copy of this spec resolved against `model`.
    ///
    /// Compiles the input expression, then the filter expression, then
    /// resolves the target field slot (only when the input is a bare field).
    ///
    /// # Errors
    ///
    /// Returns a planner/executor invariant error when either expression
    /// cannot be compiled as a scalar projection or when the target field
    /// slot cannot be resolved.
    pub(in crate::db) fn resolve_for_model(
        &self,
        model: &EntityModel,
    ) -> Result<Self, InternalError> {
        let compiled_input_expr = match self.input_expr() {
            None => None,
            Some(expr) => {
                let Some(compiled) = compile_scalar_projection_expr(model, expr) else {
                    return Err(InternalError::planner_executor_invariant(format!(
                        "grouped aggregate execution input expression must stay on the scalar seam: kind={:?} input_expr={expr:?}",
                        self.kind(),
                    )));
                };
                Some(compiled)
            }
        };

        let compiled_filter_expr = match self.filter_expr() {
            None => None,
            Some(expr) => {
                let Some(compiled) = compile_scalar_projection_expr(model, expr) else {
                    return Err(InternalError::planner_executor_invariant(format!(
                        "grouped aggregate filter expression must stay on the scalar seam: kind={:?} filter_expr={expr:?}",
                        self.kind(),
                    )));
                };
                Some(compiled)
            }
        };

        let target_slot = match self.target_field() {
            None => None,
            Some(field) => match resolve_aggregate_target_field_slot(model, field) {
                Ok(slot) => Some(slot),
                Err(err) => {
                    return Err(InternalError::planner_executor_invariant(format!(
                        "grouped aggregate execution target slot resolution failed: field='{field}', error={err}",
                    )));
                }
            },
        };

        Ok(Self {
            kind: self.kind,
            target_slot,
            input_expr: self.input_expr.clone(),
            filter_expr: self.filter_expr.clone(),
            compiled_input_expr,
            compiled_filter_expr,
            distinct: self.distinct,
        })
    }

    /// Aggregate function kind.
    #[must_use]
    pub(in crate::db) const fn kind(&self) -> AggregateKind {
        self.kind
    }

    /// Name of the target field, present only when the input expression is a
    /// bare field reference.
    #[must_use]
    pub(in crate::db) const fn target_field(&self) -> Option<&str> {
        let Some(Expr::Field(field_id)) = self.input_expr() else {
            return None;
        };
        Some(field_id.as_str())
    }

    /// Resolved target field slot, if one has been set.
    #[must_use]
    pub(in crate::db) const fn target_slot(&self) -> Option<&FieldSlot> {
        let Self { target_slot, .. } = self;
        target_slot.as_ref()
    }

    /// Planner-level input expression, if any.
    #[must_use]
    pub(in crate::db) const fn input_expr(&self) -> Option<&Expr> {
        let Self { input_expr, .. } = self;
        input_expr.as_ref()
    }

    /// Planner-level filter expression, if any.
    #[must_use]
    pub(in crate::db) const fn filter_expr(&self) -> Option<&Expr> {
        let Self { filter_expr, .. } = self;
        filter_expr.as_ref()
    }

    /// Whether the aggregate carries the DISTINCT modifier.
    #[must_use]
    pub(in crate::db) const fn distinct(&self) -> bool {
        self.distinct
    }

    /// Whether this spec equals `aggregate` on kind, semantic input
    /// expression, filter expression, and DISTINCT flag.
    #[must_use]
    pub(in crate::db) fn matches_semantic_aggregate(&self, aggregate: &GroupAggregateSpec) -> bool {
        if self.kind != aggregate.kind() {
            return false;
        }
        let semantic_input = aggregate.semantic_input_expr_owned();
        if self.input_expr() != semantic_input.as_ref() {
            return false;
        }
        if self.filter_expr() != aggregate.filter_expr() {
            return false;
        }
        self.distinct == aggregate.distinct()
    }

    /// Scalar-compiled input expression, if one has been set.
    #[must_use]
    pub(in crate::db) const fn compiled_input_expr(&self) -> Option<&ScalarProjectionExpr> {
        let Self {
            compiled_input_expr,
            ..
        } = self;
        compiled_input_expr.as_ref()
    }

    /// Scalar-compiled filter expression, if one has been set.
    #[must_use]
    pub(in crate::db) const fn compiled_filter_expr(&self) -> Option<&ScalarProjectionExpr> {
        let Self {
            compiled_filter_expr,
            ..
        } = self;
        compiled_filter_expr.as_ref()
    }

    /// Whether this spec equals `aggregate_expr` on kind, input expression,
    /// filter expression, and DISTINCT flag.
    #[must_use]
    pub(in crate::db) fn matches_aggregate_expr(&self, aggregate_expr: &AggregateExpr) -> bool {
        if self.kind != aggregate_expr.kind() {
            return false;
        }
        if self.input_expr() != aggregate_expr.input_expr() {
            return false;
        }
        if self.filter_expr() != aggregate_expr.filter_expr() {
            return false;
        }
        self.distinct == aggregate_expr.is_distinct()
    }

    /// Test-only constructor assembling a spec from raw parts.
    ///
    /// `target_field`, when given, becomes a bare field input expression; no
    /// filter and no compiled expressions are attached.
    #[cfg(test)]
    #[must_use]
    pub(in crate::db) fn from_parts_for_test(
        kind: AggregateKind,
        target_slot: Option<FieldSlot>,
        target_field: Option<&str>,
        distinct: bool,
    ) -> Self {
        let input_expr = target_field
            .map(|field| Expr::Field(crate::db::query::plan::expr::FieldId::new(field)));

        Self {
            kind,
            target_slot,
            input_expr,
            filter_expr: None,
            compiled_input_expr: None,
            compiled_filter_expr: None,
            distinct,
        }
    }
}
impl PlannedProjectionLayout {
    /// Projection indexes classified as group-field expressions.
    #[must_use]
    pub(in crate::db) const fn group_field_positions(&self) -> &[usize] {
        let Self {
            group_field_positions,
            ..
        } = self;
        group_field_positions.as_slice()
    }

    /// Projection indexes classified as aggregate expressions.
    #[must_use]
    pub(in crate::db) const fn aggregate_positions(&self) -> &[usize] {
        let Self {
            aggregate_positions,
            ..
        } = self;
        aggregate_positions.as_slice()
    }

    /// Invariant error: group-field positions are not strictly increasing.
    pub(in crate::db) fn group_field_positions_not_strictly_increasing() -> InternalError {
        let message = "grouped projection layout group-field positions must be strictly increasing";
        InternalError::planner_executor_invariant(message)
    }

    /// Invariant error: aggregate positions are not strictly increasing.
    pub(in crate::db) fn aggregate_positions_not_strictly_increasing() -> InternalError {
        let message = "grouped projection layout aggregate positions must be strictly increasing";
        InternalError::planner_executor_invariant(message)
    }

    /// Invariant error: an aggregate terminal precedes a group field.
    pub(in crate::db) fn group_fields_must_precede_aggregates() -> InternalError {
        let message = "grouped projection layout must keep group fields before aggregate terminals";
        InternalError::planner_executor_invariant(message)
    }

    /// Invariant error: a layout position exceeds the projected row length.
    ///
    /// `position_kind` is interpolated into the message to label which
    /// position list the offending index came from.
    pub(in crate::db) fn projected_position_out_of_bounds(
        position_kind: &str,
        position: usize,
        projected_len: usize,
    ) -> InternalError {
        let message = format!(
            "grouped projection layout {position_kind} position out of bounds: position={position}, projected_len={projected_len}",
        );
        InternalError::query_executor_invariant(message)
    }
}
/// Fold path selected for grouped execution.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum GroupedFoldPath {
    /// Dedicated fold chosen when the plan strategy reports a single
    /// count-rows shape.
    CountRowsDedicated,
    /// Fold chosen for every other grouped plan strategy.
    GenericReducers,
}

impl GroupedFoldPath {
    /// Derives the fold path from the planner's grouped strategy.
    #[must_use]
    pub(in crate::db) const fn from_plan_strategy(strategy: GroupedPlanStrategy) -> Self {
        if !strategy.is_single_count_rows() {
            return Self::GenericReducers;
        }
        Self::CountRowsDedicated
    }

    /// Whether this path is the dedicated count-rows fold.
    #[must_use]
    pub(in crate::db) const fn uses_count_rows_dedicated_fold(self) -> bool {
        match self {
            Self::CountRowsDedicated => true,
            Self::GenericReducers => false,
        }
    }
}
/// Bundle of planner-frozen grouped query facts handed to the executor.
///
/// Borrows the base plan (and slices owned by it) for `'a`; everything else
/// is owned by the handoff.
#[derive(Clone)]
pub(in crate::db) struct GroupedExecutorHandoff<'a> {
// The access-planned query this handoff was derived from.
base: &'a AccessPlannedQuery,
// Group-by field slots borrowed from the grouped plan.
group_fields: &'a [FieldSlot],
// Aggregate specs re-derived from the projection spec; kept in test builds only.
#[cfg(test)]
aggregate_specs: Vec<GroupedAggregateExecutionSpec>,
// Copy of the aggregate execution specs frozen on the plan.
grouped_aggregate_execution_specs: Vec<GroupedAggregateExecutionSpec>,
// Group-field / aggregate positions within the projection.
projection_layout: PlannedProjectionLayout,
// Identity flag computed together with the projection layout.
projection_is_identity: bool,
// Planner-selected grouped strategy.
grouped_plan_strategy: GroupedPlanStrategy,
// Fold path derived from `grouped_plan_strategy`.
grouped_fold_path: GroupedFoldPath,
// DISTINCT admissibility outcome plus the frozen DISTINCT execution strategy.
grouped_distinct_policy_contract: GroupedDistinctPolicyContract,
// Optional HAVING expression borrowed from the grouped plan.
having_expr: Option<&'a crate::db::query::plan::expr::Expr>,
// Grouped execution configuration copied from the grouped plan.
execution: GroupedExecutionConfig,
}
impl<'a> GroupedExecutorHandoff<'a> {
    /// The access-planned query this handoff was derived from.
    #[must_use]
    pub(in crate::db) const fn base(&self) -> &'a AccessPlannedQuery {
        let Self { base, .. } = self;
        *base
    }

    /// Group-by field slots borrowed from the grouped plan.
    #[must_use]
    pub(in crate::db) const fn group_fields(&self) -> &'a [FieldSlot] {
        let Self { group_fields, .. } = self;
        *group_fields
    }

    /// Aggregate specs re-derived from the projection spec (test builds only).
    #[cfg(test)]
    #[must_use]
    pub(in crate::db) const fn aggregate_specs(&self) -> &[GroupedAggregateExecutionSpec] {
        let Self {
            aggregate_specs, ..
        } = self;
        aggregate_specs.as_slice()
    }

    /// Aggregate execution specs copied from the plan's frozen specs.
    #[must_use]
    pub(in crate::db) const fn grouped_aggregate_execution_specs(
        &self,
    ) -> &[GroupedAggregateExecutionSpec] {
        let Self {
            grouped_aggregate_execution_specs,
            ..
        } = self;
        grouped_aggregate_execution_specs.as_slice()
    }

    /// Group-field / aggregate positions within the projection.
    #[must_use]
    pub(in crate::db) const fn projection_layout(&self) -> &PlannedProjectionLayout {
        let Self {
            projection_layout, ..
        } = self;
        projection_layout
    }

    /// Identity flag computed together with the projection layout.
    #[must_use]
    pub(in crate::db) const fn projection_is_identity(&self) -> bool {
        let Self {
            projection_is_identity,
            ..
        } = self;
        *projection_is_identity
    }

    /// Planner-selected grouped strategy.
    #[must_use]
    pub(in crate::db) const fn grouped_plan_strategy(&self) -> GroupedPlanStrategy {
        let Self {
            grouped_plan_strategy,
            ..
        } = self;
        *grouped_plan_strategy
    }

    /// Fold path derived from the grouped strategy.
    #[must_use]
    pub(in crate::db) const fn grouped_fold_path(&self) -> GroupedFoldPath {
        let Self {
            grouped_fold_path, ..
        } = self;
        *grouped_fold_path
    }

    /// Frozen DISTINCT execution strategy carried by the policy contract.
    #[must_use]
    pub(in crate::db) const fn distinct_execution_strategy(
        &self,
    ) -> &GroupedDistinctExecutionStrategy {
        let contract = &self.grouped_distinct_policy_contract;
        contract.execution_strategy()
    }

    /// DISTINCT policy violation recorded for the executor, if any.
    #[must_use]
    pub(in crate::db) const fn distinct_policy_violation_for_executor(
        &self,
    ) -> Option<GroupDistinctPolicyReason> {
        let contract = &self.grouped_distinct_policy_contract;
        contract.violation_for_executor()
    }

    /// Optional HAVING expression borrowed from the grouped plan.
    #[must_use]
    pub(in crate::db) const fn having_expr(
        &self,
    ) -> Option<&'a crate::db::query::plan::expr::Expr> {
        let Self { having_expr, .. } = self;
        *having_expr
    }

    /// Grouped execution configuration copied from the grouped plan.
    #[must_use]
    pub(in crate::db) const fn execution(&self) -> GroupedExecutionConfig {
        let Self { execution, .. } = self;
        *execution
    }
}
/// Assembles the grouped executor handoff for one access-planned query.
///
/// Derives the projection layout from the frozen projection spec, validates
/// it, and gathers the grouped strategy, frozen aggregate execution specs,
/// fold path, and DISTINCT policy contract from `plan`.
///
/// # Errors
///
/// Returns a planner/executor invariant error when `plan` is not grouped,
/// when the projection layout fails validation, or when the grouped
/// strategy, frozen aggregate execution specs, or frozen DISTINCT strategy
/// are missing from the plan.
pub(in crate::db) fn grouped_executor_handoff(
    plan: &AccessPlannedQuery,
) -> Result<GroupedExecutorHandoff<'_>, InternalError> {
    let grouped = plan.grouped_plan().ok_or_else(|| {
        InternalError::planner_executor_invariant(
            "grouped executor handoff requires grouped logical plans",
        )
    })?;
    let group_fields = grouped.group.group_fields.as_slice();

    let (projection_layout, aggregate_specs, projection_is_identity) =
        planned_projection_layout_and_aggregate_specs_from_spec(
            plan.frozen_projection_spec(),
            group_fields,
            grouped.group.aggregates.as_slice(),
        )?;
    // Non-test builds do not store the re-derived specs; touch the binding so
    // it does not count as unused.
    #[cfg(not(test))]
    let _ = &aggregate_specs;
    validate_grouped_projection_layout(&projection_layout)?;

    let Some(strategy) = grouped_plan_strategy(plan) else {
        return Err(InternalError::planner_executor_invariant(
            "grouped executor handoff must carry grouped strategy for grouped plans",
        ));
    };

    let Some(frozen_specs) = plan.grouped_aggregate_execution_specs() else {
        return Err(InternalError::planner_executor_invariant(
            "grouped executor handoff requires frozen grouped aggregate execution specs",
        ));
    };
    let grouped_aggregate_execution_specs = frozen_specs.to_vec();

    let grouped_fold_path = GroupedFoldPath::from_plan_strategy(strategy);

    // Admissibility is evaluated before the frozen strategy lookup.
    let violation_for_executor =
        match grouped_distinct_admissibility(grouped.scalar.distinct, grouped.having_expr.is_some())
        {
            GroupDistinctAdmissibility::Disallowed(reason) => Some(reason),
            GroupDistinctAdmissibility::Allowed => None,
        };
    let Some(frozen_distinct_strategy) = plan.grouped_distinct_execution_strategy() else {
        return Err(InternalError::planner_executor_invariant(
            "grouped executor handoff requires frozen grouped DISTINCT strategy",
        ));
    };
    let grouped_distinct_policy_contract = GroupedDistinctPolicyContract::new(
        violation_for_executor,
        frozen_distinct_strategy.clone(),
    );

    Ok(GroupedExecutorHandoff {
        base: plan,
        group_fields,
        #[cfg(test)]
        aggregate_specs,
        grouped_aggregate_execution_specs,
        projection_layout,
        projection_is_identity,
        grouped_plan_strategy: strategy,
        grouped_fold_path,
        grouped_distinct_policy_contract,
        having_expr: grouped.having_expr.as_ref(),
        execution: grouped.group.execution,
    })
}
/// Resolves every aggregate spec against `model`, preserving input order.
///
/// # Errors
///
/// Stops at and returns the first error produced by
/// `GroupedAggregateExecutionSpec::resolve_for_model`.
pub(in crate::db) fn grouped_aggregate_execution_specs(
    model: &EntityModel,
    aggregate_specs: &[GroupedAggregateExecutionSpec],
) -> Result<Vec<GroupedAggregateExecutionSpec>, InternalError> {
    let mut resolved = Vec::with_capacity(aggregate_specs.len());
    for aggregate_spec in aggregate_specs {
        resolved.push(aggregate_spec.resolve_for_model(model)?);
    }
    Ok(resolved)
}
/// Derives the aggregate execution specs referenced by a grouped
/// projection, discarding the layout and identity flag computed alongside.
///
/// # Errors
///
/// Propagates any error from the shared layout/spec derivation.
pub(in crate::db) fn grouped_aggregate_specs_from_projection_spec(
    projection_spec: &ProjectionSpec,
    group_fields: &[FieldSlot],
    aggregates: &[GroupAggregateSpec],
) -> Result<Vec<GroupedAggregateExecutionSpec>, InternalError> {
    planned_projection_layout_and_aggregate_specs_from_spec(
        projection_spec,
        group_fields,
        aggregates,
    )
    .map(|(_layout, aggregate_specs, _is_identity)| aggregate_specs)
}
/// Pairing of the grouped DISTINCT admissibility outcome with the frozen
/// DISTINCT execution strategy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct GroupedDistinctPolicyContract {
    /// Reason grouped DISTINCT was disallowed, or `None` when allowed.
    violation_for_executor: Option<GroupDistinctPolicyReason>,
    /// Frozen DISTINCT execution strategy.
    execution_strategy: GroupedDistinctExecutionStrategy,
}

impl GroupedDistinctPolicyContract {
    /// Bundles a policy outcome with its execution strategy.
    #[must_use]
    const fn new(
        violation_for_executor: Option<GroupDistinctPolicyReason>,
        execution_strategy: GroupedDistinctExecutionStrategy,
    ) -> Self {
        Self {
            execution_strategy,
            violation_for_executor,
        }
    }

    /// Frozen DISTINCT execution strategy.
    #[must_use]
    pub(in crate::db) const fn execution_strategy(&self) -> &GroupedDistinctExecutionStrategy {
        let Self {
            execution_strategy, ..
        } = self;
        execution_strategy
    }

    /// Reason grouped DISTINCT was disallowed, or `None` when allowed.
    #[must_use]
    pub(in crate::db) const fn violation_for_executor(&self) -> Option<GroupDistinctPolicyReason> {
        let Self {
            violation_for_executor,
            ..
        } = self;
        *violation_for_executor
    }
}
/// Execution strategy for grouped DISTINCT handling.
///
/// Every non-`None` variant names the target field and carries its resolved
/// slot.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum GroupedDistinctExecutionStrategy {
    /// No global DISTINCT field aggregate applies.
    None,
    /// Global DISTINCT aggregate of kind `Count` over one field.
    GlobalDistinctFieldCount {
        target_field: String,
        target_slot: FieldSlot,
    },
    /// Global DISTINCT aggregate of kind `Sum` over one field.
    GlobalDistinctFieldSum {
        target_field: String,
        target_slot: FieldSlot,
    },
    /// Global DISTINCT aggregate of kind `Avg` over one field.
    GlobalDistinctFieldAvg {
        target_field: String,
        target_slot: FieldSlot,
    },
}

impl GroupedDistinctExecutionStrategy {
    /// Target slot of the global DISTINCT aggregate, or `None` for the
    /// `None` strategy.
    #[must_use]
    pub(in crate::db) const fn global_distinct_target_slot(&self) -> Option<&FieldSlot> {
        match self {
            Self::GlobalDistinctFieldCount { target_slot, .. }
            | Self::GlobalDistinctFieldSum { target_slot, .. }
            | Self::GlobalDistinctFieldAvg { target_slot, .. } => Some(target_slot),
            Self::None => None,
        }
    }

    /// Aggregate kind of the global DISTINCT aggregate, or `None` for the
    /// `None` strategy.
    #[must_use]
    pub(in crate::db) const fn global_distinct_aggregate_kind(&self) -> Option<AggregateKind> {
        let kind = match self {
            Self::None => return None,
            Self::GlobalDistinctFieldCount { .. } => AggregateKind::Count,
            Self::GlobalDistinctFieldSum { .. } => AggregateKind::Sum,
            Self::GlobalDistinctFieldAvg { .. } => AggregateKind::Avg,
        };
        Some(kind)
    }
}
/// Resolves the grouped DISTINCT execution strategy for `model`.
///
/// Yields `GroupedDistinctExecutionStrategy::None` when no global DISTINCT
/// field aggregate is found; otherwise resolves the aggregate's target slot
/// and maps `Count`/`Sum`/`Avg` to the matching strategy variant.
///
/// # Errors
///
/// Returns an internal error when the global DISTINCT lookup reports a
/// policy reason, when the target slot cannot be resolved (checked before
/// the kind), or when the aggregate kind is not `Count`, `Sum`, or `Avg`.
pub(in crate::db) fn resolved_grouped_distinct_execution_strategy_for_model(
    model: &EntityModel,
    group_fields: &[FieldSlot],
    aggregates: &[GroupAggregateSpec],
    having_expr: Option<&crate::db::query::plan::expr::Expr>,
) -> Result<GroupedDistinctExecutionStrategy, InternalError> {
    let aggregate =
        match resolve_global_distinct_field_aggregate(group_fields, aggregates, having_expr) {
            Ok(Some(aggregate)) => aggregate,
            Ok(None) => return Ok(GroupedDistinctExecutionStrategy::None),
            Err(reason) => return Err(reason.into_planner_handoff_internal_error()),
        };

    let target_field = aggregate.target_field().to_string();
    let target_slot = match resolve_aggregate_target_field_slot(model, aggregate.target_field()) {
        Ok(slot) => slot,
        Err(err) => {
            return Err(InternalError::planner_executor_invariant(format!(
                "grouped DISTINCT strategy target slot resolution failed: field='{}', error={err}",
                aggregate.target_field(),
            )));
        }
    };

    let strategy = match aggregate.kind() {
        AggregateKind::Count => GroupedDistinctExecutionStrategy::GlobalDistinctFieldCount {
            target_field,
            target_slot,
        },
        AggregateKind::Sum => GroupedDistinctExecutionStrategy::GlobalDistinctFieldSum {
            target_field,
            target_slot,
        },
        AggregateKind::Avg => GroupedDistinctExecutionStrategy::GlobalDistinctFieldAvg {
            target_field,
            target_slot,
        },
        AggregateKind::Exists
        | AggregateKind::Min
        | AggregateKind::Max
        | AggregateKind::First
        | AggregateKind::Last => {
            return Err(InternalError::planner_executor_invariant(
                "planner grouped DISTINCT strategy handoff must lower only COUNT/SUM/AVG field-target aggregates",
            ));
        }
    };
    Ok(strategy)
}
/// Walks the projection fields in order and derives, in one pass:
///
/// 1. the projection layout (which indexes are group-field expressions and
///    which are aggregate-bearing expressions),
/// 2. the list of aggregate execution specs referenced by the projection,
///    without duplicates and in first-visit order, and
/// 3. whether the projection is the "identity" shape: every declared group
///    field in order, followed by every declared aggregate in order.
fn planned_projection_layout_and_aggregate_specs_core(
projection_spec: &ProjectionSpec,
group_fields: &[FieldSlot],
aggregates: &[GroupAggregateSpec],
) -> (
PlannedProjectionLayout,
Vec<GroupedAggregateExecutionSpec>,
bool,
) {
let grouped_field_names = group_fields
.iter()
.map(FieldSlot::field)
.collect::<Vec<_>>();
let mut group_field_positions = Vec::new();
let mut aggregate_positions = Vec::new();
let mut aggregate_specs = Vec::new();
// Identity requires exactly one projected field per declared group field
// and aggregate; the per-field checks below can only clear the flag.
let mut projection_is_identity =
projection_spec.len() == group_fields.len().saturating_add(aggregates.len());
// Cursors into `group_fields` / `aggregates` naming the next entry an
// identity projection would have to produce.
let mut next_group_field_index = 0usize;
let mut next_aggregate_index = 0usize;
for (index, field) in projection_spec.fields().enumerate() {
let root_expr = expression_without_alias(projection_field_expr(field));
// Scanning also appends any not-yet-seen aggregate specs found anywhere
// inside the expression to `aggregate_specs`.
let aggregate_scan =
collect_grouped_projection_aggregate_scan(root_expr, &mut aggregate_specs);
match root_expr {
// Bare field: identity holds only if no aggregate was consumed yet and
// the field is the next declared group field.
Expr::Field(field_id) => {
group_field_positions.push(index);
projection_is_identity &= next_aggregate_index == 0
&& group_fields
.get(next_group_field_index)
.is_some_and(|group_field| field_id.as_str() == group_field.field.as_str());
next_group_field_index = next_group_field_index.saturating_add(1);
}
// Bare aggregate: identity holds only if all group fields were already
// consumed and the aggregate matches the next declared aggregate.
Expr::Aggregate(aggregate_expr) => {
aggregate_positions.push(index);
let aggregate_spec =
GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
projection_is_identity &= next_group_field_index == group_fields.len()
&& aggregates
.get(next_aggregate_index)
.is_some_and(|aggregate| {
aggregate_spec.matches_semantic_aggregate(aggregate)
});
// The cursor advances by the number of newly introduced specs, so an
// aggregate equal to an already-collected spec does not move it.
next_aggregate_index = next_aggregate_index
.saturating_add(aggregate_scan.introduced_aggregate_count());
}
// Compound expression containing at least one aggregate: laid out as an
// aggregate position, never identity.
_ if aggregate_scan.contains_aggregate() => {
aggregate_positions.push(index);
projection_is_identity = false;
next_aggregate_index = next_aggregate_index
.saturating_add(aggregate_scan.introduced_aggregate_count());
}
// Compound expression over grouped fields only: laid out as a group-field
// position.
// NOTE(review): the identity helper only accepts a bare `Expr::Field`
// root, which the first arm already handles, so this appears to always
// clear the flag here — confirm that is intended.
_ if expr_references_only_fields(root_expr, grouped_field_names.as_slice()) => {
group_field_positions.push(index);
projection_is_identity &= grouped_projection_expression_preserves_identity(
root_expr,
group_fields,
next_group_field_index,
next_aggregate_index,
);
next_group_field_index = next_group_field_index.saturating_add(1);
}
// Anything else is still laid out as a group-field position but can
// never be identity.
_ => {
group_field_positions.push(index);
projection_is_identity = false;
next_group_field_index = next_group_field_index.saturating_add(1);
}
}
}
// Identity additionally requires that both cursors consumed every declared
// group field and aggregate.
projection_is_identity &=
next_group_field_index == group_fields.len() && next_aggregate_index == aggregates.len();
(
PlannedProjectionLayout {
group_field_positions,
aggregate_positions,
},
aggregate_specs,
projection_is_identity,
)
}
/// Fallible wrapper around the core layout/spec derivation.
///
/// In test builds it first runs a strictness pass that rejects the first
/// projection field (after alias stripping) for which
/// `expr_references_only_fields` returns `false` against the grouped field
/// names; non-test builds go straight to the core derivation.
///
/// # Errors
///
/// Only test builds can fail, with a planner/executor invariant error
/// naming the offending projection index.
#[allow(
    clippy::unnecessary_wraps,
    reason = "test builds keep one extra grouped projection strictness pass while non-test builds stay on the planner core path"
)]
fn planned_projection_layout_and_aggregate_specs_from_spec(
    projection_spec: &ProjectionSpec,
    group_fields: &[FieldSlot],
    aggregates: &[GroupAggregateSpec],
) -> Result<
    (
        PlannedProjectionLayout,
        Vec<GroupedAggregateExecutionSpec>,
        bool,
    ),
    InternalError,
> {
    #[cfg(test)]
    {
        let grouped_field_names: Vec<_> = group_fields.iter().map(FieldSlot::field).collect();
        let first_offender = projection_spec.fields().position(|field| {
            let root_expr = expression_without_alias(projection_field_expr(field));
            !expr_references_only_fields(root_expr, grouped_field_names.as_slice())
        });
        if let Some(index) = first_offender {
            return Err(InternalError::planner_executor_invariant(format!(
                "grouped projection layout expects only field/aggregate expressions; found non-grouped projection expression at index={index}",
            )));
        }
    }

    let planned =
        planned_projection_layout_and_aggregate_specs_core(projection_spec, group_fields, aggregates);
    Ok(planned)
}
/// Whether `root_expr` keeps a grouped projection on its identity shape.
///
/// Holds only when no aggregate has been consumed yet
/// (`next_aggregate_index == 0`) and `root_expr` is a bare field whose name
/// equals the group field at `next_group_field_index`.
fn grouped_projection_expression_preserves_identity(
    root_expr: &Expr,
    group_fields: &[FieldSlot],
    next_group_field_index: usize,
    next_aggregate_index: usize,
) -> bool {
    if next_aggregate_index != 0 {
        return false;
    }
    let Expr::Field(field_id) = root_expr else {
        return false;
    };
    group_fields
        .get(next_group_field_index)
        .is_some_and(|expected| expected.field.as_str() == field_id.as_str())
}
/// Recursively scans `expr` for aggregate leaves.
///
/// Every aggregate leaf is converted into an execution spec and appended to
/// `aggregate_specs` unless an equal spec is already present. The returned
/// scan reports whether any aggregate was seen and how many specs were newly
/// appended.
///
/// Traversal order (which fixes the order of appended specs): function
/// arguments left to right; for `CASE`, the `else` branch first, then each
/// arm's condition followed by its result; for binary expressions, left
/// before right.
fn collect_grouped_projection_aggregate_scan(
    expr: &Expr,
    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
) -> GroupedProjectionAggregateScan {
    match expr {
        Expr::Field(_) | Expr::Literal(_) => GroupedProjectionAggregateScan::none(),
        Expr::Aggregate(aggregate_expr) => {
            let candidate = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
            let introduced = push_unique_grouped_aggregate_spec(aggregate_specs, candidate);
            GroupedProjectionAggregateScan::found_aggregate(introduced)
        }
        Expr::FunctionCall { args, .. } => {
            let mut scan = GroupedProjectionAggregateScan::none();
            for arg in args.iter() {
                scan = scan.combine(collect_grouped_projection_aggregate_scan(
                    arg,
                    aggregate_specs,
                ));
            }
            scan
        }
        Expr::Case {
            when_then_arms,
            else_expr,
        } => {
            let mut scan =
                collect_grouped_projection_aggregate_scan(else_expr.as_ref(), aggregate_specs);
            for arm in when_then_arms.iter() {
                let condition_scan =
                    collect_grouped_projection_aggregate_scan(arm.condition(), aggregate_specs);
                scan = scan.combine(condition_scan);
                let result_scan =
                    collect_grouped_projection_aggregate_scan(arm.result(), aggregate_specs);
                scan = scan.combine(result_scan);
            }
            scan
        }
        Expr::Binary { left, right, .. } => {
            let left_scan =
                collect_grouped_projection_aggregate_scan(left.as_ref(), aggregate_specs);
            let right_scan =
                collect_grouped_projection_aggregate_scan(right.as_ref(), aggregate_specs);
            left_scan.combine(right_scan)
        }
        #[cfg(test)]
        Expr::Alias { expr, .. } => {
            collect_grouped_projection_aggregate_scan(expr.as_ref(), aggregate_specs)
        }
        Expr::Unary { expr, .. } => {
            collect_grouped_projection_aggregate_scan(expr.as_ref(), aggregate_specs)
        }
    }
}
/// Appends `aggregate_spec` to `aggregate_specs` unless an equal spec is
/// already present.
///
/// Returns the number of specs appended: `1` when pushed, `0` when an equal
/// spec already existed.
fn push_unique_grouped_aggregate_spec(
    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
    aggregate_spec: GroupedAggregateExecutionSpec,
) -> usize {
    if aggregate_specs.contains(&aggregate_spec) {
        0
    } else {
        aggregate_specs.push(aggregate_spec);
        1
    }
}
/// Returns `expr` with any outer alias wrappers removed.
///
/// Alias stripping is compiled only in test builds; non-test builds return
/// `expr` unchanged.
#[allow(
    clippy::missing_const_for_fn,
    reason = "alias stripping traverses boxed expression refs that are not const-callable on stable"
)]
fn expression_without_alias(expr: &Expr) -> &Expr {
    #[cfg(test)]
    {
        // Peel one alias layer per call until a non-alias node is reached.
        match expr {
            Expr::Alias { expr: inner, .. } => expression_without_alias(inner.as_ref()),
            unaliased => unaliased,
        }
    }
    #[cfg(not(test))]
    expr
}