use crate::{
db::{
data::DataKey,
direction::Direction,
executor::{
aggregate::contracts::{
error::GroupError,
grouped::ExecutionContext,
spec::{AggregateKind, ScalarAggregateOutput},
},
group::{CanonicalKey, GroupKey, GroupKeySet, KeyCanonicalError},
},
},
error::InternalError,
types::Decimal,
value::{StorageKey, Value},
};
/// Signal returned by every reducer update telling the fold driver whether
/// this aggregate state still wants keys.
///
/// In this file `Break` is returned by EXISTS, FIRST, and by MIN/MAX when the
/// traversal direction makes the first key decisive. Every other update
/// returns `Continue`.
#[derive(Debug, Clone, Copy)]
pub(in crate::db::executor) enum FoldControl {
    /// Keep feeding keys into the reducer.
    Continue,
    /// The reducer considers its result settled (presumably the caller stops
    /// feeding keys; confirm at the call site).
    Break,
}
/// Kind-specific update step for a scalar terminal aggregate.
///
/// Receives the state plus the row's storage key (`None` when the state's
/// `requires_storage_key` flag is off) and reports whether folding continues.
type ScalarTerminalUpdateDispatch = fn(
    state: &mut ScalarTerminalAggregateState,
    key: Option<StorageKey>,
) -> Result<FoldControl, InternalError>;
/// Kind-specific update step for a grouped terminal aggregate.
///
/// Receives the per-group state plus the row's storage key (`None` when the
/// state's `requires_storage_key` flag is off) and reports whether folding
/// continues.
type GroupedTerminalUpdateDispatch = fn(
    state: &mut GroupedTerminalAggregateState,
    key: Option<StorageKey>,
) -> Result<FoldControl, InternalError>;
/// Reducer family an `AggregateKind` is evaluated with.
///
/// Every kind maps one-to-one except SUM and AVG, which share `SumLike`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggregateReducerClass {
    /// COUNT: counts folded keys.
    Count,
    /// SUM / AVG: decimal accumulator (field-target path only).
    SumLike,
    /// EXISTS: true once any key is seen.
    Exists,
    /// MIN: smallest storage key.
    Min,
    /// MAX: largest storage key.
    Max,
    /// FIRST: first key in traversal order.
    First,
    /// LAST: last key in traversal order.
    Last,
}
impl AggregateKind {
const fn reducer_class(self) -> AggregateReducerClass {
match self {
Self::Count => AggregateReducerClass::Count,
Self::Sum | Self::Avg => AggregateReducerClass::SumLike,
Self::Exists => AggregateReducerClass::Exists,
Self::Min => AggregateReducerClass::Min,
Self::Max => AggregateReducerClass::Max,
Self::First => AggregateReducerClass::First,
Self::Last => AggregateReducerClass::Last,
}
}
}
/// Running accumulator for a scalar (ungrouped) terminal aggregate; one
/// variant per reducer class.
pub(in crate::db::executor) enum ScalarAggregateReducerState {
    /// Number of keys folded so far (saturating at `u32::MAX`).
    Count(u32),
    /// SUM/AVG accumulator; the key-only update path in this file never writes it.
    Sum(Option<Decimal>),
    /// Whether at least one key has been seen.
    Exists(bool),
    /// Smallest storage key seen, if any.
    Min(Option<StorageKey>),
    /// Largest storage key seen, if any.
    Max(Option<StorageKey>),
    /// Key recorded by the FIRST update, if any.
    First(Option<StorageKey>),
    /// Most recently recorded key for LAST, if any.
    Last(Option<StorageKey>),
}
impl ScalarAggregateReducerState {
fn state_mismatch(kind: &'static str) -> InternalError {
InternalError::query_executor_invariant(format!("aggregate reducer {kind} state mismatch"))
}
#[must_use]
pub(in crate::db::executor) const fn for_kind(kind: AggregateKind) -> Self {
match kind.reducer_class() {
AggregateReducerClass::Count => Self::Count(0),
AggregateReducerClass::SumLike => Self::Sum(None),
AggregateReducerClass::Exists => Self::Exists(false),
AggregateReducerClass::Min => Self::Min(None),
AggregateReducerClass::Max => Self::Max(None),
AggregateReducerClass::First => Self::First(None),
AggregateReducerClass::Last => Self::Last(None),
}
}
fn increment_count(&mut self) -> Result<(), InternalError> {
match self {
Self::Count(count) => {
*count = count.saturating_add(1);
Ok(())
}
_ => Err(Self::state_mismatch("COUNT")),
}
}
fn set_exists_true(&mut self) -> Result<(), InternalError> {
match self {
Self::Exists(exists) => {
*exists = true;
Ok(())
}
_ => Err(Self::state_mismatch("EXISTS")),
}
}
fn update_min_value(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::Min(min_key) => {
let replace = match min_key.as_ref() {
Some(current) => key < *current,
None => true,
};
if replace {
*min_key = Some(key);
}
Ok(())
}
_ => Err(Self::state_mismatch("MIN")),
}
}
fn update_max_value(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::Max(max_key) => {
let replace = match max_key.as_ref() {
Some(current) => key > *current,
None => true,
};
if replace {
*max_key = Some(key);
}
Ok(())
}
_ => Err(Self::state_mismatch("MAX")),
}
}
fn set_first(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::First(first_key) => {
*first_key = Some(key);
Ok(())
}
_ => Err(Self::state_mismatch("FIRST")),
}
}
fn set_last(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::Last(last_key) => {
*last_key = Some(key);
Ok(())
}
_ => Err(Self::state_mismatch("LAST")),
}
}
#[must_use]
pub(in crate::db::executor) const fn into_output(self) -> ScalarAggregateOutput {
match self {
Self::Count(value) => ScalarAggregateOutput::Count(value),
Self::Sum(value) => ScalarAggregateOutput::Sum(value),
Self::Exists(value) => ScalarAggregateOutput::Exists(value),
Self::Min(value) => ScalarAggregateOutput::Min(value),
Self::Max(value) => ScalarAggregateOutput::Max(value),
Self::First(value) => ScalarAggregateOutput::First(value),
Self::Last(value) => ScalarAggregateOutput::Last(value),
}
}
}
/// Per-group running accumulator for a grouped terminal aggregate; one variant
/// per reducer class.
enum GroupedAggregateReducerState {
    /// Number of keys folded into the group (saturating at `u32::MAX`).
    Count(u32),
    /// SUM/AVG accumulator; the key-only update path in this file never writes it.
    Sum(Option<Decimal>),
    /// Whether the group has seen at least one key.
    Exists(bool),
    /// Smallest storage key seen in the group, if any.
    Min(Option<StorageKey>),
    /// Largest storage key seen in the group, if any.
    Max(Option<StorageKey>),
    /// Key recorded by the FIRST update, if any.
    First(Option<StorageKey>),
    /// Most recently recorded key for LAST, if any.
    Last(Option<StorageKey>),
}
impl GroupedAggregateReducerState {
fn state_mismatch(kind: &'static str) -> InternalError {
InternalError::query_executor_invariant(format!(
"grouped aggregate reducer {kind} state mismatch"
))
}
#[must_use]
const fn for_kind(kind: AggregateKind) -> Self {
match kind.reducer_class() {
AggregateReducerClass::Count => Self::Count(0),
AggregateReducerClass::SumLike => Self::Sum(None),
AggregateReducerClass::Exists => Self::Exists(false),
AggregateReducerClass::Min => Self::Min(None),
AggregateReducerClass::Max => Self::Max(None),
AggregateReducerClass::First => Self::First(None),
AggregateReducerClass::Last => Self::Last(None),
}
}
fn increment_count(&mut self) -> Result<(), InternalError> {
match self {
Self::Count(count) => {
*count = count.saturating_add(1);
Ok(())
}
_ => Err(Self::state_mismatch("COUNT")),
}
}
fn set_exists_true(&mut self) -> Result<(), InternalError> {
match self {
Self::Exists(exists) => {
*exists = true;
Ok(())
}
_ => Err(Self::state_mismatch("EXISTS")),
}
}
fn update_min_value(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::Min(min_key) => {
let replace = match min_key.as_ref() {
Some(current) => key < *current,
None => true,
};
if replace {
*min_key = Some(key);
}
Ok(())
}
_ => Err(Self::state_mismatch("MIN")),
}
}
fn update_max_value(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::Max(max_key) => {
let replace = match max_key.as_ref() {
Some(current) => key > *current,
None => true,
};
if replace {
*max_key = Some(key);
}
Ok(())
}
_ => Err(Self::state_mismatch("MAX")),
}
}
fn set_first(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::First(first_key) => {
*first_key = Some(key);
Ok(())
}
_ => Err(Self::state_mismatch("FIRST")),
}
}
fn set_last(&mut self, key: StorageKey) -> Result<(), InternalError> {
match self {
Self::Last(last_key) => {
*last_key = Some(key);
Ok(())
}
_ => Err(Self::state_mismatch("LAST")),
}
}
#[must_use]
fn into_value(self) -> Value {
match self {
Self::Count(value) => Value::Uint(u64::from(value)),
Self::Sum(value) => value.map_or(Value::Null, Value::Decimal),
Self::Exists(value) => Value::Bool(value),
Self::Min(value) | Self::Max(value) | Self::First(value) | Self::Last(value) => {
value.map_or(Value::Null, |key| key.as_value())
}
}
}
}
/// Key-driven fold state for a scalar (ungrouped) terminal aggregate.
pub(in crate::db::executor) trait ScalarAggregateState {
    /// Folds one data key into the state and reports whether more keys are wanted.
    ///
    /// Errors are `InternalError`s. The implementation in this file raises them
    /// for executor-invariant violations and key canonicalization failures.
    fn apply(&mut self, key: &DataKey) -> Result<FoldControl, InternalError>;

