use crate::db::{
access::AccessPlan,
query::plan::{
AccessPlannedQuery, AggregateKind, FieldSlot, GroupAggregateSpec, OrderSpec,
expr::{
GroupedOrderTermAdmissibility, GroupedTopKOrderTermAdmissibility,
classify_grouped_order_term_for_field, classify_grouped_top_k_order_term,
grouped_top_k_order_term_requires_heap,
},
},
};
/// Internal execution family chosen for a grouped plan.
///
/// The family is private to this module; callers observe it through
/// `GroupedPlanStrategy::code` and the `is_*` predicates on
/// `GroupedPlanStrategy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum GroupedPlanFamily {
/// Hash-grouped execution; reported as `"hash_group"`.
Hash,
/// Ordered-group execution; reported as `"ordered_group"`.
Ordered,
/// Top-K grouped execution; reported as `"top_k_group"`.
TopK,
}
/// Coarse classification of the aggregate list attached to a grouped plan.
///
/// Produced by `grouped_plan_aggregate_family` and carried on
/// `GroupedPlanStrategy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum GroupedPlanAggregateFamily {
    /// Exactly one non-distinct `Count` aggregate without a target field.
    CountRowsOnly,
    /// Every aggregate names a target field and is one of
    /// `Count`, `Sum`, `Avg`, `Min`, or `Max`.
    FieldTargetRows,
    /// Any aggregate list that fits neither of the other families.
    GenericRows,
}

impl GroupedPlanAggregateFamily {
    /// Stable label for [`Self::CountRowsOnly`].
    const COUNT_ROWS_ONLY_CODE: &'static str = "count_rows_only";
    /// Stable label for [`Self::FieldTargetRows`].
    const FIELD_TARGET_ROWS_CODE: &'static str = "field_target_rows";
    /// Stable label for [`Self::GenericRows`].
    const GENERIC_ROWS_CODE: &'static str = "generic_rows";

    /// Return the stable snake_case label for this aggregate family.
    #[must_use]
    pub(crate) const fn code(self) -> &'static str {
        match self {
            Self::CountRowsOnly => Self::COUNT_ROWS_ONLY_CODE,
            Self::FieldTargetRows => Self::FIELD_TARGET_ROWS_CODE,
            Self::GenericRows => Self::GENERIC_ROWS_CODE,
        }
    }
}
/// Why grouped planning fell back to hash-grouped execution.
///
/// Each variant corresponds to one rejection point in `grouped_plan_strategy`
/// or `grouped_order_strategy_projection`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum GroupedPlanFallbackReason {
    /// The grouped query's scalar section is marked `distinct`.
    DistinctGroupingNotAdmitted,
    /// The plan carries a residual predicate.
    ResidualPredicateBlocksGroupedOrder,
    /// At least one aggregate is not streaming compatible.
    AggregateStreamingNotSupported,
    /// The `HAVING` expression is not streaming compatible.
    HavingBlocksGroupedOrder,
    /// The requested order does not line up with the group-key prefix.
    GroupKeyOrderPrefixMismatch,
    /// An order term uses an expression that grouped ordering cannot admit.
    GroupKeyOrderExpressionNotAdmissible,
    /// The access path does not prove group-key order.
    GroupKeyOrderUnavailable,
}

impl GroupedPlanFallbackReason {
    /// Return the stable snake_case label for this fallback reason.
    ///
    /// The match is exhaustive on purpose: adding a variant must force a
    /// decision about its label.
    #[must_use]
    pub(in crate::db) const fn code(self) -> &'static str {
        match self {
            // Rejections decided directly in `grouped_plan_strategy`.
            Self::DistinctGroupingNotAdmitted => "distinct_grouping_not_admitted",
            Self::ResidualPredicateBlocksGroupedOrder => "residual_predicate_blocks_grouped_order",
            Self::AggregateStreamingNotSupported => "aggregate_streaming_not_supported",
            Self::HavingBlocksGroupedOrder => "having_blocks_grouped_order",
            Self::GroupKeyOrderUnavailable => "group_key_order_unavailable",
            // Rejections produced while projecting the requested order.
            Self::GroupKeyOrderPrefixMismatch => "group_key_order_prefix_mismatch",
            Self::GroupKeyOrderExpressionNotAdmissible => {
                "group_key_order_expression_not_admissible"
            }
        }
    }
}
/// Planner-selected grouped execution strategy.
///
/// Bundles the execution family, the aggregate family, and, for hash-grouped
/// plans only, the reason the planner fell back to hashing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct GroupedPlanStrategy {
    family: GroupedPlanFamily,
    aggregate_family: GroupedPlanAggregateFamily,
    fallback_reason: Option<GroupedPlanFallbackReason>,
}

