use crate::{
db::{
data::DataKey,
direction::Direction,
executor::{
aggregate::{
contracts::{
error::GroupError,
grouped::ExecutionContext,
spec::{AggregateKind, ScalarAggregateOutput, ScalarTerminalKind},
},
field::{
AggregateFieldValueError, FieldSlot as AggregateFieldSlot,
compare_orderable_field_values_with_slot,
},
reducer_core::{ValueReducerState, finalize_count},
},
group::{CanonicalKey, GroupKey, GroupKeySet, KeyCanonicalError},
pipeline::runtime::RowView,
projection::{
ProjectionEvalError, ScalarProjectionExpr,
eval_scalar_projection_expr_with_value_reader,
},
},
numeric::coerce_numeric_decimal,
query::plan::FieldSlot,
query::plan::expr::collapse_true_only_boolean_admission,
},
error::InternalError,
types::Decimal,
value::{StorageKey, Value, storage_key_as_runtime_value},
};
/// Control signal returned by an aggregate state after it has consumed one
/// row/key.
///
/// NOTE(review): how the driving fold loop reacts to each variant is decided
/// by callers outside this file — confirm there before relying on it.
#[derive(Clone, Copy, Debug)]
pub(in crate::db::executor) enum FoldControl {
/// The state can still change; returned, for example, by COUNT and LAST.
Continue,
/// The state reports it needs no further input; returned, for example, by
/// EXISTS and FIRST after their first admitted row.
Break,
}
/// Resolved per-row aggregate input, with the null case split out so callers
/// can skip null inputs with a single pattern match.
enum AggregateInputValue {
/// The resolved input was `Value::Null`.
Null,
/// Any resolved input other than `Value::Null`.
Value(Value),
}
/// Which extremum a MIN/MAX update is computing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ExtremumKind {
    Min,
    Max,
}

impl ExtremumKind {
    /// Diagnostic label used when the extremum input is a compiled expression.
    const fn expression_label(self) -> &'static str {
        match self {
            ExtremumKind::Max => "MAX(expr)",
            ExtremumKind::Min => "MIN(expr)",
        }
    }

    /// Diagnostic label used when the extremum input is a target field.
    const fn field_label(self) -> &'static str {
        match self {
            ExtremumKind::Max => "MAX(field)",
            ExtremumKind::Min => "MIN(field)",
        }
    }

    /// Diagnostic label used when the extremum is taken over storage keys.
    const fn storage_key_label(self) -> &'static str {
        match self {
            ExtremumKind::Max => "MAX",
            ExtremumKind::Min => "MIN",
        }
    }
}
impl AggregateKind {
    /// Diagnostic label for the SUM/AVG input lane, or `None` for every kind
    /// that is not SUM or AVG.
    const fn sum_like_input_label(self) -> Option<&'static str> {
        match self {
            Self::Avg => Some("AVG(input)"),
            Self::Sum => Some("SUM(input)"),
            Self::Count | Self::Exists | Self::First | Self::Last | Self::Max | Self::Min => None,
        }
    }

    /// Feeds one decimal into the grouped reducer through the SUM or AVG
    /// entry point matching `self`.
    ///
    /// # Errors
    ///
    /// Returns an executor-invariant error when `self` is neither SUM nor
    /// AVG, and forwards any error from the reducer itself.
    fn apply_sum_like_decimal(
        self,
        reducer: &mut GroupedAggregateReducerState,
        decimal: Decimal,
    ) -> Result<(), InternalError> {
        match self {
            Self::Avg => reducer.add_average_value(decimal),
            Self::Sum => reducer.add_sum_value(decimal),
            Self::Count | Self::Exists | Self::First | Self::Last | Self::Max | Self::Min => {
                let invariant = GroupedTerminalAggregateState::field_target_execution_required(
                    "SUM/AVG(input)",
                );
                Err(invariant)
            }
        }
    }
}
/// Accumulator for one scalar (non-grouped) terminal aggregate, one variant
/// per supported terminal kind.
pub(in crate::db::executor) enum ScalarAggregateReducerState {
/// Running count; increments saturate at `u32::MAX`.
Count(u32),
/// Whether at least one row has been observed.
Exists(bool),
/// Smallest storage key observed so far, if any.
Min(Option<StorageKey>),
/// Largest storage key observed so far, if any.
Max(Option<StorageKey>),
/// Storage key recorded by the FIRST terminal, if any.
First(Option<StorageKey>),
/// Storage key most recently recorded by the LAST terminal, if any.
Last(Option<StorageKey>),
}
impl ScalarAggregateReducerState {
    /// Builds the invariant error reported when a reducer method is invoked
    /// on a variant other than the one named by `kind`.
    fn state_mismatch(kind: &'static str) -> InternalError {
        let message = format!("aggregate reducer {kind} state mismatch");
        InternalError::query_executor_invariant(message)
    }

    /// Creates the empty accumulator matching one scalar terminal kind.
    #[must_use]
    pub(in crate::db::executor) const fn for_terminal_kind(kind: ScalarTerminalKind) -> Self {
        match kind {
            ScalarTerminalKind::Exists => Self::Exists(false),
            ScalarTerminalKind::Count => Self::Count(0),
            ScalarTerminalKind::First => Self::First(None),
            ScalarTerminalKind::Last => Self::Last(None),
            ScalarTerminalKind::Min => Self::Min(None),
            ScalarTerminalKind::Max => Self::Max(None),
        }
    }

