use crate::{
db::query::{
builder::AggregateExpr,
plan::{
AccessPlannedQuery, AggregateIdentity, AggregateKind, AggregateSemanticKey, FieldSlot,
GlobalDistinctAggregateKind, GroupAggregateSpec, GroupDistinctAdmissibility,
GroupDistinctPolicyReason, GroupedExecutionConfig, GroupedPlanStrategy,
expr::{CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr},
grouped_distinct_admissibility, grouped_plan_strategy,
resolve_aggregate_target_field_slot, resolve_global_distinct_field_aggregate,
validate_grouped_projection_layout,
},
},
error::InternalError,
model::entity::EntityModel,
};
/// Planner-produced layout that records which projected output positions were
/// classified as group-field outputs and which as aggregate outputs.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct PlannedProjectionLayout {
/// Projection indexes classified as group-field (non-aggregate) outputs.
pub(in crate::db) group_field_positions: Vec<usize>,
/// Projection indexes whose expression is, or contains, an aggregate.
pub(in crate::db) aggregate_positions: Vec<usize>,
}
/// Execution-facing description of one grouped aggregate: its identity, its
/// optional filter, and the artifacts resolved against an entity model.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct GroupedAggregateExecutionSpec {
/// Aggregate identity; supplies the kind, input expression, and DISTINCT flag.
identity: AggregateIdentity,
/// Slot of the target field when one was supplied or resolved;
/// `from_aggregate_expr` leaves it `None`.
target_slot: Option<FieldSlot>,
/// Optional filter expression attached to the aggregate.
filter_expr: Option<Expr>,
/// Compiled input expression; populated by `resolve_for_model`.
compiled_input_expr: Option<CompiledExpr>,
/// Compiled filter expression; populated by `resolve_for_model`.
compiled_filter_expr: Option<CompiledExpr>,
}
/// Outcome of scanning one projection expression for aggregate terminals.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct GroupedProjectionAggregateScan {
    /// Whether the scanned expression contains any aggregate.
    contains_aggregate: bool,
    /// How many previously unseen aggregate specs the scan appended.
    introduced_aggregate_count: usize,
}

impl GroupedProjectionAggregateScan {
    /// Scan result for an expression without any aggregate.
    #[must_use]
    const fn none() -> Self {
        Self {
            introduced_aggregate_count: 0,
            contains_aggregate: false,
        }
    }

    /// Scan result for an expression that contains at least one aggregate,
    /// `introduced_aggregate_count` of which were new to the collected specs.
    #[must_use]
    const fn found_aggregate(introduced_aggregate_count: usize) -> Self {
        Self {
            introduced_aggregate_count,
            contains_aggregate: true,
        }
    }

    /// Whether the scanned expression contains an aggregate.
    #[must_use]
    const fn contains_aggregate(self) -> bool {
        let Self {
            contains_aggregate, ..
        } = self;
        contains_aggregate
    }

    /// Number of aggregate specs this scan newly introduced.
    #[must_use]
    const fn introduced_aggregate_count(self) -> usize {
        let Self {
            introduced_aggregate_count,
            ..
        } = self;
        introduced_aggregate_count
    }
}
impl GroupedAggregateExecutionSpec {
    /// Compile one expression attached to a grouped aggregate.
    ///
    /// `role` labels the attachment in the error message (this impl passes
    /// `"input"` and `"filter"`). Returns a planner/executor invariant error
    /// when `compile_scalar_projection_expr` yields nothing for `expr`.
    fn compile_attached_scalar_expr(
        model: &EntityModel,
        kind: AggregateKind,
        role: &'static str,
        expr: &Expr,
    ) -> Result<CompiledExpr, InternalError> {
        let Some(scalar_expr) = compile_scalar_projection_expr(model, expr) else {
            return Err(InternalError::planner_executor_invariant(format!(
                "grouped aggregate {role} expression must stay on the scalar seam: kind={kind:?} {role}_expr={expr:?}",
            )));
        };

        Ok(CompiledExpr::compile(&scalar_expr))
    }

    /// Build an unresolved spec from one aggregate expression: identity and
    /// filter are taken from the expression, everything else starts empty.
    #[must_use]
    pub(in crate::db) fn from_aggregate_expr(aggregate_expr: &AggregateExpr) -> Self {
        let identity = AggregateIdentity::from_aggregate_expr(aggregate_expr);
        let filter_expr = aggregate_expr.filter_expr().cloned();

        Self {
            identity,
            target_slot: None,
            filter_expr,
            compiled_input_expr: None,
            compiled_filter_expr: None,
        }
    }

    /// Build a spec from already separated parts, without compiled expressions.
    #[must_use]
    pub(in crate::db) const fn from_uncompiled_parts(
        kind: AggregateKind,
        target_slot: Option<FieldSlot>,
        input_expr: Option<Expr>,
        filter_expr: Option<Expr>,
        distinct: bool,
    ) -> Self {
        Self {
            identity: AggregateIdentity::from_parts(kind, input_expr, distinct),
            target_slot,
            filter_expr,
            compiled_input_expr: None,
            compiled_filter_expr: None,
        }
    }