    /// Consumes the state and produces the aggregate's final output.
    fn finalize(self) -> ScalarAggregateOutput;
}
/// Fold state for one scalar terminal aggregate (COUNT, EXISTS, MIN, MAX,
/// FIRST, LAST) driven purely by data keys.
pub(in crate::db::executor) struct ScalarTerminalAggregateState {
    /// Traversal direction; MIN/MAX use it to decide when to stop early.
    direction: Direction,
    /// Whether keys are deduplicated before reaching the reducer.
    distinct: bool,
    /// Canonical keys seen so far; the factory sets this to `Some` exactly when `distinct` is true.
    distinct_keys: Option<GroupKeySet>,
    /// Whether the storage key is extracted and handed to the update step
    /// (populated from `AggregateKind::requires_decoded_id`).
    requires_storage_key: bool,
    /// Kind-specific update function chosen at construction.
    terminal_update_dispatch: ScalarTerminalUpdateDispatch,
    /// Running accumulator.
    reducer: ScalarAggregateReducerState,
}
impl ScalarAggregateState for ScalarTerminalAggregateState {
    /// Folds one key, skipping keys already seen when DISTINCT is enabled.
    fn apply(&mut self, key: &DataKey) -> Result<FoldControl, InternalError> {
        if self.distinct {
            let is_new = record_distinct_key(self.distinct_keys.as_mut(), key)?;
            if !is_new {
                // Duplicate under DISTINCT: leave the reducer untouched.
                return Ok(FoldControl::Continue);
            }
        }
        self.apply_terminal_update(key)
    }