    /// Adds one to the COUNT accumulator, saturating instead of overflowing.
    fn increment_count(&mut self) -> Result<(), InternalError> {
        let Self::Count(total) = self else {
            return Err(Self::state_mismatch("COUNT"));
        };
        *total = total.saturating_add(1);
        Ok(())
    }

    /// Marks the EXISTS accumulator as satisfied.
    fn set_exists_true(&mut self) -> Result<(), InternalError> {
        let Self::Exists(flag) = self else {
            return Err(Self::state_mismatch("EXISTS"));
        };
        *flag = true;
        Ok(())
    }

    /// Stores `key` in the MIN accumulator when it is empty or `key` sorts
    /// strictly below the stored key.
    fn update_min_value(&mut self, key: StorageKey) -> Result<(), InternalError> {
        let Self::Min(smallest) = self else {
            return Err(Self::state_mismatch("MIN"));
        };
        let should_replace = smallest.as_ref().map_or(true, |current| key < *current);
        if should_replace {
            *smallest = Some(key);
        }
        Ok(())
    }

    /// Stores `key` in the MAX accumulator when it is empty or `key` sorts
    /// strictly above the stored key.
    fn update_max_value(&mut self, key: StorageKey) -> Result<(), InternalError> {
        let Self::Max(largest) = self else {
            return Err(Self::state_mismatch("MAX"));
        };
        let should_replace = largest.as_ref().map_or(true, |current| key > *current);
        if should_replace {
            *largest = Some(key);
        }
        Ok(())
    }

    /// Records `key` in the FIRST accumulator, overwriting any stored key.
    fn set_first(&mut self, key: StorageKey) -> Result<(), InternalError> {
        let Self::First(slot) = self else {
            return Err(Self::state_mismatch("FIRST"));
        };
        *slot = Some(key);
        Ok(())
    }

    /// Records `key` in the LAST accumulator, overwriting any stored key.
    fn set_last(&mut self, key: StorageKey) -> Result<(), InternalError> {
        let Self::Last(slot) = self else {
            return Err(Self::state_mismatch("LAST"));
        };
        *slot = Some(key);
        Ok(())
    }

    /// Converts the accumulator into its terminal output.
    ///
    /// COUNT is routed through `finalize_count`; the finalized value is
    /// narrowed back to `u32`, clamping to `u32::MAX` if it does not fit.
    ///
    /// # Panics
    ///
    /// Panics if `finalize_count` returns anything other than `Value::Uint`.
    #[must_use]
    pub(in crate::db::executor) fn into_output(self) -> ScalarAggregateOutput {
        match self {
            Self::Count(raw) => match finalize_count(u64::from(raw)) {
                Value::Uint(count) => {
                    ScalarAggregateOutput::Count(u32::try_from(count).unwrap_or(u32::MAX))
                }
                _ => unreachable!("COUNT finalization must produce Uint"),
            },
            Self::Exists(found) => ScalarAggregateOutput::Exists(found),
            Self::First(key) => ScalarAggregateOutput::First(key),
            Self::Last(key) => ScalarAggregateOutput::Last(key),
            Self::Min(key) => ScalarAggregateOutput::Min(key),
            Self::Max(key) => ScalarAggregateOutput::Max(key),
        }
    }
}
/// Accumulator for one grouped terminal aggregate, one variant per
/// `AggregateKind`.
enum GroupedAggregateReducerState {
/// Running count; increments saturate at `u32::MAX`.
Count(u32),
/// SUM accumulator, fed decimals via `ingest_decimal`.
Sum(ValueReducerState),
/// AVG accumulator, fed decimals via `ingest_decimal`.
Avg(ValueReducerState),
/// Whether at least one row has been observed.
Exists(bool),
/// MIN accumulator; its current selection is exposed through `selected()`.
Min(ValueReducerState),
/// MAX accumulator; its current selection is exposed through `selected()`.
Max(ValueReducerState),
/// Runtime value of the storage key recorded by FIRST, if any.
First(Option<Value>),
/// Runtime value of the storage key most recently recorded by LAST, if any.
Last(Option<Value>),
}
impl GroupedAggregateReducerState {
    /// Builds the invariant error reported when a reducer method is invoked
    /// on a variant other than the one named by `kind`.
    fn state_mismatch(kind: &'static str) -> InternalError {
        let message = format!("grouped aggregate reducer {kind} state mismatch");
        InternalError::query_executor_invariant(message)
    }

    /// Creates the empty accumulator matching one aggregate kind.
    #[must_use]
    const fn for_kind(kind: AggregateKind) -> Self {
        match kind {
            AggregateKind::Exists => Self::Exists(false),
            AggregateKind::Count => Self::Count(0),
            AggregateKind::Avg => Self::Avg(ValueReducerState::avg()),
            AggregateKind::Sum => Self::Sum(ValueReducerState::sum()),
            AggregateKind::Max => Self::Max(ValueReducerState::max()),
            AggregateKind::Min => Self::Min(ValueReducerState::min()),
            AggregateKind::Last => Self::Last(None),
            AggregateKind::First => Self::First(None),
        }
    }

    /// Adds one to the COUNT accumulator, saturating instead of overflowing.
    fn increment_count(&mut self) -> Result<(), InternalError> {
        let Self::Count(total) = self else {
            return Err(Self::state_mismatch("COUNT"));
        };
        *total = total.saturating_add(1);
        Ok(())
    }