    /// Produce a copy of this spec resolved against `model`.
    ///
    /// Steps run in this order, and the first failure is returned: compile the
    /// input expression, compile the filter expression, then resolve the target
    /// field slot. The target slot is recomputed from `target_field()`, so any
    /// previously stored slot is not carried over.
    pub(in crate::db) fn resolve_for_model(
        &self,
        model: &EntityModel,
    ) -> Result<Self, InternalError> {
        let compiled_input_expr = match self.input_expr() {
            Some(expr) => Some(Self::compile_attached_scalar_expr(
                model,
                self.kind(),
                "input",
                expr,
            )?),
            None => None,
        };

        let compiled_filter_expr = match self.filter_expr() {
            Some(expr) => Some(Self::compile_attached_scalar_expr(
                model,
                self.kind(),
                "filter",
                expr,
            )?),
            None => None,
        };

        let target_slot = match self.target_field() {
            Some(field) => {
                let slot = resolve_aggregate_target_field_slot(model, field).map_err(|err| {
                    InternalError::planner_executor_invariant(format!(
                        "grouped aggregate execution target slot resolution failed: field='{field}', error={err}",
                    ))
                })?;
                Some(slot)
            }
            None => None,
        };

        Ok(Self {
            identity: self.identity.clone(),
            target_slot,
            filter_expr: self.filter_expr.clone(),
            compiled_input_expr,
            compiled_filter_expr,
        })
    }

    /// Aggregate kind taken from the identity.
    #[must_use]
    pub(in crate::db) const fn kind(&self) -> AggregateKind {
        self.identity.kind()
    }

    /// Name of the target field, present only when the input expression is a
    /// bare field reference.
    #[must_use]
    pub(in crate::db) const fn target_field(&self) -> Option<&str> {
        if let Some(Expr::Field(field_id)) = self.input_expr() {
            Some(field_id.as_str())
        } else {
            None
        }
    }

    /// Stored target field slot, if any.
    #[must_use]
    pub(in crate::db) const fn target_slot(&self) -> Option<&FieldSlot> {
        self.target_slot.as_ref()
    }

    /// Input expression taken from the identity, if any.
    #[must_use]
    pub(in crate::db) const fn input_expr(&self) -> Option<&Expr> {
        self.identity.input_expr()
    }

    /// Filter expression attached to this aggregate, if any.
    #[must_use]
    pub(in crate::db) const fn filter_expr(&self) -> Option<&Expr> {
        self.filter_expr.as_ref()
    }

    /// Owned copy of the aggregate identity.
    #[must_use]
    pub(in crate::db) fn identity(&self) -> AggregateIdentity {
        self.identity.clone()
    }

    /// Semantic key built from the identity plus the filter expression.
    #[must_use]
    pub(in crate::db) fn semantic_key(&self) -> AggregateSemanticKey {
        let identity = self.identity.clone();
        let filter_expr = self.filter_expr.clone();
        AggregateSemanticKey::from_identity(identity, filter_expr)
    }

    /// DISTINCT flag taken from the identity.
    #[must_use]
    pub(in crate::db) const fn distinct(&self) -> bool {
        self.identity.distinct()
    }

    /// Forwards the identity's grouped DISTINCT value-dedup decision.
    #[must_use]
    pub(in crate::db) const fn uses_grouped_distinct_value_dedup(&self) -> bool {
        self.identity.uses_grouped_distinct_value_dedup()
    }

    /// Whether this spec and the planner aggregate share one semantic key.
    #[must_use]
    pub(in crate::db) fn matches_aggregate_identity(&self, aggregate: &GroupAggregateSpec) -> bool {
        let own_key = self.semantic_key();
        own_key == aggregate.semantic_key()
    }

    /// Compiled input expression, if one has been stored.
    #[must_use]
    pub(in crate::db) const fn compiled_input_expr(&self) -> Option<&CompiledExpr> {
        self.compiled_input_expr.as_ref()
    }

    /// Compiled filter expression, if one has been stored.
    #[must_use]
    pub(in crate::db) const fn compiled_filter_expr(&self) -> Option<&CompiledExpr> {
        self.compiled_filter_expr.as_ref()
    }

    /// Whether this spec qualifies for the dedicated count-rows fold: a COUNT
    /// with no input expression, no filter, and no DISTINCT.
    #[must_use]
    pub(in crate::db) const fn admits_count_rows_dedicated_fold(&self) -> bool {
        if !matches!(self.kind(), AggregateKind::Count) {
            return false;
        }

        self.input_expr().is_none() && self.filter_expr().is_none() && !self.distinct()
    }

    /// Whether this spec and the aggregate expression share one semantic key.
    #[must_use]
    pub(in crate::db) fn matches_aggregate_expr(&self, aggregate_expr: &AggregateExpr) -> bool {
        let own_key = self.semantic_key();
        own_key == AggregateSemanticKey::from_aggregate_expr(aggregate_expr)
    }