impl GroupedPlanStrategy {
    /// Single construction point shared by every public constructor.
    const fn from_parts(
        family: GroupedPlanFamily,
        aggregate_family: GroupedPlanAggregateFamily,
        fallback_reason: Option<GroupedPlanFallbackReason>,
    ) -> Self {
        Self {
            family,
            aggregate_family,
            fallback_reason,
        }
    }

    /// Build a hash-group strategy that records `reason` as its fallback cause.
    #[must_use]
    pub(crate) const fn hash_group_with_aggregate_family(
        reason: GroupedPlanFallbackReason,
        aggregate_family: GroupedPlanAggregateFamily,
    ) -> Self {
        Self::from_parts(GroupedPlanFamily::Hash, aggregate_family, Some(reason))
    }

    /// Build an ordered-group strategy; ordered plans carry no fallback reason.
    #[must_use]
    pub(crate) const fn ordered_group_with_aggregate_family(
        aggregate_family: GroupedPlanAggregateFamily,
    ) -> Self {
        Self::from_parts(GroupedPlanFamily::Ordered, aggregate_family, None)
    }

    /// Build a Top-K group strategy; Top-K plans carry no fallback reason.
    #[must_use]
    pub(crate) const fn top_k_group_with_aggregate_family(
        aggregate_family: GroupedPlanAggregateFamily,
    ) -> Self {
        Self::from_parts(GroupedPlanFamily::TopK, aggregate_family, None)
    }

    /// Test shorthand: hash-group strategy with the `CountRowsOnly` family.
    #[cfg(test)]
    #[must_use]
    pub(crate) const fn hash_group(reason: GroupedPlanFallbackReason) -> Self {
        Self::hash_group_with_aggregate_family(reason, GroupedPlanAggregateFamily::CountRowsOnly)
    }

    /// Test shorthand: ordered-group strategy with the `CountRowsOnly` family.
    #[cfg(test)]
    #[must_use]
    pub(crate) const fn ordered_group() -> Self {
        Self::ordered_group_with_aggregate_family(GroupedPlanAggregateFamily::CountRowsOnly)
    }