    /// Feeds one decimal into the SUM accumulator.
    fn add_sum_value(&mut self, value: Decimal) -> Result<(), InternalError> {
        let Self::Sum(accumulator) = self else {
            return Err(Self::state_mismatch("SUM"));
        };
        accumulator.ingest_decimal(value)
    }

    /// Feeds one decimal into the AVG accumulator.
    fn add_average_value(&mut self, value: Decimal) -> Result<(), InternalError> {
        let Self::Avg(accumulator) = self else {
            return Err(Self::state_mismatch("AVG"));
        };
        accumulator.ingest_decimal(value)
    }

    /// Marks the EXISTS accumulator as satisfied.
    fn set_exists_true(&mut self) -> Result<(), InternalError> {
        let Self::Exists(flag) = self else {
            return Err(Self::state_mismatch("EXISTS"));
        };
        *flag = true;
        Ok(())
    }

    /// Offers `value` to the MIN accumulator through its
    /// `ingest_canonical_ordered_owned` entry point.
    fn update_min_value(&mut self, value: Value) -> Result<(), InternalError> {
        let Self::Min(accumulator) = self else {
            return Err(Self::state_mismatch("MIN"));
        };
        accumulator.ingest_canonical_ordered_owned(value)
    }

    /// Offers `value` to the MAX accumulator through its
    /// `ingest_canonical_ordered_owned` entry point.
    fn update_max_value(&mut self, value: Value) -> Result<(), InternalError> {
        let Self::Max(accumulator) = self else {
            return Err(Self::state_mismatch("MAX"));
        };
        accumulator.ingest_canonical_ordered_owned(value)
    }

    /// Offers `value` to the MIN accumulator through its `ingest_owned`
    /// entry point.
    fn ingest_min_value(&mut self, value: Value) -> Result<(), InternalError> {
        let Self::Min(accumulator) = self else {
            return Err(Self::state_mismatch("MIN"));
        };
        accumulator.ingest_owned(value)
    }

    /// Offers `value` to the MAX accumulator through its `ingest_owned`
    /// entry point.
    fn ingest_max_value(&mut self, value: Value) -> Result<(), InternalError> {
        let Self::Max(accumulator) = self else {
            return Err(Self::state_mismatch("MAX"));
        };
        accumulator.ingest_owned(value)
    }

    /// Hands `value` to the MIN accumulator's `replace_selected`; the caller
    /// has already decided that it should win.
    fn replace_min_value(&mut self, value: Value) -> Result<(), InternalError> {
        let Self::Min(accumulator) = self else {
            return Err(Self::state_mismatch("MIN"));
        };
        accumulator.replace_selected(value)
    }

    /// Hands `value` to the MAX accumulator's `replace_selected`; the caller
    /// has already decided that it should win.
    fn replace_max_value(&mut self, value: Value) -> Result<(), InternalError> {
        let Self::Max(accumulator) = self else {
            return Err(Self::state_mismatch("MAX"));
        };
        accumulator.replace_selected(value)
    }

    /// Borrows the MIN accumulator's current selection, if any.
    fn min_value(&self) -> Result<Option<&Value>, InternalError> {
        let Self::Min(accumulator) = self else {
            return Err(Self::state_mismatch("MIN"));
        };
        Ok(accumulator.selected())
    }

    /// Borrows the MAX accumulator's current selection, if any.
    fn max_value(&self) -> Result<Option<&Value>, InternalError> {
        let Self::Max(accumulator) = self else {
            return Err(Self::state_mismatch("MAX"));
        };
        Ok(accumulator.selected())
    }

    /// Records the runtime value of `key` in the FIRST accumulator,
    /// overwriting any stored value.
    fn set_first(&mut self, key: StorageKey) -> Result<(), InternalError> {
        let Self::First(slot) = self else {
            return Err(Self::state_mismatch("FIRST"));
        };
        *slot = Some(storage_key_as_runtime_value(&key));
        Ok(())
    }

    /// Records the runtime value of `key` in the LAST accumulator,
    /// overwriting any stored value.
    fn set_last(&mut self, key: StorageKey) -> Result<(), InternalError> {
        let Self::Last(slot) = self else {
            return Err(Self::state_mismatch("LAST"));
        };
        *slot = Some(storage_key_as_runtime_value(&key));
        Ok(())
    }