    /// Test-only constructor: `target_field`, when given, becomes a bare field
    /// input expression; no filter and no compiled expressions are attached.
    #[cfg(test)]
    #[must_use]
    pub(in crate::db) fn from_parts_for_test(
        kind: AggregateKind,
        target_slot: Option<FieldSlot>,
        target_field: Option<&str>,
        distinct: bool,
    ) -> Self {
        let input_expr = target_field
            .map(|field| Expr::Field(crate::db::query::plan::expr::FieldId::new(field)));
        let identity = AggregateIdentity::from_parts(kind, input_expr, distinct);

        Self {
            identity,
            target_slot,
            filter_expr: None,
            compiled_input_expr: None,
            compiled_filter_expr: None,
        }
    }
}
impl PlannedProjectionLayout {
    /// Projection indexes classified as group-field outputs.
    #[must_use]
    pub(in crate::db) const fn group_field_positions(&self) -> &[usize] {
        let positions = &self.group_field_positions;
        positions.as_slice()
    }

    /// Projection indexes classified as aggregate outputs.
    #[must_use]
    pub(in crate::db) const fn aggregate_positions(&self) -> &[usize] {
        let positions = &self.aggregate_positions;
        positions.as_slice()
    }

    /// Invariant error: group-field positions are not strictly increasing.
    pub(in crate::db) fn group_field_positions_not_strictly_increasing() -> InternalError {
        let message = "grouped projection layout group-field positions must be strictly increasing";
        InternalError::planner_executor_invariant(message)
    }

    /// Invariant error: aggregate positions are not strictly increasing.
    pub(in crate::db) fn aggregate_positions_not_strictly_increasing() -> InternalError {
        let message = "grouped projection layout aggregate positions must be strictly increasing";
        InternalError::planner_executor_invariant(message)
    }

    /// Invariant error: a group field appears after an aggregate terminal.
    pub(in crate::db) fn group_fields_must_precede_aggregates() -> InternalError {
        let message = "grouped projection layout must keep group fields before aggregate terminals";
        InternalError::planner_executor_invariant(message)
    }

    /// Invariant error: a layout position does not fit the projected row.
    ///
    /// `position_kind` is a free-form label inserted into the message.
    pub(in crate::db) fn projected_position_out_of_bounds(
        position_kind: &str,
        position: usize,
        projected_len: usize,
    ) -> InternalError {
        let message = format!(
            "grouped projection layout {position_kind} position out of bounds: position={position}, projected_len={projected_len}",
        );
        InternalError::query_executor_invariant(message)
    }
}
/// Fold path selected for grouped aggregation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum GroupedFoldPath {
/// Chosen by `from_plan_strategy` when the strategy is single-count-rows and
/// every aggregate spec admits the dedicated count-rows fold.
CountRowsDedicated,
/// Chosen by `from_plan_strategy` in every other case.
GenericReducers,
}
impl GroupedFoldPath {
    /// Select the fold path from the planner strategy and the aggregate specs.
    ///
    /// The dedicated count-rows path is chosen only when the strategy reports
    /// single-count-rows and every spec admits the dedicated fold; anything
    /// else uses the generic reducers.
    ///
    /// NOTE(review): `Iterator::all` is vacuously true for an empty slice, so
    /// an empty `aggregate_specs` combined with a single-count-rows strategy
    /// selects `CountRowsDedicated` — confirm callers never pass an empty list
    /// here, or that this outcome is intended.
    #[must_use]
    pub(in crate::db) fn from_plan_strategy(
        strategy: GroupedPlanStrategy,
        aggregate_specs: &[GroupedAggregateExecutionSpec],
    ) -> Self {
        if !strategy.is_single_count_rows() {
            return Self::GenericReducers;
        }

        let every_spec_admits_dedicated_fold = aggregate_specs
            .iter()
            .all(|spec| spec.admits_count_rows_dedicated_fold());

        if every_spec_admits_dedicated_fold {
            Self::CountRowsDedicated
        } else {
            Self::GenericReducers
        }
    }
}
/// Execution route for one grouped query, derived by `from_planner_parts` from
/// the plan strategy, the fold path, and the grouped DISTINCT strategy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum GroupedExecutionRoute {
/// Global DISTINCT target present and the strategy is top-k.
GlobalDistinctTopK,
/// Global DISTINCT target present and the strategy is not top-k.
GlobalDistinctFull,
/// No global DISTINCT, not top-k, dedicated count-rows fold path.
CountRowsDedicated,
/// No global DISTINCT and the strategy is top-k (any fold path).
GenericTopK,
/// No global DISTINCT, not top-k, generic reducers fold path.
GenericFull,
}
impl GroupedExecutionRoute {
    /// Derive the execution route from planner-owned inputs.
    ///
    /// A global DISTINCT target slot takes precedence; after that, top-k group
    /// selection; only then does the fold path decide between the dedicated
    /// count-rows route and the generic full route.
    #[must_use]
    pub(in crate::db) const fn from_planner_parts(
        strategy: GroupedPlanStrategy,
        fold_path: GroupedFoldPath,
        distinct_strategy: &GroupedDistinctExecutionStrategy,
    ) -> Self {
        let has_global_distinct_target = distinct_strategy.global_distinct_target_slot().is_some();
        let selects_top_k = strategy.is_top_k_group();

        if has_global_distinct_target {
            return if selects_top_k {
                Self::GlobalDistinctTopK
            } else {
                Self::GlobalDistinctFull
            };
        }

        if selects_top_k {
            return Self::GenericTopK;
        }

        match fold_path {
            GroupedFoldPath::CountRowsDedicated => Self::CountRowsDedicated,
            GroupedFoldPath::GenericReducers => Self::GenericFull,
        }
    }