    /// Consumes the state and yields the reducer's scalar output.
    fn finalize(self) -> ScalarAggregateOutput {
        self.reducer.into_output()
    }
}
/// Per-group fold state for one grouped terminal aggregate driven by data keys.
pub(in crate::db::executor) struct GroupedTerminalAggregateState {
    /// Traversal direction; MIN/MAX use it to decide when to stop early.
    direction: Direction,
    /// Whether keys are deduplicated before reaching the reducer.
    distinct: bool,
    /// Forwarded to `ExecutionContext::admit_distinct_key` (presumably the
    /// per-group cap on distinct values; confirm there).
    max_distinct_values_per_group: u64,
    /// Canonical keys admitted so far; the factory sets this to `Some` exactly when `distinct` is true.
    distinct_keys: Option<GroupKeySet>,
    /// Whether the storage key is extracted and handed to the update step
    /// (populated from `AggregateKind::requires_decoded_id`).
    requires_storage_key: bool,
    /// Kind-specific update function chosen at construction.
    terminal_update_dispatch: GroupedTerminalUpdateDispatch,
    /// Running per-group accumulator.
    reducer: GroupedAggregateReducerState,
}
impl GroupedTerminalAggregateState {
fn field_target_execution_required(kind: &'static str) -> InternalError {
InternalError::query_executor_invariant(format!(
"grouped aggregate reducer {kind} requires field-target execution path"
))
}
fn storage_key_required(kind: &'static str) -> InternalError {
InternalError::query_executor_invariant(format!(
"grouped aggregate reducer {kind} update requires storage key"
))
}
pub(in crate::db::executor) fn apply(
&mut self,
key: &DataKey,
execution_context: &mut ExecutionContext,
) -> Result<FoldControl, GroupError> {
if self.distinct
&& !record_grouped_distinct_key(
self.distinct_keys.as_mut(),
key,
execution_context,
self.max_distinct_values_per_group,
)?
{
return Ok(FoldControl::Continue);
}
self.apply_terminal_update(key).map_err(GroupError::from)
}
#[must_use]
pub(in crate::db::executor) fn finalize(self) -> Value {
self.reducer.into_value()
}
const fn terminal_update_dispatch_for_kind(
kind: AggregateKind,
) -> GroupedTerminalUpdateDispatch {
match kind.reducer_class() {
AggregateReducerClass::Count => Self::apply_count,
AggregateReducerClass::SumLike => Self::apply_sum_like_unsupported,
AggregateReducerClass::Exists => Self::apply_exists,
AggregateReducerClass::Min => Self::apply_min,
AggregateReducerClass::Max => Self::apply_max,
AggregateReducerClass::First => Self::apply_first,
AggregateReducerClass::Last => Self::apply_last,
}
}
fn apply_terminal_update(&mut self, key: &DataKey) -> Result<FoldControl, InternalError> {
let storage_key = self.requires_storage_key.then_some(key.storage_key());
(self.terminal_update_dispatch)(self, storage_key)
}
fn apply_count(&mut self, _key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
self.reducer.increment_count()?;
Ok(FoldControl::Continue)
}
fn apply_exists(&mut self, _key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
self.reducer.set_exists_true()?;
Ok(FoldControl::Break)
}
fn apply_sum_like_unsupported(
_state: &mut Self,
_key: Option<StorageKey>,
) -> Result<FoldControl, InternalError> {
Err(Self::field_target_execution_required("SUM/AVG"))
}
fn apply_max(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
let Some(key) = key else {
return Err(Self::storage_key_required("MAX"));
};
self.reducer.update_max_value(key)?;
Ok(if self.direction == Direction::Desc {
FoldControl::Break
} else {
FoldControl::Continue
})
}
fn apply_first(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
let Some(key) = key else {
return Err(Self::storage_key_required("FIRST"));
};
self.reducer.set_first(key)?;
Ok(FoldControl::Break)
}
fn apply_last(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
let Some(key) = key else {
return Err(Self::storage_key_required("LAST"));
};
self.reducer.set_last(key)?;
Ok(FoldControl::Continue)
}
fn apply_min(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
let Some(key) = key else {
return Err(Self::storage_key_required("MIN"));
};
self.reducer.update_min_value(key)?;
Ok(if self.direction == Direction::Asc {
FoldControl::Break
} else {
FoldControl::Continue
})
}
}
/// Stateless constructor namespace for scalar and grouped terminal aggregate states.
pub(in crate::db::executor) struct AggregateStateFactory;
impl AggregateStateFactory {
#[must_use]
pub(in crate::db::executor) const fn create_scalar_terminal(
kind: AggregateKind,
direction: Direction,
distinct: bool,
) -> ScalarTerminalAggregateState {
ScalarTerminalAggregateState {
direction,
distinct,
distinct_keys: if distinct {
Some(GroupKeySet::new())
} else {
None
},
requires_storage_key: kind.requires_decoded_id(),
terminal_update_dispatch:
ScalarTerminalAggregateState::terminal_update_dispatch_for_kind(kind),
reducer: ScalarAggregateReducerState::for_kind(kind),
}
}
#[must_use]
pub(in crate::db::executor) const fn create_grouped_terminal(
kind: AggregateKind,
direction: Direction,
distinct: bool,
max_distinct_values_per_group: u64,
) -> GroupedTerminalAggregateState {
GroupedTerminalAggregateState {
direction,
distinct,
max_distinct_values_per_group,
distinct_keys: if distinct {
Some(GroupKeySet::new())
} else {
None
},
requires_storage_key: kind.requires_decoded_id(),
terminal_update_dispatch:
GroupedTerminalAggregateState::terminal_update_dispatch_for_kind(kind),
reducer: GroupedAggregateReducerState::for_kind(kind),
}
}
}
impl ScalarTerminalAggregateState {
    /// Invariant error for aggregate kinds (SUM/AVG) that must run through the
    /// field-target execution path instead of this key-only reducer.
    fn field_target_execution_required(kind: &'static str) -> InternalError {
        InternalError::query_executor_invariant(format!(
            "aggregate reducer {kind} requires field-target execution path"
        ))
    }