    /// Converts the accumulator into its final runtime value.
    ///
    /// FIRST/LAST yield `Value::Null` when nothing was recorded.
    fn into_value(self) -> Result<Value, InternalError> {
        match self {
            Self::Exists(found) => Ok(Value::Bool(found)),
            Self::Count(total) => Ok(finalize_count(u64::from(total))),
            Self::First(recorded) | Self::Last(recorded) => Ok(recorded.unwrap_or(Value::Null)),
            Self::Sum(accumulator)
            | Self::Avg(accumulator)
            | Self::Min(accumulator)
            | Self::Max(accumulator) => accumulator.finalize(),
        }
    }
}
/// Contract for a scalar aggregate state that is fed data keys one at a time
/// and then converted into a terminal output.
pub(in crate::db::executor) trait ScalarAggregateState {
/// Consumes one data key and returns a `FoldControl` signal for the caller.
fn apply(&mut self, key: &DataKey) -> Result<FoldControl, InternalError>;
/// Consumes the state and produces its terminal output.
fn finalize(self) -> ScalarAggregateOutput;
}
/// Scalar terminal aggregate state: a reducer plus the configuration that
/// decides how each data key is applied to it.
pub(in crate::db::executor) struct ScalarTerminalAggregateState {
// Selects which per-kind update handler runs for each key.
kind: ScalarTerminalKind,
// Compared against Asc/Desc to decide whether MIN/MAX may return Break.
direction: Direction,
// When true, keys are run through `record_distinct_key` before updating.
distinct: bool,
// Set of canonical keys recorded so far; populated only when `distinct` is set.
distinct_keys: Option<GroupKeySet>,
// Whether the storage key is handed to the update handlers; derived from
// `kind.aggregate_kind().requires_decoded_id()` by the factory.
requires_storage_key: bool,
// The accumulator being updated.
reducer: ScalarAggregateReducerState,
}
impl ScalarAggregateState for ScalarTerminalAggregateState {
    /// Applies one data key to the reducer.
    ///
    /// In DISTINCT mode the key is first offered to the distinct-key set; if
    /// that reports `false`, the key is skipped and the fold continues.
    fn apply(&mut self, key: &DataKey) -> Result<FoldControl, InternalError> {
        if self.distinct {
            let admitted = record_distinct_key(self.distinct_keys.as_mut(), key)?;
            if !admitted {
                return Ok(FoldControl::Continue);
            }
        }

        self.apply_terminal_update(key)
    }