    /// True for both global DISTINCT routes.
    #[must_use]
    pub(in crate::db) const fn uses_global_distinct_fold(self) -> bool {
        match self {
            Self::GlobalDistinctTopK | Self::GlobalDistinctFull => true,
            Self::CountRowsDedicated | Self::GenericTopK | Self::GenericFull => false,
        }
    }

    /// True for both top-k routes.
    #[must_use]
    pub(in crate::db) const fn uses_top_k_group_selection(self) -> bool {
        match self {
            Self::GlobalDistinctTopK | Self::GenericTopK => true,
            Self::GlobalDistinctFull | Self::CountRowsDedicated | Self::GenericFull => false,
        }
    }

    /// True only for the dedicated count-rows route.
    #[must_use]
    pub(in crate::db) const fn uses_count_rows_dedicated_fold(self) -> bool {
        match self {
            Self::CountRowsDedicated => true,
            Self::GlobalDistinctTopK
            | Self::GlobalDistinctFull
            | Self::GenericTopK
            | Self::GenericFull => false,
        }
    }
}
/// Bundle assembled by `grouped_executor_handoff` from one grouped
/// `AccessPlannedQuery`; borrows the plan for `'a`.
#[derive(Clone)]
pub(in crate::db) struct GroupedExecutorHandoff<'a> {
/// The plan this handoff was built from.
base: &'a AccessPlannedQuery,
/// Group-by field slots borrowed from the grouped plan.
group_fields: &'a [FieldSlot],
/// Aggregate specs re-derived from the frozen projection spec (test builds only).
#[cfg(test)]
aggregate_specs: Vec<GroupedAggregateExecutionSpec>,
/// Owned copy of the aggregate execution specs frozen on the plan.
grouped_aggregate_execution_specs: Vec<GroupedAggregateExecutionSpec>,
/// Group-field / aggregate position layout derived from the projection spec.
projection_layout: PlannedProjectionLayout,
/// Identity flag computed alongside the layout from the projection spec.
projection_is_identity: bool,
/// Strategy returned by `grouped_plan_strategy` for the plan.
grouped_plan_strategy: GroupedPlanStrategy,
/// Route derived from the strategy, fold path, and DISTINCT strategy.
grouped_execution_route: GroupedExecutionRoute,
/// DISTINCT policy violation (if any) plus the frozen DISTINCT strategy.
grouped_distinct_policy_contract: GroupedDistinctPolicyContract,
/// HAVING expression borrowed from the grouped plan, if present.
having_expr: Option<&'a crate::db::query::plan::expr::Expr>,
/// Execution config copied from the grouped plan's group spec.
execution: GroupedExecutionConfig,
}
impl<'a> GroupedExecutorHandoff<'a> {
#[must_use]
pub(in crate::db) const fn base(&self) -> &'a AccessPlannedQuery {
self.base
}
#[must_use]
pub(in crate::db) const fn group_fields(&self) -> &'a [FieldSlot] {
self.group_fields
}
#[cfg(test)]
#[must_use]
pub(in crate::db) const fn aggregate_specs(&self) -> &[GroupedAggregateExecutionSpec] {
self.aggregate_specs.as_slice()
}
#[cfg(test)]
#[must_use]
pub(in crate::db) const fn grouped_aggregate_execution_specs(
&self,
) -> &[GroupedAggregateExecutionSpec] {
self.grouped_aggregate_execution_specs.as_slice()
}
#[cfg(test)]
#[must_use]
pub(in crate::db) const fn projection_layout(&self) -> &PlannedProjectionLayout {
&self.projection_layout
}
#[must_use]
pub(in crate::db) const fn projection_is_identity(&self) -> bool {
self.projection_is_identity
}
#[must_use]
pub(in crate::db) const fn grouped_plan_strategy(&self) -> GroupedPlanStrategy {
self.grouped_plan_strategy
}
#[must_use]
pub(in crate::db) const fn grouped_execution_route(&self) -> GroupedExecutionRoute {
self.grouped_execution_route
}
#[cfg(test)]
#[must_use]
pub(in crate::db) const fn distinct_execution_strategy(
&self,
) -> &GroupedDistinctExecutionStrategy {
self.grouped_distinct_policy_contract.execution_strategy()
}
#[must_use]
pub(in crate::db) fn into_route_stage_residents(
self,
) -> (
Vec<GroupedAggregateExecutionSpec>,
PlannedProjectionLayout,
GroupedDistinctExecutionStrategy,
) {
(
self.grouped_aggregate_execution_specs,
self.projection_layout,
self.grouped_distinct_policy_contract
.into_execution_strategy(),
)
}
#[must_use]
pub(in crate::db) const fn distinct_policy_violation_for_executor(
&self,
) -> Option<GroupDistinctPolicyReason> {
self.grouped_distinct_policy_contract
.violation_for_executor()
}
#[must_use]
pub(in crate::db) const fn having_expr(
&self,
) -> Option<&'a crate::db::query::plan::expr::Expr> {
self.having_expr
}
#[must_use]
pub(in crate::db) const fn execution(&self) -> GroupedExecutionConfig {
self.execution
}
}
/// Build the grouped executor handoff for one access-planned query.
///
/// Returns a planner/executor invariant error when the plan is not grouped, or
/// when the grouped strategy, the frozen aggregate execution specs, or the
/// frozen DISTINCT strategy is missing; errors from layout derivation and
/// layout validation are propagated unchanged. The checks run in the order
/// written, so the first failing step determines the reported error.
pub(in crate::db) fn grouped_executor_handoff(
plan: &AccessPlannedQuery,
) -> Result<GroupedExecutorHandoff<'_>, InternalError> {
// Only grouped logical plans can be handed off.
let Some(grouped) = plan.grouped_plan() else {
return Err(InternalError::planner_executor_invariant(
"grouped executor handoff requires grouped logical plans",
));
};
// Derive the position layout, projection-derived aggregate specs, and the
// identity flag from the frozen projection spec.
let projection_spec = plan.frozen_projection_spec();
let (projection_layout, aggregate_specs, projection_is_identity) =
planned_projection_layout_and_aggregate_specs_from_spec(
projection_spec,
grouped.group.group_fields.as_slice(),
grouped.group.aggregates.as_slice(),
)?;
// Non-test builds do not store `aggregate_specs`; touch it so the binding is used.
#[cfg(not(test))]
let _ = &aggregate_specs;
validate_grouped_projection_layout(&projection_layout)?;
let grouped_plan_strategy = grouped_plan_strategy(plan).ok_or_else(|| {
InternalError::planner_executor_invariant(
"grouped executor handoff must carry grouped strategy for grouped plans",
)
})?;
// Take an owned copy of the aggregate execution specs frozen on the plan.
let grouped_aggregate_execution_specs = plan
.grouped_aggregate_execution_specs()
.ok_or_else(|| {
InternalError::planner_executor_invariant(
"grouped executor handoff requires frozen grouped aggregate execution specs",
)
})?
.to_vec();
let grouped_fold_path = GroupedFoldPath::from_plan_strategy(
grouped_plan_strategy,
grouped_aggregate_execution_specs.as_slice(),
);
// Pair the DISTINCT admissibility verdict (a violation reason when
// disallowed) with a clone of the frozen DISTINCT execution strategy.
let grouped_distinct_policy_contract = GroupedDistinctPolicyContract::new(
match grouped_distinct_admissibility(grouped.scalar.distinct, grouped.having_expr.is_some())
{
GroupDistinctAdmissibility::Allowed => None,
GroupDistinctAdmissibility::Disallowed(reason) => Some(reason),
},
plan.grouped_distinct_execution_strategy()
.ok_or_else(|| {
InternalError::planner_executor_invariant(
"grouped executor handoff requires frozen grouped DISTINCT strategy",
)
})?
.clone(),
);
let grouped_execution_route = GroupedExecutionRoute::from_planner_parts(
grouped_plan_strategy,
grouped_fold_path,
grouped_distinct_policy_contract.execution_strategy(),
);
Ok(GroupedExecutorHandoff {
base: plan,
group_fields: grouped.group.group_fields.as_slice(),
#[cfg(test)]
aggregate_specs,
grouped_aggregate_execution_specs,
projection_layout,
projection_is_identity,
grouped_plan_strategy,
grouped_execution_route,
grouped_distinct_policy_contract,
having_expr: grouped.having_expr.as_ref(),
execution: grouped.group.execution,
})
}
/// Resolve every aggregate spec against `model`, preserving input order.
///
/// Stops at, and returns, the first resolution error.
pub(in crate::db) fn grouped_aggregate_execution_specs(
    model: &EntityModel,
    aggregate_specs: &[GroupedAggregateExecutionSpec],
) -> Result<Vec<GroupedAggregateExecutionSpec>, InternalError> {
    let mut resolved_specs = Vec::with_capacity(aggregate_specs.len());
    for aggregate_spec in aggregate_specs {
        resolved_specs.push(aggregate_spec.resolve_for_model(model)?);
    }

    Ok(resolved_specs)
}
/// Derive the aggregate execution specs from a projection spec, discarding the
/// layout and identity flag that are computed alongside them.
pub(in crate::db) fn grouped_aggregate_specs_from_projection_spec(
    projection_spec: &ProjectionSpec,
    group_fields: &[FieldSlot],
    aggregates: &[GroupAggregateSpec],
) -> Result<Vec<GroupedAggregateExecutionSpec>, InternalError> {
    planned_projection_layout_and_aggregate_specs_from_spec(
        projection_spec,
        group_fields,
        aggregates,
    )
    .map(|(_layout, aggregate_specs, _projection_is_identity)| aggregate_specs)
}
/// Pairs an optional grouped DISTINCT policy violation with the grouped
/// DISTINCT execution strategy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct GroupedDistinctPolicyContract {
/// Reason DISTINCT is disallowed, or `None` when it is allowed.
violation_for_executor: Option<GroupDistinctPolicyReason>,
/// DISTINCT execution strategy carried with the contract.
execution_strategy: GroupedDistinctExecutionStrategy,
}
impl GroupedDistinctPolicyContract {
    /// Bundle a policy verdict with its execution strategy.
    #[must_use]
    const fn new(
        violation_for_executor: Option<GroupDistinctPolicyReason>,
        execution_strategy: GroupedDistinctExecutionStrategy,
    ) -> Self {
        Self {
            execution_strategy,
            violation_for_executor,
        }
    }

