use crate::{
db::{
access::AccessPathKind,
direction::Direction,
executor::{
EntityAuthority, ExecutionPlan, ExecutionPreparation, LoweredIndexPrefixSpec,
LoweredIndexRangeSpec, StoreResolver,
aggregate::{
AggregateKind, field::FieldSlot, projection::ScalarProjectionBoundaryRequest,
},
pipeline::contracts::GroupedRouteStage,
traversal::row_read_consistency_for_plan,
},
index::IndexPredicateProgram,
predicate::MissingRowPolicy,
query::{
builder::AggregateExpr,
plan::{
AccessPlannedQuery, CoveringProjectionContext, ExecutionOrderContract,
OrderDirection, OrderSpec, PageSpec,
},
},
registry::StoreHandle,
},
error::InternalError,
value::Value,
};
pub(in crate::db::executor) struct AggregateFastPathInputs<'exec> {
pub(in crate::db::executor) logical_plan: &'exec AccessPlannedQuery,
pub(in crate::db::executor) authority: EntityAuthority,
pub(in crate::db::executor) store: StoreHandle,
pub(in crate::db::executor) route_plan: &'exec ExecutionPlan,
pub(in crate::db::executor) index_prefix_specs: &'exec [LoweredIndexPrefixSpec],
pub(in crate::db::executor) index_range_specs: &'exec [LoweredIndexRangeSpec],
pub(in crate::db::executor) index_predicate_program: Option<&'exec IndexPredicateProgram>,
pub(in crate::db::executor) direction: Direction,
pub(in crate::db::executor) physical_fetch_hint: Option<usize>,
pub(in crate::db::executor) kind: super::AggregateKind,
pub(in crate::db::executor) fold_mode: super::AggregateFoldMode,
}
impl AggregateFastPathInputs<'_> {
#[must_use]
pub(in crate::db::executor) const fn consistency(&self) -> MissingRowPolicy {
row_read_consistency_for_plan(self.logical_plan)
}
}
#[derive(Clone)]
pub(in crate::db::executor) struct AggregateExecutionDescriptor {
pub(in crate::db::executor) aggregate: AggregateExpr,
pub(in crate::db::executor) direction: Direction,
pub(in crate::db::executor) route_plan: ExecutionPlan,
pub(in crate::db::executor) execution_preparation: ExecutionPreparation,
}
pub(in crate::db::executor) struct PreparedAggregateExecutionState<'ctx> {
pub(in crate::db::executor) descriptor: AggregateExecutionDescriptor,
pub(in crate::db::executor) prepared: PreparedAggregateStreamingInputs<'ctx>,
}
pub(in crate::db::executor) struct PreparedAggregateStreamingInputs<'ctx> {
pub(in crate::db::executor) store_resolver: StoreResolver<'ctx>,
pub(in crate::db::executor) authority: EntityAuthority,
pub(in crate::db::executor) store: StoreHandle,
pub(in crate::db::executor) logical_plan: AccessPlannedQuery,
pub(in crate::db::executor) index_prefix_specs: Vec<LoweredIndexPrefixSpec>,
pub(in crate::db::executor) index_range_specs: Vec<LoweredIndexRangeSpec>,
}
pub(in crate::db::executor) struct PreparedAggregateStreamingInputsCore {
pub(in crate::db::executor) authority: EntityAuthority,
pub(in crate::db::executor) store: StoreHandle,
pub(in crate::db::executor) logical_plan: AccessPlannedQuery,
pub(in crate::db::executor) index_prefix_specs: Vec<LoweredIndexPrefixSpec>,
pub(in crate::db::executor) index_range_specs: Vec<LoweredIndexRangeSpec>,
}
impl PreparedAggregateStreamingInputs<'_> {
#[must_use]
pub(in crate::db::executor) fn window_is_provably_empty(&self) -> bool {
self.page_spec().is_some_and(|page| page.limit == Some(0))
|| self
.logical_plan
.access
.resolve_strategy()
.as_path()
.is_some_and(|path| path.capabilities().is_by_keys_empty())
}
#[must_use]
pub(in crate::db::executor) const fn order_spec(&self) -> Option<&OrderSpec> {
self.logical_plan.scalar_plan().order.as_ref()
}
#[must_use]
pub(in crate::db::executor) const fn page_spec(&self) -> Option<&PageSpec> {
self.logical_plan.scalar_plan().page.as_ref()
}
#[must_use]
pub(in crate::db::executor) fn has_predicate(&self) -> bool {
self.logical_plan.has_residual_predicate()
}
#[must_use]
pub(in crate::db::executor) const fn is_distinct(&self) -> bool {
self.logical_plan.scalar_plan().distinct
}
#[must_use]
pub(in crate::db::executor) fn has_no_predicate_or_distinct(&self) -> bool {
!self.has_predicate() && !self.is_distinct()
}
#[must_use]
pub(in crate::db::executor) fn explicit_primary_key_order_direction(
&self,
primary_key_name: &'static str,
) -> Option<Direction> {
let order = self.order_spec()?;
order
.primary_key_only_direction(primary_key_name)
.map(|direction| match direction {
OrderDirection::Asc => Direction::Asc,
OrderDirection::Desc => Direction::Desc,
})
}
#[must_use]
pub(in crate::db::executor) const fn consistency(&self) -> MissingRowPolicy {
row_read_consistency_for_plan(&self.logical_plan)
}
#[must_use]
pub(in crate::db::executor) fn into_core(self) -> PreparedAggregateStreamingInputsCore {
PreparedAggregateStreamingInputsCore {
authority: self.authority,
store: self.store,
logical_plan: self.logical_plan,
index_prefix_specs: self.index_prefix_specs,
index_range_specs: self.index_range_specs,
}
}
}
impl PreparedAggregateStreamingInputsCore {
#[must_use]
pub(in crate::db::executor) const fn order_spec(&self) -> Option<&OrderSpec> {
self.logical_plan.scalar_plan().order.as_ref()
}
#[must_use]
pub(in crate::db::executor) const fn consistency(&self) -> MissingRowPolicy {
row_read_consistency_for_plan(&self.logical_plan)
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarNumericOp {
Sum,
Avg,
}
impl PreparedScalarNumericOp {
#[must_use]
pub(in crate::db::executor) const fn aggregate_kind(self) -> AggregateKind {
match self {
Self::Sum => AggregateKind::Sum,
Self::Avg => AggregateKind::Avg,
}
}
#[must_use]
pub(in crate::db::executor) const fn aggregate_name(self) -> &'static str {
match self {
Self::Sum => "SUM",
Self::Avg => "AVG",
}
}
pub(in crate::db::executor) fn avg_divisor_conversion_invariant(self) -> InternalError {
let message = match self {
Self::Avg => "numeric field AVG divisor conversion overflowed decimal bounds",
Self::Sum => "AVG divisor conversion invariant is only valid for AVG numeric ops",
};
InternalError::query_executor_invariant(message)
}
pub(in crate::db::executor) fn grouped_distinct_output_type_mismatch(
self,
value: &Value,
) -> InternalError {
InternalError::query_executor_invariant(format!(
"global {}(DISTINCT field) grouped output type mismatch: {value:?}",
self.aggregate_name(),
))
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarNumericStrategy {
Streaming,
Materialized,
GlobalDistinctGrouped,
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) struct PreparedScalarNumericBoundary {
pub(in crate::db::executor) target_field_name: String,
pub(in crate::db::executor) field_slot: FieldSlot,
pub(in crate::db::executor) op: PreparedScalarNumericOp,
pub(in crate::db::executor) strategy: PreparedScalarNumericStrategy,
}
impl PreparedScalarNumericBoundary {
pub(in crate::db::executor) fn direct_execution_global_distinct_required(
&self,
) -> InternalError {
InternalError::query_executor_invariant(format!(
"numeric aggregate direct execution reached {} strategy",
self.strategy.kind_label(),
))
}
}
pub(in crate::db::executor) enum PreparedScalarNumericExecutionState<'ctx> {
Aggregate {
boundary: PreparedScalarNumericBoundary,
prepared: Box<PreparedAggregateStreamingInputs<'ctx>>,
},
GlobalDistinct {
boundary: PreparedScalarNumericBoundary,
route: Box<GroupedRouteStage>,
},
}
impl PreparedScalarNumericStrategy {
const fn kind_label(self) -> &'static str {
match self {
Self::Streaming => "streaming",
Self::Materialized => "materialized",
Self::GlobalDistinctGrouped => "global DISTINCT grouped",
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedCoveringDistinctStrategy {
Adjacent,
PreserveFirst,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarProjectionOp {
Values,
DistinctValues,
CountNonNull,
CountDistinct,
ValuesWithIds,
TerminalValue { terminal_kind: AggregateKind },
}
impl PreparedScalarProjectionOp {
pub(in crate::db::executor) fn covering_distinct_strategy_required(self) -> InternalError {
let message = match self {
Self::DistinctValues => {
"covering DISTINCT projection requires prepared distinct strategy"
}
Self::CountDistinct => {
"covering COUNT DISTINCT projection requires prepared distinct strategy"
}
Self::Values
| Self::CountNonNull
| Self::ValuesWithIds
| Self::TerminalValue { .. } => {
"covering DISTINCT strategy requirement is only valid for DISTINCT projection ops"
}
};
InternalError::query_executor_invariant(message)
}
pub(in crate::db::executor) fn constant_covering_strategy_unsupported(self) -> InternalError {
let message = match self {
Self::ValuesWithIds => {
"values-with-ids projection cannot execute constant covering strategy"
}
Self::Values
| Self::DistinctValues
| Self::CountNonNull
| Self::CountDistinct
| Self::TerminalValue { .. } => {
"constant covering projection rejection is only valid for values-with-ids"
}
};
InternalError::query_executor_invariant(message)
}
pub(in crate::db::executor) fn materialized_branch_unreachable(self) -> InternalError {
let message = match self {
Self::TerminalValue { .. } => {
"terminal value projection materialized branch must execute before row materialization"
}
Self::Values
| Self::DistinctValues
| Self::CountNonNull
| Self::CountDistinct
| Self::ValuesWithIds => {
"materialized branch terminal-value invariant is only valid for terminal-value projection ops"
}
};
InternalError::query_executor_invariant(message)
}
pub(in crate::db::executor) fn validate_terminal_value_kind(self) -> Result<(), InternalError> {
match self {
Self::TerminalValue { terminal_kind }
if !matches!(terminal_kind, AggregateKind::First | AggregateKind::Last) =>
{
Err(InternalError::query_executor_invariant(
"terminal value projection requires FIRST/LAST aggregate kind",
))
}
Self::Values
| Self::DistinctValues
| Self::CountNonNull
| Self::CountDistinct
| Self::ValuesWithIds
| Self::TerminalValue { .. } => Ok(()),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) struct ScalarProjectionWindow {
pub(in crate::db::executor) offset: usize,
pub(in crate::db::executor) limit: Option<usize>,
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) enum PreparedScalarProjectionStrategy {
Materialized,
StreamingCountNonNull {
direction: Direction,
},
CoveringIndex {
context: CoveringProjectionContext,
window: ScalarProjectionWindow,
distinct: Option<PreparedCoveringDistinctStrategy>,
},
CoveringConstant {
value: Value,
},
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) struct PreparedScalarProjectionBoundary {
pub(in crate::db::executor) target_field_name: String,
pub(in crate::db::executor) field_slot: FieldSlot,
pub(in crate::db::executor) op: PreparedScalarProjectionOp,
pub(in crate::db::executor) strategy: PreparedScalarProjectionStrategy,
}
pub(in crate::db::executor) struct PreparedScalarProjectionExecutionState<'ctx> {
pub(in crate::db::executor) boundary: PreparedScalarProjectionBoundary,
pub(in crate::db::executor) prepared: PreparedAggregateStreamingInputs<'ctx>,
}
impl PreparedAggregateStreamingInputs<'_> {
#[must_use]
pub(in crate::db::executor) fn supports_streaming_existing_row_field_fold(&self) -> bool {
if !self.has_no_predicate_or_distinct() {
return false;
}
let access_strategy = self.logical_plan.access.resolve_strategy();
let Some(path) = access_strategy.as_path() else {
return false;
};
let path_kind = path.capabilities().kind();
if !Self::streaming_existing_row_field_path_safe(path_kind) {
return false;
}
self.streaming_existing_row_field_page_window_safe(path_kind)
}
#[must_use]
pub(in crate::db::executor) fn streaming_existing_row_field_direction(&self) -> Direction {
ExecutionOrderContract::from_plan(false, self.order_spec()).primary_scan_direction()
}
#[must_use]
const fn streaming_existing_row_field_path_safe(path_kind: AccessPathKind) -> bool {
path_kind.supports_streaming_numeric_fold()
}
#[must_use]
fn streaming_existing_row_field_page_window_safe(&self, path_kind: AccessPathKind) -> bool {
if self.page_spec().is_none() {
return true;
}
let Some(_order) = self.order_spec() else {
return false;
};
if self
.explicit_primary_key_order_direction(self.authority.model().primary_key.name)
.is_none()
{
return false;
}
path_kind.supports_streaming_numeric_fold_for_paged_primary_key_window()
}
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) enum PreparedScalarTerminalOp {
Count,
Exists,
IdTerminal {
kind: AggregateKind,
},
IdBySlot {
kind: AggregateKind,
target_field_name: String,
},
}
impl PreparedScalarTerminalOp {
#[must_use]
pub(in crate::db::executor) const fn aggregate_kind(&self) -> AggregateKind {
match self {
Self::Count => AggregateKind::Count,
Self::Exists => AggregateKind::Exists,
Self::IdTerminal { kind } | Self::IdBySlot { kind, .. } => *kind,
}
}
pub(in crate::db::executor) fn validate_kernel_request_kind(
&self,
) -> Result<(), InternalError> {
match self {
Self::Count
| Self::Exists
| Self::IdTerminal {
kind:
AggregateKind::Min | AggregateKind::Max | AggregateKind::First | AggregateKind::Last,
}
| Self::IdBySlot {
kind: AggregateKind::Min | AggregateKind::Max,
..
} => Ok(()),
Self::IdTerminal { .. } => Err(InternalError::query_executor_invariant(
"id terminal aggregate request requires MIN/MAX/FIRST/LAST kind",
)),
Self::IdBySlot { .. } => Err(InternalError::query_executor_invariant(
"id-by-slot aggregate request requires MIN/MAX kind",
)),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarTerminalStrategy {
KernelAggregate,
CountPrimaryKeyCardinality,
CountExistingRows {
direction: Direction,
covering: bool,
},
ExistsExistingRows {
direction: Direction,
},
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) struct PreparedScalarTerminalBoundary {
pub(in crate::db::executor) op: PreparedScalarTerminalOp,
pub(in crate::db::executor) strategy: PreparedScalarTerminalStrategy,
}
pub(in crate::db::executor) struct PreparedScalarTerminalExecutionState<'ctx> {
pub(in crate::db::executor) boundary: PreparedScalarTerminalBoundary,
pub(in crate::db::executor) prepared: PreparedAggregateStreamingInputs<'ctx>,
}
impl PreparedScalarProjectionOp {
pub(in crate::db::executor) const fn from_request(
request: ScalarProjectionBoundaryRequest,
) -> Self {
match request {
ScalarProjectionBoundaryRequest::Values => Self::Values,
ScalarProjectionBoundaryRequest::DistinctValues => Self::DistinctValues,
ScalarProjectionBoundaryRequest::CountNonNull => Self::CountNonNull,
ScalarProjectionBoundaryRequest::CountDistinct => Self::CountDistinct,
ScalarProjectionBoundaryRequest::ValuesWithIds => Self::ValuesWithIds,
ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind } => {
Self::TerminalValue { terminal_kind }
}
}
}
}