    /// Return the stable snake_case label of the execution family.
    #[must_use]
    pub(crate) const fn code(self) -> &'static str {
        let Self { family, .. } = self;
        match family {
            GroupedPlanFamily::Hash => "hash_group",
            GroupedPlanFamily::Ordered => "ordered_group",
            GroupedPlanFamily::TopK => "top_k_group",
        }
    }

    /// Return `true` when the strategy is the ordered-group family.
    #[must_use]
    pub(crate) const fn is_ordered_group(self) -> bool {
        // Exhaustive on purpose: a new family must be classified explicitly.
        match self.family {
            GroupedPlanFamily::Ordered => true,
            GroupedPlanFamily::Hash | GroupedPlanFamily::TopK => false,
        }
    }

    /// Return `true` when the strategy is the Top-K group family.
    #[must_use]
    pub(crate) const fn is_top_k_group(self) -> bool {
        match self.family {
            GroupedPlanFamily::TopK => true,
            GroupedPlanFamily::Hash | GroupedPlanFamily::Ordered => false,
        }
    }

    /// Return `true` when ordered grouping was admitted.
    ///
    /// Currently identical to [`Self::is_ordered_group`].
    #[must_use]
    pub(crate) const fn ordered_group_admitted(self) -> bool {
        self.is_ordered_group()
    }

    /// Return the aggregate family recorded on this strategy.
    #[must_use]
    pub(crate) const fn aggregate_family(self) -> GroupedPlanAggregateFamily {
        self.aggregate_family
    }

    /// Return `true` when the aggregate family is `CountRowsOnly`.
    #[must_use]
    pub(crate) const fn is_single_count_rows(self) -> bool {
        match self.aggregate_family {
            GroupedPlanAggregateFamily::CountRowsOnly => true,
            GroupedPlanAggregateFamily::FieldTargetRows
            | GroupedPlanAggregateFamily::GenericRows => false,
        }
    }

    /// Return the recorded fallback reason.
    ///
    /// `Some` for strategies built by the hash-group constructors, `None` for
    /// ordered and Top-K strategies.
    #[must_use]
    pub(crate) const fn fallback_reason(self) -> Option<GroupedPlanFallbackReason> {
        self.fallback_reason
    }
}
#[must_use]
pub(in crate::db) fn grouped_plan_strategy(
plan: &AccessPlannedQuery,
) -> Option<GroupedPlanStrategy> {
let grouped = plan.grouped_plan()?;
let aggregate_family = grouped_plan_aggregate_family(grouped.group.aggregates.as_slice());
let order_strategy_projection = grouped_order_strategy_projection(
grouped.scalar.order.as_ref(),
grouped.group.group_fields.as_slice(),
);
if grouped.scalar.distinct {
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::DistinctGroupingNotAdmitted,
aggregate_family,
));
}
if matches!(
order_strategy_projection,
GroupedOrderStrategyProjection::TopK
) {
return Some(GroupedPlanStrategy::top_k_group_with_aggregate_family(
aggregate_family,
));
}
if plan.has_residual_predicate() {
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::ResidualPredicateBlocksGroupedOrder,
aggregate_family,
));
}
if !matches!(
order_strategy_projection,
GroupedOrderStrategyProjection::TopK
) && !grouped_aggregates_streaming_compatible(grouped.group.aggregates.as_slice())
{
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::AggregateStreamingNotSupported,
aggregate_family,
));
}
if !crate::db::query::plan::semantics::group_having::grouped_having_streaming_compatible(
grouped.having_expr.as_ref(),
) {
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::HavingBlocksGroupedOrder,
aggregate_family,
));
}
match order_strategy_projection {
GroupedOrderStrategyProjection::Canonical => {}
GroupedOrderStrategyProjection::TopK => unreachable!(
"bounded grouped Top-K lane should be reserved before streaming-only fallback checks"
),
GroupedOrderStrategyProjection::HashFallback(reason) => {
return Some(hash_group_fallback_strategy(reason, aggregate_family));
}
}
if grouped_access_path_proves_group_order(grouped.group.group_fields.as_slice(), &plan.access) {
return Some(GroupedPlanStrategy::ordered_group_with_aggregate_family(
aggregate_family,
));
}
Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::GroupKeyOrderUnavailable,
aggregate_family,
))
}
/// Classify a grouped aggregate list into its aggregate family.
///
/// * `CountRowsOnly`: the list is exactly one non-distinct `Count` aggregate
///   with no target field.
/// * `FieldTargetRows`: every aggregate has a target field and its kind is one
///   of `Count`, `Sum`, `Avg`, `Min`, or `Max`.
/// * `GenericRows`: everything else.
///
/// NOTE(review): an empty `aggregates` slice satisfies the "every aggregate"
/// test vacuously and is therefore classified as `FieldTargetRows`. This
/// matches the previous behaviour; confirm with callers whether an empty list
/// can reach this function and whether that family is the intended one.
pub(in crate::db) fn grouped_plan_aggregate_family(
    aggregates: &[GroupAggregateSpec],
) -> GroupedPlanAggregateFamily {
    if let [only] = aggregates {
        let is_plain_row_count = matches!(only.kind(), AggregateKind::Count)
            && only.target_field().is_none()
            && !only.distinct();
        if is_plain_row_count {
            return GroupedPlanAggregateFamily::CountRowsOnly;
        }
    }

    let all_field_targeted = aggregates.iter().all(|aggregate| {
        aggregate.target_field().is_some()
            && matches!(
                aggregate.kind(),
                AggregateKind::Count
                    | AggregateKind::Sum
                    | AggregateKind::Avg
                    | AggregateKind::Min
                    | AggregateKind::Max
            )
    });

    // Anything that is not uniformly field-targeted is generic. The earlier
    // explicit test for target-less `Exists`/`Min`/`Max`/`First`/`Last` lists
    // returned `GenericRows` as well, so it never changed the outcome.
    if all_field_targeted {
        GroupedPlanAggregateFamily::FieldTargetRows
    } else {
        GroupedPlanAggregateFamily::GenericRows
    }
}
fn grouped_aggregates_streaming_compatible(aggregates: &[GroupAggregateSpec]) -> bool {
aggregates
.iter()
.all(GroupAggregateSpec::streaming_compatible_v1)
}
/// Build the hash-group fallback strategy for `reason` and `aggregate_family`.
///
/// Thin forwarding wrapper around
/// `GroupedPlanStrategy::hash_group_with_aggregate_family`, kept so every
/// fallback site in this module goes through one local entry point.
const fn hash_group_fallback_strategy(
reason: GroupedPlanFallbackReason,
aggregate_family: GroupedPlanAggregateFamily,
) -> GroupedPlanStrategy {
GroupedPlanStrategy::hash_group_with_aggregate_family(reason, aggregate_family)
}
/// Result of projecting a grouped query's requested order onto a strategy
/// decision, as computed by `grouped_order_strategy_projection`.
enum GroupedOrderStrategyProjection {
/// No order was requested, or the requested order neither required Top-K
/// nor triggered a fallback.
Canonical,
/// At least one order term was classified as an admissible Top-K term.
TopK,
/// The requested order cannot be honoured; carries the fallback reason.
HashFallback(GroupedPlanFallbackReason),
}
/// Project the requested grouped order onto a strategy decision.
///
/// * No order: `Canonical`.
/// * Each order term at a position covered by `group_fields` is classified
///   against the group field at that position. A term that preserves the
///   group field is accepted. A mismatching or unsupported term yields a hash
///   fallback unless `grouped_top_k_order_term_requires_heap` reports that
///   the term requires the heap.
/// * Terms that require the heap are then classified as Top-K terms. An
///   admissible term marks the order as Top-K; any other classification
///   yields a hash fallback.
/// * After all terms: `TopK` if any term was marked, a prefix-mismatch
///   fallback if the order has fewer terms than there are group fields,
///   otherwise `Canonical`.
fn grouped_order_strategy_projection(
    order: Option<&OrderSpec>,
    group_fields: &[FieldSlot],
) -> GroupedOrderStrategyProjection {
    let Some(order) = order else {
        return GroupedOrderStrategyProjection::Canonical;
    };

    let group_field_names: Vec<_> = group_fields.iter().map(FieldSlot::field).collect();
    let mut needs_top_k = false;

    for (position, (order_term, _)) in order.fields.iter().enumerate() {
        let heap_driven = grouped_top_k_order_term_requires_heap(order_term);

        // Positions inside the group key are checked against that key field.
        if let Some(group_field) = group_fields.get(position) {
            let blocking_reason =
                match classify_grouped_order_term_for_field(order_term, group_field.field()) {
                    GroupedOrderTermAdmissibility::Preserves(_) => continue,
                    GroupedOrderTermAdmissibility::PrefixMismatch => {
                        GroupedPlanFallbackReason::GroupKeyOrderPrefixMismatch
                    }
                    GroupedOrderTermAdmissibility::UnsupportedExpression => {
                        GroupedPlanFallbackReason::GroupKeyOrderExpressionNotAdmissible
                    }
                };

            // A heap-driven term still gets its chance on the Top-K lane.
            if !heap_driven {
                return GroupedOrderStrategyProjection::HashFallback(blocking_reason);
            }
        }

        if !heap_driven {
            continue;
        }

        let top_k_blocking_reason =
            match classify_grouped_top_k_order_term(order_term, group_field_names.as_slice()) {
                GroupedTopKOrderTermAdmissibility::Admissible => {
                    needs_top_k = true;
                    continue;
                }
                GroupedTopKOrderTermAdmissibility::NonGroupFieldReference => {
                    GroupedPlanFallbackReason::GroupKeyOrderPrefixMismatch
                }
                GroupedTopKOrderTermAdmissibility::UnsupportedExpression => {
                    GroupedPlanFallbackReason::GroupKeyOrderExpressionNotAdmissible
                }
            };

        return GroupedOrderStrategyProjection::HashFallback(top_k_blocking_reason);
    }

    if needs_top_k {
        GroupedOrderStrategyProjection::TopK
    } else if order.fields.len() < group_fields.len() {
        // The order stops short of the full group key.
        GroupedOrderStrategyProjection::HashFallback(
            GroupedPlanFallbackReason::GroupKeyOrderPrefixMismatch,
        )
    } else {
        GroupedOrderStrategyProjection::Canonical
    }
}
/// Return `true` when the resolved access path yields rows in group-key order.
///
/// The check succeeds only if the access resolves to a single path that
/// exposes index prefix or index range details. The group fields must then
/// appear in the index's field list in order: index fields at positions below
/// `prefix_len` may be skipped while searching for the next group field, but
/// from `prefix_len` onward each group field must match the very next index
/// field.
///
/// NOTE(review): `prefix_len` is presumably the number of leading index
/// fields already bound by the access path; confirm against
/// `index_prefix_details` / `index_range_details`.
fn grouped_access_path_proves_group_order<K>(
    group_fields: &[FieldSlot],
    access: &AccessPlan<K>,
) -> bool {
    let executable = access.resolve_strategy();
    let Some(path) = executable.as_path() else {
        return false;
    };

    let index_details = path
        .index_prefix_details()
        .or_else(|| path.index_range_details());
    let Some((index, prefix_len)) = index_details else {
        return false;
    };

    let index_fields = index.fields();
    let mut next_slot = 0usize;

    for group_field in group_fields {
        let wanted = group_field.field();

        // Non-matching index fields may only be skipped inside the prefix.
        while next_slot < prefix_len
            && next_slot < index_fields.len()
            && index_fields[next_slot] != wanted
        {
            next_slot += 1;
        }

        let found = next_slot < index_fields.len() && index_fields[next_slot] == wanted;
        if !found {
            return false;
        }

        next_slot += 1;
    }

    true
}