    /// Borrow the DISTINCT execution strategy.
    #[must_use]
    pub(in crate::db) const fn execution_strategy(&self) -> &GroupedDistinctExecutionStrategy {
        &self.execution_strategy
    }

    /// Consume the contract and return its DISTINCT execution strategy.
    #[must_use]
    pub(in crate::db) fn into_execution_strategy(self) -> GroupedDistinctExecutionStrategy {
        let Self {
            execution_strategy, ..
        } = self;
        execution_strategy
    }

    /// Policy violation recorded for the executor, if any.
    #[must_use]
    pub(in crate::db) const fn violation_for_executor(&self) -> Option<GroupDistinctPolicyReason> {
        self.violation_for_executor
    }
}
/// Grouped DISTINCT execution strategy: either none, or a global DISTINCT
/// aggregate over one target field.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum GroupedDistinctExecutionStrategy {
/// No global DISTINCT aggregate.
None,
/// Global DISTINCT aggregate reported as `AggregateKind::Count`.
GlobalDistinctFieldCount {
/// Name of the target field.
target_field: String,
/// Slot of the target field.
target_slot: FieldSlot,
},
/// Global DISTINCT aggregate reported as `AggregateKind::Sum`.
GlobalDistinctFieldSum {
/// Name of the target field.
target_field: String,
/// Slot of the target field.
target_slot: FieldSlot,
},
/// Global DISTINCT aggregate reported as `AggregateKind::Avg`.
GlobalDistinctFieldAvg {
/// Name of the target field.
target_field: String,
/// Slot of the target field.
target_slot: FieldSlot,
},
}
impl GroupedDistinctExecutionStrategy {
    /// Target field slot of the global DISTINCT aggregate; `None` for the
    /// `None` strategy.
    #[must_use]
    pub(in crate::db) const fn global_distinct_target_slot(&self) -> Option<&FieldSlot> {
        match self {
            Self::GlobalDistinctFieldCount { target_slot, .. }
            | Self::GlobalDistinctFieldSum { target_slot, .. }
            | Self::GlobalDistinctFieldAvg { target_slot, .. } => Some(target_slot),
            Self::None => None,
        }
    }