    /// Consumes the state and returns the reducer's terminal output.
    fn finalize(self) -> ScalarAggregateOutput {
        let Self { reducer, .. } = self;
        reducer.into_output()
    }
}
/// Grouped terminal aggregate state: a reducer plus the input, filter, and
/// DISTINCT configuration that decides how each row is applied to it.
pub(in crate::db::executor) struct GroupedTerminalAggregateState {
// Selects which per-kind update handler runs for each row.
kind: AggregateKind,
// Compared against Asc/Desc to decide whether MIN/MAX may return Break.
direction: Direction,
// Whether DISTINCT admission runs, and whether it dedups by input value.
distinct_mode: GroupedDistinctExecutionMode,
// Limit forwarded to `ExecutionContext::admit_distinct_key`.
max_distinct_values_per_group: u64,
// Distinct-key set; populated by the factory only when DISTINCT is enabled.
distinct_keys: Option<GroupKeySet>,
// Field slot read from the row view when no input expression is compiled.
target_field: Option<FieldSlot>,
// Compiled input expression; takes precedence over `target_field`.
compiled_input_expr: Option<ScalarProjectionExpr>,
// Compiled per-aggregate filter; rows it does not admit are skipped.
compiled_filter_expr: Option<ScalarProjectionExpr>,
// Whether the storage key is handed to the update handlers; derived from
// `kind.requires_decoded_id()` by the factory.
requires_storage_key: bool,
// The accumulator being updated.
reducer: GroupedAggregateReducerState,
}
impl GroupedTerminalAggregateState {
/// Builds the invariant error for an update that needs field-target
/// execution inputs that were not provided; `kind` names the operation.
fn field_target_execution_required(kind: &'static str) -> InternalError {
    let message =
        format!("grouped aggregate reducer {kind} requires field-target execution path");
    InternalError::query_executor_invariant(message)
}
/// Builds the invariant error for an update that needs a storage key but
/// did not receive one; `kind` names the aggregate.
fn storage_key_required(kind: &'static str) -> InternalError {
    let message = format!("grouped aggregate reducer {kind} update requires storage key");
    InternalError::query_executor_invariant(message)
}
/// Builds the invariant error for a field whose value could not be coerced
/// to a decimal; the message names the field and shows the offending value.
fn sum_field_requires_numeric_value(field: &str, value: &Value) -> InternalError {
    let message = format!(
        "grouped aggregate reducer SUM(field) requires numeric field '{field}', found value {value:?}"
    );
    InternalError::query_executor_invariant(message)
}
/// Maps an input-expression evaluation failure to an `InternalError`.
///
/// Numeric failures keep their own conversion; every other failure is
/// reported as an invalid logical plan.
fn input_expression_evaluation_failed(err: ProjectionEvalError) -> InternalError {
    match err {
        ProjectionEvalError::Numeric(numeric) => numeric.into_internal_error(),
        other => InternalError::query_invalid_logical_plan(format!(
            "grouped aggregate input expression evaluation failed: {other}",
        )),
    }
}
/// Maps a filter-expression evaluation failure to an `InternalError`.
///
/// Numeric failures keep their own conversion; every other failure is
/// reported as an invalid logical plan.
fn filter_expression_evaluation_failed(err: ProjectionEvalError) -> InternalError {
    match err {
        ProjectionEvalError::Numeric(numeric) => numeric.into_internal_error(),
        other => InternalError::query_invalid_logical_plan(format!(
            "grouped aggregate filter expression evaluation failed: {other}",
        )),
    }
}
/// Evaluates this aggregate's input for one row.
///
/// The compiled input expression is used when present; otherwise the target
/// field slot is read; with neither configured the result is `Ok(None)`.
///
/// # Errors
///
/// Fails when no row view is supplied, when a slot read fails, or when the
/// expression cannot be evaluated.
fn evaluate_input_value(
    &self,
    row_view: Option<&RowView>,
) -> Result<Option<Value>, InternalError> {
    // A row view is required before anything else is inspected.
    let row_view = row_view.ok_or_else(|| {
        Self::field_target_execution_required("grouped aggregate input expression")
    })?;

    let Some(input_expr) = self.compiled_input_expr.as_ref() else {
        // No expression: fall back to the plain field slot, if configured.
        return match self.target_field.as_ref() {
            Some(target_field) => Ok(Some(row_view.require_slot_owned(target_field.index())?)),
            None => Ok(None),
        };
    };

    // The reader callback can only return `Option`, so a slot read failure
    // is stashed here and surfaced once evaluation has returned.
    let mut read_failure = None;
    let evaluated = eval_scalar_projection_expr_with_value_reader(input_expr, &mut |slot| {
        match row_view.slot_value(slot) {
            Err(err) => {
                read_failure = Some(err);
                None
            }
            Ok(found) => found.map(std::borrow::Cow::into_owned),
        }
    });
    if let Some(err) = read_failure {
        return Err(err);
    }

    let resolved = evaluated.map_err(Self::input_expression_evaluation_failed)?;
    Ok(Some(resolved))
}
/// Reads the target field's slot from the row view.
///
/// # Errors
///
/// Returns the field-target invariant error tagged with `label` when either
/// the target field or the row view is missing, and forwards slot read
/// failures.
fn target_field_value<'a>(
    &self,
    row_view: Option<&'a RowView>,
    label: &'static str,
) -> Result<std::borrow::Cow<'a, Value>, InternalError> {
    match (self.target_field.as_ref(), row_view) {
        (Some(target_field), Some(row_view)) => {
            row_view.require_slot_value(target_field.index())
        }
        _ => Err(Self::field_target_execution_required(label)),
    }
}
/// Resolves this aggregate's input for one row and classifies it as null or
/// non-null.
///
/// The compiled input expression takes precedence over the target field.
///
/// # Errors
///
/// Returns the field-target invariant error tagged with `label` when
/// neither input source is configured or the expression lane yields no
/// value, and forwards evaluation and slot read failures.
fn resolve_input_value(
    &self,
    row_view: Option<&RowView>,
    label: &'static str,
) -> Result<AggregateInputValue, InternalError> {
    let resolved = if self.compiled_input_expr.is_some() {
        match self.evaluate_input_value(row_view)? {
            Some(value) => value,
            None => return Err(Self::field_target_execution_required(label)),
        }
    } else if self.target_field.is_some() {
        self.target_field_value(row_view, label)?.into_owned()
    } else {
        return Err(Self::field_target_execution_required(label));
    };

    Ok(match resolved {
        Value::Null => AggregateInputValue::Null,
        value => AggregateInputValue::Value(value),
    })
}
/// Decides whether the per-aggregate filter admits this row.
///
/// With no filter configured every row is admitted. Otherwise the filter is
/// evaluated against the row view and its result is collapsed to a boolean
/// by `collapse_true_only_boolean_admission`.
///
/// # Errors
///
/// Fails when a filter exists but no row view is supplied, when a slot read
/// fails, when evaluation fails, or when the collapse helper rejects the
/// value as non-boolean.
fn admits_filter_row(&self, row_view: Option<&RowView>) -> Result<bool, InternalError> {
    let Some(filter_expr) = self.compiled_filter_expr.as_ref() else {
        return Ok(true);
    };
    let row_view = row_view.ok_or_else(|| {
        Self::field_target_execution_required("grouped aggregate filter expression")
    })?;

    // The reader callback can only return `Option`, so a slot read failure
    // is stashed here and surfaced once evaluation has returned.
    let mut read_failure = None;
    let evaluated = eval_scalar_projection_expr_with_value_reader(filter_expr, &mut |slot| {
        row_view
            .slot_value(slot)
            .unwrap_or_else(|err| {
                read_failure = Some(err);
                None
            })
            .map(std::borrow::Cow::into_owned)
    });
    if let Some(err) = read_failure {
        return Err(err);
    }

    let verdict = evaluated.map_err(Self::filter_expression_evaluation_failed)?;
    collapse_true_only_boolean_admission(verdict, |found| {
        InternalError::query_invalid_logical_plan(format!(
            "grouped aggregate filter expression produced non-boolean value: {:?}",
            found.as_ref(),
        ))
    })
}
/// Applies one row to this grouped aggregate.
///
/// The row passes through the filter, then DISTINCT admission, and only
/// then reaches the reducer. A row rejected by either gate is skipped with
/// `FoldControl::Continue`.
///
/// # Errors
///
/// Forwards failures from the filter, from DISTINCT admission, and from the
/// reducer update, converting internal errors into `GroupError`.
pub(in crate::db::executor) fn apply_with_row_view(
    &mut self,
    key: &DataKey,
    row_view: Option<&RowView>,
    execution_context: &mut ExecutionContext,
) -> Result<FoldControl, GroupError> {
    let passes_filter = self.admits_filter_row(row_view).map_err(GroupError::from)?;
    if !passes_filter {
        return Ok(FoldControl::Continue);
    }

    let passes_distinct = self.admit_distinct(key, row_view, execution_context)?;
    if !passes_distinct {
        return Ok(FoldControl::Continue);
    }

    let outcome = self.apply_terminal_update(key, row_view);
    outcome.map_err(GroupError::from)
}
pub(in crate::db::executor) fn finalize(self) -> Result<Value, InternalError> {
self.reducer.into_value()
}
/// Routes one admitted row to the update handler for this aggregate's kind.
///
/// The storage key is passed on only when `requires_storage_key` is set.
fn apply_terminal_update(
    &mut self,
    key: &DataKey,
    row_view: Option<&RowView>,
) -> Result<FoldControl, InternalError> {
    let decoded_key = key.storage_key();
    let storage_key = if self.requires_storage_key {
        Some(decoded_key)
    } else {
        None
    };

    let kind = self.kind;
    match kind {
        AggregateKind::Exists => self.apply_exists(storage_key, row_view),
        AggregateKind::Count => self.apply_count(storage_key, row_view),
        AggregateKind::Avg | AggregateKind::Sum => self.apply_sum_like(storage_key, row_view),
        AggregateKind::Max => self.apply_extremum(ExtremumKind::Max, storage_key, row_view),
        AggregateKind::Min => self.apply_extremum(ExtremumKind::Min, storage_key, row_view),
        AggregateKind::Last => self.apply_last(storage_key, row_view),
        AggregateKind::First => self.apply_first(storage_key, row_view),
    }
}
/// Runs DISTINCT admission for one row.
///
/// Returns `Ok(true)` when DISTINCT is disabled or no distinct-key set
/// exists. With value dedup and an input source configured, the canonical
/// key comes from the resolved input value, and a null input is rejected
/// with `Ok(false)`. Otherwise the canonical key comes from the data key.
/// The final decision is made by `ExecutionContext::admit_distinct_key`.
///
/// # Errors
///
/// Forwards input resolution, key canonicalization, and admission failures.
fn admit_distinct(
    &mut self,
    key: &DataKey,
    row_view: Option<&RowView>,
    execution_context: &mut ExecutionContext,
) -> Result<bool, GroupError> {
    if !self.distinct_mode.enabled() {
        return Ok(true);
    }

    let has_input_source = self.compiled_input_expr.is_some() || self.target_field.is_some();
    let canonical_key = if self.distinct_mode.uses_value_dedup() && has_input_source {
        let resolved = self
            .resolve_input_value(row_view, "COUNT/SUM/AVG(DISTINCT input)")
            .map_err(GroupError::from)?;
        match resolved {
            AggregateInputValue::Null => return Ok(false),
            AggregateInputValue::Value(value) => value
                .canonical_key()
                .map_err(KeyCanonicalError::into_internal_error)
                .map_err(GroupError::from)?,
        }
    } else {
        canonical_key_from_data_key(key).map_err(GroupError::from)?
    };

    match self.distinct_keys.as_mut() {
        None => Ok(true),
        Some(distinct_keys) => execution_context.admit_distinct_key(
            distinct_keys,
            self.max_distinct_values_per_group,
            canonical_key,
        ),
    }
}
/// Applies one row to COUNT.
///
/// When an input expression or target field is configured, rows whose input
/// resolves to null are not counted; otherwise every row is counted.
fn apply_count(
    &mut self,
    _key: Option<StorageKey>,
    row_view: Option<&RowView>,
) -> Result<FoldControl, InternalError> {
    let has_input_source = self.compiled_input_expr.is_some() || self.target_field.is_some();
    if has_input_source {
        let resolved = self.resolve_input_value(row_view, "COUNT(input)")?;
        if let AggregateInputValue::Null = resolved {
            return Ok(FoldControl::Continue);
        }
    }

    self.reducer
        .increment_count()
        .map(|()| FoldControl::Continue)
}
/// Applies one row to EXISTS: marks the reducer as satisfied and returns
/// `FoldControl::Break`.
fn apply_exists(
    &mut self,
    _key: Option<StorageKey>,
    _row_view: Option<&RowView>,
) -> Result<FoldControl, InternalError> {
    self.reducer.set_exists_true().map(|()| FoldControl::Break)
}
/// Applies one row to SUM or AVG.
///
/// Null inputs are skipped. Non-null inputs must coerce to a decimal, which
/// is then fed to the reducer.
///
/// # Errors
///
/// Fails when the aggregate kind is not SUM/AVG, when the input cannot be
/// resolved, when the input is not numeric, or when the reducer rejects it.
fn apply_sum_like(
    &mut self,
    _key: Option<StorageKey>,
    row_view: Option<&RowView>,
) -> Result<FoldControl, InternalError> {
    let kind_label = self
        .kind
        .sum_like_input_label()
        .ok_or_else(|| Self::field_target_execution_required("SUM/AVG(input)"))?;

    let value = match self.resolve_input_value(row_view, kind_label)? {
        AggregateInputValue::Value(value) => value,
        AggregateInputValue::Null => return Ok(FoldControl::Continue),
    };

    match coerce_numeric_decimal(&value) {
        Some(decimal) => {
            self.kind
                .apply_sum_like_decimal(&mut self.reducer, decimal)?;
            Ok(FoldControl::Continue)
        }
        // The message names the field when one is configured, otherwise it
        // refers to the expression input.
        None => Err(match self.target_field.as_ref() {
            Some(target_field) => {
                Self::sum_field_requires_numeric_value(target_field.field(), &value)
            }
            None => InternalError::query_executor_invariant(format!(
                "grouped aggregate reducer {kind_label} requires numeric expression input, found value {value:?}",
            )),
        }),
    }
}
fn apply_extremum(
&mut self,
kind: ExtremumKind,
key: Option<StorageKey>,
row_view: Option<&RowView>,
) -> Result<FoldControl, InternalError> {
if self.compiled_input_expr.is_some() {
let AggregateInputValue::Value(value) =
self.resolve_input_value(row_view, kind.expression_label())?
else {
return Ok(FoldControl::Continue);
};
match kind {
ExtremumKind::Min => self.reducer.ingest_min_value(value)?,
ExtremumKind::Max => self.reducer.ingest_max_value(value)?,
}
} else if let Some(target_field) = self.target_field.as_ref() {
let Some(target_kind) = target_field.kind() else {
return Err(Self::field_target_execution_required(kind.field_label()));
};
let AggregateInputValue::Value(value) =
self.resolve_input_value(row_view, kind.field_label())?
else {
return Ok(FoldControl::Continue);
};
let aggregate_field_slot = AggregateFieldSlot {
index: target_field.index(),
kind: target_kind,
};
let current = match kind {
ExtremumKind::Min => self.reducer.min_value()?,
ExtremumKind::Max => self.reducer.max_value()?,
};
let replace = match current {
Some(current) => {
let ordering = compare_orderable_field_values_with_slot(
target_field.field(),
aggregate_field_slot,
&value,
current,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
match kind {
ExtremumKind::Min => ordering.is_lt(),
ExtremumKind::Max => ordering.is_gt(),
}
}
None => true,
};
if replace {
match kind {
ExtremumKind::Min => self.reducer.replace_min_value(value)?,
ExtremumKind::Max => self.reducer.replace_max_value(value)?,
}
}
} else {
let Some(key) = key else {
return Err(Self::storage_key_required(kind.storage_key_label()));
};
let value = storage_key_as_runtime_value(&key);
match kind {
ExtremumKind::Min => self.reducer.update_min_value(value)?,
ExtremumKind::Max => self.reducer.update_max_value(value)?,
}
}
Ok(match (kind, self.direction) {
(ExtremumKind::Min, Direction::Asc) | (ExtremumKind::Max, Direction::Desc) => {
FoldControl::Break
}
_ => FoldControl::Continue,
})
}
/// Applies one row to FIRST: records the storage key and returns
/// `FoldControl::Break`.
///
/// # Errors
///
/// Fails when no storage key was supplied or the reducer is not FIRST.
fn apply_first(
    &mut self,
    key: Option<StorageKey>,
    _row_view: Option<&RowView>,
) -> Result<FoldControl, InternalError> {
    let storage_key = key.ok_or_else(|| Self::storage_key_required("FIRST"))?;
    self.reducer.set_first(storage_key)?;

    Ok(FoldControl::Break)
}
/// Applies one row to LAST: records the storage key, overwriting the
/// previous one, and returns `FoldControl::Continue`.
///
/// # Errors
///
/// Fails when no storage key was supplied or the reducer is not LAST.
fn apply_last(
    &mut self,
    key: Option<StorageKey>,
    _row_view: Option<&RowView>,
) -> Result<FoldControl, InternalError> {
    let storage_key = key.ok_or_else(|| Self::storage_key_required("LAST"))?;
    self.reducer.set_last(storage_key)?;

    Ok(FoldControl::Continue)
}
}
/// Namespace for the constructors of the scalar and grouped terminal
/// aggregate states.
pub(in crate::db::executor) struct AggregateStateFactory;