    /// Invariant error for a key-consuming reducer that was dispatched without
    /// a storage key.
    fn storage_key_required(kind: &'static str) -> InternalError {
        InternalError::query_executor_invariant(format!(
            "aggregate reducer {kind} update requires storage key"
        ))
    }

    /// Selects the update function for `kind`.
    ///
    /// SUM/AVG map to a stub that always returns an invariant error, because
    /// those kinds are not evaluated on this key-only path.
    const fn terminal_update_dispatch_for_kind(
        kind: AggregateKind,
    ) -> ScalarTerminalUpdateDispatch {
        match kind.reducer_class() {
            AggregateReducerClass::Count => Self::apply_count,
            AggregateReducerClass::SumLike => Self::apply_sum_like_unsupported,
            AggregateReducerClass::Exists => Self::apply_exists,
            AggregateReducerClass::Min => Self::apply_min,
            AggregateReducerClass::Max => Self::apply_max,
            AggregateReducerClass::First => Self::apply_first,
            AggregateReducerClass::Last => Self::apply_last,
        }
    }

    /// Runs the kind-specific update for one key.
    ///
    /// The storage key is extracted only when `requires_storage_key` is set.
    /// `bool::then` keeps that extraction lazy. `then_some(key.storage_key())`
    /// would evaluate `DataKey::storage_key` for every key, including keys fed
    /// to reducers that ignore it.
    fn apply_terminal_update(&mut self, key: &DataKey) -> Result<FoldControl, InternalError> {
        let storage_key = self.requires_storage_key.then(|| key.storage_key());
        (self.terminal_update_dispatch)(self, storage_key)
    }

