use crate::{
db::{
executor::{StructuralAggregateTerminal, StructuralAggregateTerminalKind},
query::plan::{AggregateKind, FieldSlot, expr::Expr, resolve_aggregate_target_field_slot},
sql::lowering::{
SqlLoweringError,
aggregate::{
identity::{
PreparedAggregateIdentity, PreparedAggregateTarget, PreparedAggregateTerminal,
aggregate_input_from_identity,
},
lowering::validate_model_bound_scalar_expr,
terminal::{AggregateInput, SqlGlobalAggregateTerminal},
},
},
},
model::entity::EntityModel,
};
/// Runtime classification of one prepared SQL scalar aggregate.
///
/// `PreparedSqlScalarAggregateStrategy` derives this value from its prepared
/// aggregate identity and uses it to pick the matching
/// `StructuralAggregateTerminalKind` when building the executor terminal.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateRuntimeDescriptor {
/// Count identity whose target is `PreparedAggregateTarget::Rows`.
CountRows,
/// Count identity with any target other than `PreparedAggregateTarget::Rows`.
CountField,
/// Produced for `Sum` and `Avg` identities; `kind` records which of the two.
NumericField { kind: AggregateKind },
/// Produced for `Min` and `Max` identities; `kind` records which of the two.
ExtremalWinnerField { kind: AggregateKind },
}
/// Alias for [`PreparedSqlScalarAggregateRuntimeDescriptor`]; both names denote
/// exactly the same type, so values are interchangeable without conversion.
pub(crate) type PreparedSqlScalarAggregateDescriptorShape =
PreparedSqlScalarAggregateRuntimeDescriptor;
impl PreparedSqlScalarAggregateRuntimeDescriptor {
/// Returns this descriptor unchanged.
///
/// The descriptor shape and the runtime descriptor are the same type, so
/// this is an identity conversion.
#[must_use]
pub(crate) const fn runtime_descriptor(self) -> Self {
self
}
}
/// One prepared SQL scalar aggregate: a prepared aggregate identity together
/// with an optional filter expression.
///
/// Built from a lowered SQL global aggregate terminal and later consumed to
/// produce a `StructuralAggregateTerminal` for the executor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct PreparedSqlScalarAggregateStrategy {
// Prepared identity: aggregate kind, target, and distinct-input flag.
identity: PreparedAggregateIdentity,
// Optional filter expression, forwarded as-is to the executor terminal.
filter_expr: Option<Expr>,
}
impl PreparedSqlScalarAggregateStrategy {
const fn from_identity(identity: PreparedAggregateIdentity, filter_expr: Option<Expr>) -> Self {
Self {
identity,
filter_expr,
}
}
pub(in crate::db::sql::lowering::aggregate) fn from_lowered_terminal(
model: &'static EntityModel,
terminal: SqlGlobalAggregateTerminal,
) -> Result<Self, SqlLoweringError> {
let (identity, filter_expr) =
PreparedAggregateTerminal::from_terminal(terminal).into_parts();
let kind = identity.kind();
let distinct_input = identity.distinct();
let target = match aggregate_input_from_identity(identity) {
AggregateInput::Rows => PreparedAggregateTarget::Rows,
AggregateInput::Field(field) => {
let target_slot = resolve_aggregate_target_field_slot(model, field.as_str())
.map_err(SqlLoweringError::from)?;
PreparedAggregateTarget::Field(target_slot)
}
AggregateInput::Expr(input_expr) => {
validate_model_bound_scalar_expr(
model,
&input_expr,
SqlLoweringError::unsupported_aggregate_input_expressions,
)?;
PreparedAggregateTarget::Expr(input_expr)
}
};
Ok(Self::from_identity(
PreparedAggregateIdentity::from_parts(kind, target, distinct_input),
filter_expr,
))
}
#[must_use]
pub(crate) const fn target_slot(&self) -> Option<&FieldSlot> {
self.identity.target_slot()
}
#[cfg(test)]
#[must_use]
pub(crate) const fn input_expr(&self) -> Option<&Expr> {
self.identity.input_expr()
}
#[must_use]
pub(crate) const fn filter_expr(&self) -> Option<&Expr> {
self.filter_expr.as_ref()
}
#[cfg(test)]
#[must_use]
pub(crate) const fn is_distinct(&self) -> bool {
self.identity.distinct_input()
}
#[cfg(test)]
#[must_use]
pub(crate) const fn descriptor_shape(&self) -> PreparedSqlScalarAggregateDescriptorShape {
self.prepared_descriptor_shape()
}
#[cfg(test)]
#[must_use]
pub(crate) const fn runtime_descriptor(&self) -> PreparedSqlScalarAggregateRuntimeDescriptor {
self.descriptor_shape().runtime_descriptor()
}
#[must_use]
pub(crate) const fn aggregate_kind(&self) -> AggregateKind {
self.identity.aggregate_kind()
}
const fn prepared_descriptor_shape(&self) -> PreparedSqlScalarAggregateDescriptorShape {
match &self.identity {
PreparedAggregateIdentity::Count {
target: PreparedAggregateTarget::Rows,
..
} => PreparedSqlScalarAggregateDescriptorShape::CountRows,
PreparedAggregateIdentity::Count { .. } => {
PreparedSqlScalarAggregateDescriptorShape::CountField
}
PreparedAggregateIdentity::Sum { .. } => {
PreparedSqlScalarAggregateDescriptorShape::NumericField {
kind: AggregateKind::Sum,
}
}
PreparedAggregateIdentity::Avg { .. } => {
PreparedSqlScalarAggregateDescriptorShape::NumericField {
kind: AggregateKind::Avg,
}
}
PreparedAggregateIdentity::Min { .. } => {
PreparedSqlScalarAggregateDescriptorShape::ExtremalWinnerField {
kind: AggregateKind::Min,
}
}
PreparedAggregateIdentity::Max { .. } => {
PreparedSqlScalarAggregateDescriptorShape::ExtremalWinnerField {
kind: AggregateKind::Max,
}
}
}
}
pub(in crate::db) fn into_executor_terminal(
self,
) -> Result<StructuralAggregateTerminal, &'static str> {
let descriptor_shape = self.prepared_descriptor_shape();
let Self {
identity,
filter_expr,
} = self;
let distinct_input = identity.distinct_input();
let (target_slot, input_expr) = identity.into_executor_parts();
let kind = match descriptor_shape.runtime_descriptor() {
PreparedSqlScalarAggregateRuntimeDescriptor::CountRows => {
StructuralAggregateTerminalKind::CountRows
}
PreparedSqlScalarAggregateRuntimeDescriptor::CountField => {
StructuralAggregateTerminalKind::CountValues
}
PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
kind: AggregateKind::Sum,
} => StructuralAggregateTerminalKind::Sum,
PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
kind: AggregateKind::Avg,
} => StructuralAggregateTerminalKind::Avg,
PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
kind: AggregateKind::Min,
} => StructuralAggregateTerminalKind::Min,
PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
kind: AggregateKind::Max,
} => StructuralAggregateTerminalKind::Max,
PreparedSqlScalarAggregateRuntimeDescriptor::NumericField { .. }
| PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField { .. } => {
return Err("prepared SQL scalar aggregate strategy drifted outside SQL support");
}
};
Ok(StructuralAggregateTerminal::new(
kind,
target_slot,
input_expr,
filter_expr,
distinct_input,
))
}
#[must_use]
pub(crate) fn projected_field(&self) -> Option<&str> {
self.target_slot().map(FieldSlot::field)
}
}