impl AggregateStateFactory {
    /// Builds a scalar terminal aggregate state.
    ///
    /// A distinct-key set is allocated only when `distinct` is set. Whether
    /// storage keys are forwarded to the reducer comes from
    /// `kind.aggregate_kind().requires_decoded_id()`.
    #[must_use]
    pub(in crate::db::executor) fn create_scalar_terminal(
        kind: ScalarTerminalKind,
        direction: Direction,
        distinct: bool,
    ) -> ScalarTerminalAggregateState {
        let distinct_keys = distinct.then(GroupKeySet::new);
        let requires_storage_key = kind.aggregate_kind().requires_decoded_id();
        let reducer = ScalarAggregateReducerState::for_terminal_kind(kind);

        ScalarTerminalAggregateState {
            kind,
            direction,
            distinct,
            distinct_keys,
            requires_storage_key,
            reducer,
        }
    }

    /// Builds a grouped terminal aggregate state.
    ///
    /// A distinct-key set is allocated only when `distinct_mode` is enabled.
    /// Whether storage keys are forwarded to the reducer comes from
    /// `kind.requires_decoded_id()`.
    #[must_use]
    pub(in crate::db::executor) fn create_grouped_terminal(
        kind: AggregateKind,
        direction: Direction,
        distinct_mode: GroupedDistinctExecutionMode,
        target_field: Option<FieldSlot>,
        compiled_input_expr: Option<ScalarProjectionExpr>,
        compiled_filter_expr: Option<ScalarProjectionExpr>,
        max_distinct_values_per_group: u64,
    ) -> GroupedTerminalAggregateState {
        let distinct_keys = distinct_mode.enabled().then(GroupKeySet::new);
        let requires_storage_key = kind.requires_decoded_id();
        let reducer = GroupedAggregateReducerState::for_kind(kind);

        GroupedTerminalAggregateState {
            kind,
            direction,
            distinct_mode,
            max_distinct_values_per_group,
            distinct_keys,
            target_field,
            compiled_input_expr,
            compiled_filter_expr,
            requires_storage_key,
            reducer,
        }
    }
}
/// DISTINCT configuration for one grouped aggregate: whether DISTINCT
/// admission runs at all, and whether it dedups by input value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) struct GroupedDistinctExecutionMode {
    enabled: bool,
    uses_value_dedup: bool,
}