    /// COUNT: increments the counter and always asks for more keys.
    fn apply_count(&mut self, _key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        self.reducer.increment_count()?;
        Ok(FoldControl::Continue)
    }

    /// EXISTS: one key answers the question, so the fold breaks immediately.
    fn apply_exists(&mut self, _key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        self.reducer.set_exists_true()?;
        Ok(FoldControl::Break)
    }

    /// SUM/AVG stub: always fails with an invariant error.
    fn apply_sum_like_unsupported(
        _state: &mut Self,
        _key: Option<StorageKey>,
    ) -> Result<FoldControl, InternalError> {
        Err(Self::field_target_execution_required("SUM/AVG"))
    }

    /// MAX: keeps the largest key seen and breaks after the first key under
    /// `Direction::Desc`. This assumes keys arrive in `direction` order, which
    /// is not checked here.
    fn apply_max(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let Some(key) = key else {
            return Err(Self::storage_key_required("MAX"));
        };
        self.reducer.update_max_value(key)?;
        Ok(if self.direction == Direction::Desc {
            FoldControl::Break
        } else {
            FoldControl::Continue
        })
    }

    /// FIRST: records the key it is given and breaks.
    fn apply_first(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let Some(key) = key else {
            return Err(Self::storage_key_required("FIRST"));
        };
        self.reducer.set_first(key)?;
        Ok(FoldControl::Break)
    }

