use crate::db::{
access::AccessPlan,
query::plan::{
AccessPlannedQuery, FieldSlot, GroupAggregateSpec, GroupedPlanAggregateFamily, OrderSpec,
expr::{
GroupedOrderTermAdmissibility, GroupedTopKOrderTermAdmissibility,
classify_grouped_order_term_for_field, classify_grouped_top_k_order_term,
grouped_top_k_order_term_requires_heap,
},
},
};
/// Internal discriminant for the grouped strategy families.
///
/// The enum is private to this module; callers observe it only through
/// `GroupedPlanStrategy::code` and the `is_*` predicates.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum GroupedPlanFamily {
/// Hash grouping; every strategy built with this family also records a
/// fallback reason.
Hash,
/// Ordered grouping; chosen by `grouped_plan_strategy` when the access path
/// proves the group-key order.
Ordered,
/// Top-K grouping; chosen when an `ORDER BY` term requires a heap and every
/// term is admissible for that lane.
TopK,
}
/// Why `grouped_plan_strategy` fell back to hash grouping.
///
/// Each variant has a string label exposed through `code`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum GroupedPlanFallbackReason {
/// The grouped query's scalar part is flagged `distinct`.
DistinctGroupingNotAdmitted,
/// The plan carries a residual filter expression or residual filter predicate.
ResidualFilterBlocksGroupedOrder,
/// At least one aggregate is not `streaming_compatible_v1`.
AggregateStreamingNotSupported,
/// The `HAVING` expression is not streaming compatible.
HavingBlocksGroupedOrder,
/// The `ORDER BY` has fewer terms than there are group fields, a term was
/// classified as a prefix mismatch against its group field, or (Top-K lane)
/// a term references a non-group field.
GroupKeyOrderPrefixMismatch,
/// An `ORDER BY` term was classified as an unsupported expression.
GroupKeyOrderExpressionNotAdmissible,
/// No earlier check failed, but the access path does not prove the
/// group-key order.
GroupKeyOrderUnavailable,
}
impl GroupedPlanFallbackReason {
    /// Returns the snake_case string label for this fallback reason.
    ///
    /// The match is exhaustive on purpose: adding a variant without a label
    /// is a compile error.
    #[must_use]
    pub(in crate::db) const fn code(self) -> &'static str {
        match self {
            // Gates evaluated before the ORDER BY shape is consulted.
            Self::DistinctGroupingNotAdmitted => "distinct_grouping_not_admitted",
            Self::ResidualFilterBlocksGroupedOrder => "residual_filter_blocks_grouped_order",
            Self::AggregateStreamingNotSupported => "aggregate_streaming_not_supported",
            Self::HavingBlocksGroupedOrder => "having_blocks_grouped_order",

            // Reasons derived from the ORDER BY terms.
            Self::GroupKeyOrderPrefixMismatch => "group_key_order_prefix_mismatch",
            Self::GroupKeyOrderExpressionNotAdmissible => {
                "group_key_order_expression_not_admissible"
            }

            // Reason derived from the access path.
            Self::GroupKeyOrderUnavailable => "group_key_order_unavailable",
        }
    }
}
/// Planner verdict for a grouped query: the selected family, the aggregate
/// family of the query, and (for hash grouping) why the fallback was taken.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct GroupedPlanStrategy {
// Selected strategy family (hash / ordered / top-k).
family: GroupedPlanFamily,
// Aggregate family derived from the grouped aggregates of the plan.
aggregate_family: GroupedPlanAggregateFamily,
// `Some` for strategies built by the hash constructors, `None` for the
// ordered and top-k constructors.
fallback_reason: Option<GroupedPlanFallbackReason>,
}
impl GroupedPlanStrategy {
#[must_use]
pub(crate) const fn code(self) -> &'static str {
match self.family {
GroupedPlanFamily::Hash => "hash_group",
GroupedPlanFamily::Ordered => "ordered_group",
GroupedPlanFamily::TopK => "top_k_group",
}
}
#[cfg(test)]
#[must_use]
pub(crate) const fn hash_group(reason: GroupedPlanFallbackReason) -> Self {
Self::hash_group_with_aggregate_family(reason, GroupedPlanAggregateFamily::CountRowsOnly)
}
#[must_use]
pub(crate) const fn hash_group_with_aggregate_family(
reason: GroupedPlanFallbackReason,
aggregate_family: GroupedPlanAggregateFamily,
) -> Self {
Self {
family: GroupedPlanFamily::Hash,
aggregate_family,
fallback_reason: Some(reason),
}
}
#[cfg(test)]
#[must_use]
pub(crate) const fn ordered_group() -> Self {
Self::ordered_group_with_aggregate_family(GroupedPlanAggregateFamily::CountRowsOnly)
}
#[must_use]
pub(crate) const fn ordered_group_with_aggregate_family(
aggregate_family: GroupedPlanAggregateFamily,
) -> Self {
Self {
family: GroupedPlanFamily::Ordered,
aggregate_family,
fallback_reason: None,
}
}
#[must_use]
pub(crate) const fn top_k_group_with_aggregate_family(
aggregate_family: GroupedPlanAggregateFamily,
) -> Self {
Self {
family: GroupedPlanFamily::TopK,
aggregate_family,
fallback_reason: None,
}
}
#[must_use]
pub(crate) const fn is_ordered_group(self) -> bool {
matches!(self.family, GroupedPlanFamily::Ordered)
}
#[must_use]
pub(crate) const fn is_top_k_group(self) -> bool {
matches!(self.family, GroupedPlanFamily::TopK)
}
#[must_use]
pub(crate) const fn ordered_group_admitted(self) -> bool {
self.is_ordered_group()
}
#[must_use]
pub(crate) const fn aggregate_family(self) -> GroupedPlanAggregateFamily {
self.aggregate_family
}
#[must_use]
pub(crate) const fn is_single_count_rows(self) -> bool {
matches!(
self.aggregate_family,
GroupedPlanAggregateFamily::CountRowsOnly
)
}
#[must_use]
pub(crate) const fn fallback_reason(self) -> Option<GroupedPlanFallbackReason> {
self.fallback_reason
}
}
#[must_use]
pub(in crate::db) fn grouped_plan_strategy(
plan: &AccessPlannedQuery,
) -> Option<GroupedPlanStrategy> {
let grouped = plan.grouped_plan()?;
let aggregate_family =
GroupedPlanAggregateFamily::from_grouped_aggregates(grouped.group.aggregates.as_slice());
let order_strategy_projection = grouped_order_strategy_projection(
grouped.scalar.order.as_ref(),
grouped.group.group_fields.as_slice(),
);
if grouped.scalar.distinct {
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::DistinctGroupingNotAdmitted,
aggregate_family,
));
}
if matches!(
order_strategy_projection,
GroupedOrderStrategyProjection::TopK
) {
return Some(GroupedPlanStrategy::top_k_group_with_aggregate_family(
aggregate_family,
));
}
if plan.has_residual_filter_expr() || plan.has_residual_filter_predicate() {
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::ResidualFilterBlocksGroupedOrder,
aggregate_family,
));
}
if !matches!(
order_strategy_projection,
GroupedOrderStrategyProjection::TopK
) && !grouped_aggregates_streaming_compatible(grouped.group.aggregates.as_slice())
{
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::AggregateStreamingNotSupported,
aggregate_family,
));
}
if !crate::db::query::plan::semantics::group_having::grouped_having_streaming_compatible(
grouped.having_expr.as_ref(),
) {
return Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::HavingBlocksGroupedOrder,
aggregate_family,
));
}
match order_strategy_projection {
GroupedOrderStrategyProjection::Canonical => {}
GroupedOrderStrategyProjection::TopK => unreachable!(
"bounded grouped Top-K lane should be reserved before streaming-only fallback checks"
),
GroupedOrderStrategyProjection::HashFallback(reason) => {
return Some(hash_group_fallback_strategy(reason, aggregate_family));
}
}
if grouped_access_path_proves_group_order(grouped.group.group_fields.as_slice(), &plan.access) {
return Some(GroupedPlanStrategy::ordered_group_with_aggregate_family(
aggregate_family,
));
}
Some(hash_group_fallback_strategy(
GroupedPlanFallbackReason::GroupKeyOrderUnavailable,
aggregate_family,
))
}
fn grouped_aggregates_streaming_compatible(aggregates: &[GroupAggregateSpec]) -> bool {
aggregates
.iter()
.all(GroupAggregateSpec::streaming_compatible_v1)
}
const fn hash_group_fallback_strategy(
reason: GroupedPlanFallbackReason,
aggregate_family: GroupedPlanAggregateFamily,
) -> GroupedPlanStrategy {
GroupedPlanStrategy::hash_group_with_aggregate_family(reason, aggregate_family)
}
/// Outcome of inspecting the grouped `ORDER BY` against the group fields.
enum GroupedOrderStrategyProjection {
/// No `ORDER BY`, or every leading term was classified as preserving its
/// corresponding group field.
Canonical,
/// At least one term requires a heap and every term is admissible for the
/// Top-K lane.
TopK,
/// The `ORDER BY` shape is not admitted; carries the reason to report.
HashFallback(GroupedPlanFallbackReason),
}
/// Projects the grouped `ORDER BY` onto an order strategy.
///
/// * No `ORDER BY` -> `Canonical`.
/// * No term requires a heap -> result of the canonical prefix check
///   against `group_fields`.
/// * Some term requires a heap -> result of the Top-K admissibility check
///   against the group field names.
fn grouped_order_strategy_projection(
    order: Option<&OrderSpec>,
    group_fields: &[FieldSlot],
) -> GroupedOrderStrategyProjection {
    let Some(order) = order else {
        return GroupedOrderStrategyProjection::Canonical;
    };

    let top_k_required = order
        .fields
        .iter()
        .any(|term| grouped_top_k_order_term_requires_heap(term.expr()));
    if !top_k_required {
        return grouped_canonical_order_strategy_projection(order, group_fields);
    }

    // Only the Top-K classifier consumes field names, so the name list is
    // built on this branch alone rather than for every ordered query.
    let grouped_field_names = group_fields
        .iter()
        .map(FieldSlot::field)
        .collect::<Vec<_>>();
    grouped_top_k_strategy_projection(order, grouped_field_names.as_slice())
}
/// Checks that the leading `ORDER BY` terms line up with `group_fields`.
///
/// Falls back with `GroupKeyOrderPrefixMismatch` when there are fewer order
/// terms than group fields. Otherwise each of the first
/// `group_fields.len()` terms is classified against the group field at the
/// same position; the first non-preserving classification decides the
/// fallback reason. Terms beyond the group-field count are not inspected.
fn grouped_canonical_order_strategy_projection(
    order: &OrderSpec,
    group_fields: &[FieldSlot],
) -> GroupedOrderStrategyProjection {
    if order.fields.len() < group_fields.len() {
        return GroupedOrderStrategyProjection::HashFallback(
            GroupedPlanFallbackReason::GroupKeyOrderPrefixMismatch,
        );
    }

    // `zip` stops after the last group field, which is exactly the prefix
    // that has to be validated; `find_map` stops at the first offender.
    let first_rejection = order
        .fields
        .iter()
        .zip(group_fields)
        .find_map(|(term, group_field)| {
            match classify_grouped_order_term_for_field(term.expr(), group_field.field()) {
                GroupedOrderTermAdmissibility::Preserves(_) => None,
                GroupedOrderTermAdmissibility::PrefixMismatch => {
                    Some(GroupedPlanFallbackReason::GroupKeyOrderPrefixMismatch)
                }
                GroupedOrderTermAdmissibility::UnsupportedExpression => {
                    Some(GroupedPlanFallbackReason::GroupKeyOrderExpressionNotAdmissible)
                }
            }
        });

    first_rejection.map_or(
        GroupedOrderStrategyProjection::Canonical,
        GroupedOrderStrategyProjection::HashFallback,
    )
}
/// Checks every `ORDER BY` term for Top-K admissibility.
///
/// Returns `TopK` when all terms are admissible. The first inadmissible
/// term decides the fallback reason: a non-group field reference maps to
/// `GroupKeyOrderPrefixMismatch`, an unsupported expression to
/// `GroupKeyOrderExpressionNotAdmissible`.
fn grouped_top_k_strategy_projection(
    order: &OrderSpec,
    grouped_field_names: &[&str],
) -> GroupedOrderStrategyProjection {
    let first_rejection = order.fields.iter().find_map(|term| {
        match classify_grouped_top_k_order_term(term.expr(), grouped_field_names) {
            GroupedTopKOrderTermAdmissibility::Admissible => None,
            GroupedTopKOrderTermAdmissibility::NonGroupFieldReference => {
                Some(GroupedPlanFallbackReason::GroupKeyOrderPrefixMismatch)
            }
            GroupedTopKOrderTermAdmissibility::UnsupportedExpression => {
                Some(GroupedPlanFallbackReason::GroupKeyOrderExpressionNotAdmissible)
            }
        }
    });

    first_rejection.map_or(
        GroupedOrderStrategyProjection::TopK,
        GroupedOrderStrategyProjection::HashFallback,
    )
}
/// Decides whether the access path delivers rows in group-key order.
///
/// Returns `false` when the executable contract is not a single path, or
/// when that path exposes neither index-prefix nor index-range details.
/// Otherwise the group fields are matched, in order, against the index
/// fields: index slots below the path's `slot_arity` may be skipped when
/// they do not match, while every slot at or beyond it must match the next
/// group field exactly. An empty `group_fields` slice yields `true` once an
/// index-backed path is found.
fn grouped_access_path_proves_group_order<K>(
    group_fields: &[FieldSlot],
    access: &AccessPlan<K>,
) -> bool {
    let executable = access.executable_contract();
    let Some(path) = executable.as_path() else {
        return false;
    };
    let Some(details) = path
        .index_prefix_details()
        .or_else(|| path.index_range_details())
    else {
        return false;
    };

    let index_fields = details.index().fields();
    let skippable_slots = details.slot_arity();

    // Position of the next index field still available for matching.
    let mut cursor = 0usize;
    for group_field in group_fields {
        let wanted = group_field.field();
        loop {
            // Ran off the end of the index before finding the field.
            if cursor >= index_fields.len() {
                return false;
            }
            if index_fields[cursor] == wanted {
                break;
            }
            // A mismatch is only tolerated inside the skippable slots
            // (presumably those bound by the access predicate; confirm
            // against `slot_arity`).
            if cursor >= skippable_slots {
                return false;
            }
            cursor = cursor.saturating_add(1);
        }
        // Consume the matched slot so the next group field must come after it.
        cursor = cursor.saturating_add(1);
    }
    true
}