impl GroupedDistinctExecutionMode {
    /// Bundles the two DISTINCT flags into one mode value.
    #[must_use]
    pub(in crate::db::executor) const fn new(enabled: bool, uses_value_dedup: bool) -> Self {
        Self {
            uses_value_dedup,
            enabled,
        }
    }

    /// Whether DISTINCT admission is active.
    #[must_use]
    const fn enabled(self) -> bool {
        let Self { enabled, .. } = self;
        enabled
    }

    /// Whether DISTINCT is configured to dedup by input value.
    #[must_use]
    const fn uses_value_dedup(self) -> bool {
        let Self {
            uses_value_dedup, ..
        } = self;
        uses_value_dedup
    }
}
impl ScalarTerminalAggregateState {
    /// Builds the invariant error for an update that needs a storage key but
    /// did not receive one; `kind` names the aggregate.
    fn storage_key_required(kind: &'static str) -> InternalError {
        let message = format!("aggregate reducer {kind} update requires storage key");
        InternalError::query_executor_invariant(message)
    }

    /// Routes one data key to the update handler for this terminal's kind.
    ///
    /// The storage key is passed on only when `requires_storage_key` is set.
    fn apply_terminal_update(&mut self, key: &DataKey) -> Result<FoldControl, InternalError> {
        let decoded_key = key.storage_key();
        let storage_key = if self.requires_storage_key {
            Some(decoded_key)
        } else {
            None
        };

        let kind = self.kind;
        match kind {
            ScalarTerminalKind::Exists => self.apply_exists(storage_key),
            ScalarTerminalKind::Count => self.apply_count(storage_key),
            ScalarTerminalKind::Max => self.apply_max(storage_key),
            ScalarTerminalKind::Min => self.apply_min(storage_key),
            ScalarTerminalKind::Last => self.apply_last(storage_key),
            ScalarTerminalKind::First => self.apply_first(storage_key),
        }
    }