    /// Aggregate kind of the global DISTINCT aggregate; `None` for the `None`
    /// strategy.
    #[must_use]
    pub(in crate::db) const fn global_distinct_aggregate_kind(&self) -> Option<AggregateKind> {
        let kind = match self {
            Self::None => return None,
            Self::GlobalDistinctFieldCount { .. } => AggregateKind::Count,
            Self::GlobalDistinctFieldSum { .. } => AggregateKind::Sum,
            Self::GlobalDistinctFieldAvg { .. } => AggregateKind::Avg,
        };

        Some(kind)
    }

    /// Build the strategy variant matching a supported global DISTINCT kind.
    #[must_use]
    pub(in crate::db) const fn from_supported_global_distinct(
        kind: GlobalDistinctAggregateKind,
        target_field: String,
        target_slot: FieldSlot,
    ) -> Self {
        match kind {
            GlobalDistinctAggregateKind::Count => Self::GlobalDistinctFieldCount {
                target_field,
                target_slot,
            },
            GlobalDistinctAggregateKind::Sum => Self::GlobalDistinctFieldSum {
                target_field,
                target_slot,
            },
            GlobalDistinctAggregateKind::Avg => Self::GlobalDistinctFieldAvg {
                target_field,
                target_slot,
            },
        }
    }
}
/// Resolve the grouped DISTINCT execution strategy against `model`.
///
/// Yields `GroupedDistinctExecutionStrategy::None` when no global DISTINCT
/// field aggregate is resolved. Otherwise the aggregate's target field is
/// resolved to a slot and its kind is lowered to a supported global DISTINCT
/// kind; either step failing produces a planner/executor invariant error. A
/// rejection from `resolve_global_distinct_field_aggregate` is converted with
/// `into_planner_handoff_internal_error`.
pub(in crate::db) fn resolved_grouped_distinct_execution_strategy_for_model(
    model: &EntityModel,
    group_fields: &[FieldSlot],
    aggregates: &[GroupAggregateSpec],
    having_expr: Option<&crate::db::query::plan::expr::Expr>,
) -> Result<GroupedDistinctExecutionStrategy, InternalError> {
    let aggregate =
        match resolve_global_distinct_field_aggregate(group_fields, aggregates, having_expr) {
            Ok(Some(aggregate)) => aggregate,
            Ok(None) => return Ok(GroupedDistinctExecutionStrategy::None),
            Err(reason) => return Err(reason.into_planner_handoff_internal_error()),
        };

    let target_field = aggregate.target_field().to_string();

    let slot_resolution = resolve_aggregate_target_field_slot(model, aggregate.target_field());
    let target_slot = slot_resolution.map_err(|err| {
        InternalError::planner_executor_invariant(format!(
            "grouped DISTINCT strategy target slot resolution failed: field='{}', error={err}",
            aggregate.target_field(),
        ))
    })?;

    let Some(distinct_kind) = aggregate.kind().global_distinct_kind() else {
        return Err(InternalError::planner_executor_invariant(
            "planner grouped DISTINCT strategy handoff must lower only COUNT/SUM/AVG field-target aggregates",
        ));
    };

    Ok(
        GroupedDistinctExecutionStrategy::from_supported_global_distinct(
            distinct_kind,
            target_field,
            target_slot,
        ),
    )
}
/// Classify every projected field as a group-field or aggregate position,
/// collect the unique aggregate specs found in the projection, and decide
/// whether the projection is the identity projection (group fields in
/// declared order followed by the declared aggregates in order).
///
/// Returns `(layout, aggregate_specs, projection_is_identity)`.
fn planned_projection_layout_and_aggregate_specs_core(
projection_spec: &ProjectionSpec,
group_fields: &[FieldSlot],
aggregates: &[GroupAggregateSpec],
) -> (
PlannedProjectionLayout,
Vec<GroupedAggregateExecutionSpec>,
bool,
) {
let grouped_field_names = group_fields
.iter()
.map(FieldSlot::field)
.collect::<Vec<_>>();
let mut group_field_positions = Vec::new();
let mut aggregate_positions = Vec::new();
let mut aggregate_specs = Vec::new();
// Identity is only possible when the projection has exactly one output per
// group field and per declared aggregate; the loop below can only clear it.
let mut projection_is_identity =
projection_spec.len() == group_fields.len().saturating_add(aggregates.len());
// Cursors into `group_fields` / `aggregates` for the next expected output.
let mut next_group_field_index = 0usize;
let mut next_aggregate_index = 0usize;
for (index, field) in projection_spec.fields().enumerate() {
let root_expr = expression_without_alias(field.expr());
// Appends any not-yet-seen aggregate specs found anywhere in the expression.
let aggregate_scan =
collect_grouped_projection_aggregate_scan(root_expr, &mut aggregate_specs);
match root_expr {
// Bare field: identity holds only if no aggregate has been consumed yet
// and the field name matches the next expected group field.
Expr::Field(field_id) => {
group_field_positions.push(index);
projection_is_identity &= next_aggregate_index == 0
&& group_fields
.get(next_group_field_index)
.is_some_and(|group_field| field_id.as_str() == group_field.field.as_str());
next_group_field_index = next_group_field_index.saturating_add(1);
}
// Bare aggregate: identity holds only if every group field has been
// consumed and the aggregate matches the next declared aggregate.
Expr::Aggregate(aggregate_expr) => {
aggregate_positions.push(index);
let aggregate_spec =
GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
projection_is_identity &= next_group_field_index == group_fields.len()
&& aggregates
.get(next_aggregate_index)
.is_some_and(|aggregate| {
aggregate_spec.matches_aggregate_identity(aggregate)
});
// The cursor advances by newly introduced specs only, so a repeated
// aggregate does not move it.
next_aggregate_index = next_aggregate_index
.saturating_add(aggregate_scan.introduced_aggregate_count());
}
// Composite expression containing an aggregate: never identity.
_ if aggregate_scan.contains_aggregate() => {
aggregate_positions.push(index);
projection_is_identity = false;
next_aggregate_index = next_aggregate_index
.saturating_add(aggregate_scan.introduced_aggregate_count());
}
// Non-aggregate expression over group fields only.
// NOTE(review): the helper accepts only a bare `Expr::Field`, which the
// first arm already handles, so this appears to always clear the flag.
_ if root_expr.references_only_fields(grouped_field_names.as_slice()) => {
group_field_positions.push(index);
projection_is_identity &= grouped_projection_expression_preserves_identity(
root_expr,
group_fields,
next_group_field_index,
next_aggregate_index,
);
next_group_field_index = next_group_field_index.saturating_add(1);
}
// Anything else is still recorded as a group-field position, but it
// cannot be part of an identity projection.
_ => {
group_field_positions.push(index);
projection_is_identity = false;
next_group_field_index = next_group_field_index.saturating_add(1);
}
}
}
// Both cursors must have consumed exactly the declared group fields and aggregates.
projection_is_identity &=
next_group_field_index == group_fields.len() && next_aggregate_index == aggregates.len();
(
PlannedProjectionLayout {
group_field_positions,
aggregate_positions,
},
aggregate_specs,
projection_is_identity,
)
}
/// Derive the projection layout, the aggregate specs, and the identity flag
/// from a projection spec.
///
/// Test builds first run a strictness pass that rejects the first projected
/// expression (after alias stripping) for which `references_only_fields`
/// fails against the group field names; non-test builds go straight to the
/// core derivation and never return `Err`.
///
/// NOTE(review): the strictness pass relies on `references_only_fields`
/// admitting aggregate terminals, since the error text allows
/// field/aggregate expressions — confirm against that helper.
#[cfg_attr(
    not(test),
    expect(
        clippy::unnecessary_wraps,
        reason = "test builds keep one extra grouped projection strictness pass while non-test builds stay on the planner core path"
    )
)]
fn planned_projection_layout_and_aggregate_specs_from_spec(
    projection_spec: &ProjectionSpec,
    group_fields: &[FieldSlot],
    aggregates: &[GroupAggregateSpec],
) -> Result<
    (
        PlannedProjectionLayout,
        Vec<GroupedAggregateExecutionSpec>,
        bool,
    ),
    InternalError,