    /// LAST: overwrites the slot on every key and keeps going.
    fn apply_last(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let Some(key) = key else {
            return Err(Self::storage_key_required("LAST"));
        };
        self.reducer.set_last(key)?;
        Ok(FoldControl::Continue)
    }

    /// MIN: keeps the smallest key seen and breaks after the first key under
    /// `Direction::Asc`. This assumes keys arrive in `direction` order, which
    /// is not checked here.
    fn apply_min(&mut self, key: Option<StorageKey>) -> Result<FoldControl, InternalError> {
        let Some(key) = key else {
            return Err(Self::storage_key_required("MIN"));
        };
        self.reducer.update_min_value(key)?;
        Ok(if self.direction == Direction::Asc {
            FoldControl::Break
        } else {
            FoldControl::Continue
        })
    }
}
/// Records `key` in the scalar DISTINCT set and reports whether it should be folded.
///
/// With no set (`None`), every key is treated as new. Otherwise the key is
/// canonicalized and inserted, and the result of `GroupKeySet::insert_key` is
/// returned. Canonicalization failures propagate as `InternalError`.
fn record_distinct_key(
    distinct_keys: Option<&mut GroupKeySet>,
    key: &DataKey,
) -> Result<bool, InternalError> {
    match distinct_keys {
        None => Ok(true),
        Some(seen) => {
            let canonical = canonical_key_from_data_key(key)?;
            Ok(seen.insert_key(canonical))
        }
    }
}
/// Grouped counterpart of `record_distinct_key`: canonicalizes `key` and asks
/// the execution context whether to admit it into this group's DISTINCT set.
///
/// With no set (`None`), every key is admitted. Canonicalization errors are
/// converted into `GroupError`. Admission results and errors come straight from
/// `ExecutionContext::admit_distinct_key`.
fn record_grouped_distinct_key(
    distinct_keys: Option<&mut GroupKeySet>,
    key: &DataKey,
    execution_context: &mut ExecutionContext,
    max_distinct_values_per_group: u64,
) -> Result<bool, GroupError> {
    match distinct_keys {
        None => Ok(true),
        Some(seen) => {
            let canonical = canonical_key_from_data_key(key).map_err(GroupError::from)?;
            execution_context.admit_distinct_key(seen, max_distinct_values_per_group, canonical)
        }
    }
}
/// Derives the canonical DISTINCT key for a data key from its storage key's value.
///
/// Canonicalization failures are mapped through `KeyCanonicalError::into_internal_error`.
fn canonical_key_from_data_key(key: &DataKey) -> Result<GroupKey, InternalError> {
    let storage_value = key.storage_key().as_value();
    storage_value
        .canonical_key()
        .map_err(KeyCanonicalError::into_internal_error)
}
/// How an aggregate fold is driven. Semantics are defined by the callers
/// outside this file: presumably `ExistingRows` folds over rows confirmed to
/// exist and `KeysOnly` folds over keys alone (confirm at call sites).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(in crate::db::executor) enum AggregateFoldMode {
    /// Fold over existing rows.
    ExistingRows,
    /// Fold over keys only.
    KeysOnly,
}