    /// COUNT: increments the counter and keeps folding.
    fn apply_count(&mut self, _key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        self.reducer
            .increment_count()
            .map(|()| FoldControl::Continue)
    }

    /// EXISTS: marks the reducer as satisfied and returns `Break`.
    fn apply_exists(&mut self, _key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        self.reducer.set_exists_true().map(|()| FoldControl::Break)
    }

    /// MAX: offers the key to the reducer. Returns `Break` when the
    /// direction is descending, `Continue` otherwise.
    fn apply_max(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let storage_key = key.ok_or_else(|| Self::storage_key_required("MAX"))?;
        self.reducer.update_max_value(storage_key)?;

        Ok(match self.direction {
            Direction::Desc => FoldControl::Break,
            _ => FoldControl::Continue,
        })
    }

    /// FIRST: records the key and returns `Break`.
    fn apply_first(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let storage_key = key.ok_or_else(|| Self::storage_key_required("FIRST"))?;
        self.reducer.set_first(storage_key)?;

        Ok(FoldControl::Break)
    }

    /// LAST: records the key, overwriting the previous one, and keeps
    /// folding.
    fn apply_last(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let storage_key = key.ok_or_else(|| Self::storage_key_required("LAST"))?;
        self.reducer.set_last(storage_key)?;

        Ok(FoldControl::Continue)
    }

    /// MIN: offers the key to the reducer. Returns `Break` when the
    /// direction is ascending, `Continue` otherwise.
    fn apply_min(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let storage_key = key.ok_or_else(|| Self::storage_key_required("MIN"))?;
        self.reducer.update_min_value(storage_key)?;

        Ok(match self.direction {
            Direction::Asc => FoldControl::Break,
            _ => FoldControl::Continue,
        })
    }
}
/// Offers the canonical form of `key` to the distinct-key set and returns
/// what `insert_key` reports.
///
/// With no set supplied the key is admitted unconditionally (`Ok(true)`)
/// and no canonical key is computed.
fn record_distinct_key(
    distinct_keys: Option<&mut GroupKeySet>,
    key: &DataKey,
) -> Result<bool, InternalError> {
    match distinct_keys {
        None => Ok(true),
        Some(seen_keys) => {
            let canonical = canonical_key_from_data_key(key)?;
            Ok(seen_keys.insert_key(canonical))
        }
    }
}
/// Derives the canonical group key for a data key by converting its storage
/// key to a runtime value and canonicalizing that value.
fn canonical_key_from_data_key(key: &DataKey) -> Result<GroupKey, InternalError> {
    let storage_key = key.storage_key();
    let runtime_value = storage_key_as_runtime_value(&storage_key);

    runtime_value
        .canonical_key()
        .map_err(KeyCanonicalError::into_internal_error)
}
/// Mode selector for an aggregate fold.
///
/// NOTE(review): this enum is not used anywhere in the visible part of this
/// file; the meanings below are inferred from the variant names only —
/// confirm against the fold driver.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum AggregateFoldMode {
/// Presumably folds over rows that were confirmed to exist — TODO confirm.
ExistingRows,
/// Presumably folds over keys without loading rows — TODO confirm.
KeysOnly,
}