> {
    #[cfg(test)]
    {
        let allowed_field_names = group_fields
            .iter()
            .map(FieldSlot::field)
            .collect::<Vec<_>>();
        let first_offending_index = projection_spec.fields().position(|field| {
            !expression_without_alias(field.expr())
                .references_only_fields(allowed_field_names.as_slice())
        });

        if let Some(index) = first_offending_index {
            return Err(InternalError::planner_executor_invariant(format!(
                "grouped projection layout expects only field/aggregate expressions; found non-grouped projection expression at index={index}",
            )));
        }
    }

    let derived =
        planned_projection_layout_and_aggregate_specs_core(projection_spec, group_fields, aggregates);

    Ok(derived)
}
/// Whether `root_expr` keeps the projection an identity projection at the
/// current cursors: no aggregate may have been consumed yet, and the
/// expression must be a bare field naming the next expected group field.
fn grouped_projection_expression_preserves_identity(
    root_expr: &Expr,
    group_fields: &[FieldSlot],
    next_group_field_index: usize,
    next_aggregate_index: usize,
) -> bool {
    if next_aggregate_index != 0 {
        return false;
    }

    let Expr::Field(field_id) = root_expr else {
        return false;
    };

    group_fields
        .get(next_group_field_index)
        .is_some_and(|group_field| field_id.as_str() == group_field.field.as_str())
}
/// Visit every aggregate in `expr`, append each not-yet-seen spec to
/// `aggregate_specs`, and report whether the expression contains an aggregate
/// together with how many specs were newly added.
fn collect_grouped_projection_aggregate_scan(
    expr: &Expr,
    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
) -> GroupedProjectionAggregateScan {
    let mut newly_introduced = 0usize;

    // The visitor itself never fails, hence the `expect` on the traversal.
    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
        let candidate = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
        let added = push_unique_grouped_aggregate_spec(aggregate_specs, candidate);
        newly_introduced = newly_introduced.saturating_add(added);
        Ok::<(), InternalError>(())
    })
    .expect("grouped projection aggregate collection cannot fail");

    if !expr.contains_aggregate() {
        return GroupedProjectionAggregateScan::none();
    }

    GroupedProjectionAggregateScan::found_aggregate(newly_introduced)
}
/// Append `aggregate_spec` unless an equal spec is already present.
///
/// Returns the number of specs added: `1` when pushed, `0` when it was a
/// duplicate.
fn push_unique_grouped_aggregate_spec(
    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
    aggregate_spec: GroupedAggregateExecutionSpec,
) -> usize {
    let already_present = aggregate_specs
        .iter()
        .any(|existing| existing == &aggregate_spec);

    if already_present {
        0
    } else {
        aggregate_specs.push(aggregate_spec);
        1
    }
}
/// Return the expression to classify for a projected field.
///
/// Test builds peel off every enclosing `Expr::Alias` layer; non-test builds
/// return `expr` unchanged.
#[cfg_attr(
    not(test),
    expect(
        clippy::missing_const_for_fn,
        reason = "test-only alias stripping keeps the shared helper non-const across the full target matrix"
    )
)]
fn expression_without_alias(expr: &Expr) -> &Expr {
    #[cfg(test)]
    {
        let mut unwrapped = expr;
        loop {
            match unwrapped {
                Expr::Alias { expr: inner, .. } => unwrapped = inner.as_ref(),
                _ => break unwrapped,
            }
        }
    }
    #[cfg(not(test))]
    expr
}