use crate::{
db::{
direction::Direction,
executor::{
EntityAuthority, ExecutionPlan, ExecutionPreparation, LoweredIndexPrefixSpec,
LoweredIndexRangeSpec, StoreResolver,
aggregate::{
AggregateKind, field::FieldSlot, projection::ScalarProjectionBoundaryRequest,
},
pipeline::contracts::GroupedRouteStage,
route::AggregateRouteShape,
traversal::row_read_consistency_for_plan,
},
index::IndexPredicateProgram,
predicate::MissingRowPolicy,
query::plan::{AccessPlannedQuery, CoveringProjectionContext, OrderSpec, PageSpec},
registry::StoreHandle,
},
error::InternalError,
value::Value,
};
pub(in crate::db::executor) struct AggregateFastPathInputs<'exec> {
pub(in crate::db::executor) logical_plan: &'exec AccessPlannedQuery,
pub(in crate::db::executor) authority: EntityAuthority,
pub(in crate::db::executor) store: StoreHandle,
pub(in crate::db::executor) route_plan: &'exec ExecutionPlan,
pub(in crate::db::executor) index_prefix_specs: &'exec [LoweredIndexPrefixSpec],
pub(in crate::db::executor) index_range_specs: &'exec [LoweredIndexRangeSpec],
pub(in crate::db::executor) index_predicate_program: Option<&'exec IndexPredicateProgram>,
pub(in crate::db::executor) direction: Direction,
pub(in crate::db::executor) physical_fetch_hint: Option<usize>,
pub(in crate::db::executor) kind: super::AggregateKind,
pub(in crate::db::executor) fold_mode: super::AggregateFoldMode,
}
impl AggregateFastPathInputs<'_> {
#[must_use]
pub(in crate::db::executor) const fn consistency(&self) -> MissingRowPolicy {
row_read_consistency_for_plan(self.logical_plan)
}
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) struct PreparedAggregateTargetField {
target_field_name: String,
field_slot: FieldSlot,
target_field_known: bool,
target_field_orderable: bool,
target_field_is_primary_key: bool,
}
impl PreparedAggregateTargetField {
#[expect(
clippy::missing_const_for_fn,
reason = "constructs one owned String-backed descriptor for runtime handoff"
)]
#[must_use]
pub(in crate::db::executor) fn new(
target_field_name: String,
field_slot: FieldSlot,
target_field_known: bool,
target_field_orderable: bool,
target_field_is_primary_key: bool,
) -> Self {
Self {
target_field_name,
field_slot,
target_field_known,
target_field_orderable,
target_field_is_primary_key,
}
}
#[expect(
clippy::missing_const_for_fn,
reason = "String::as_str is kept on the ordinary method boundary for readability"
)]
#[must_use]
pub(in crate::db::executor) fn target_field_name(&self) -> &str {
self.target_field_name.as_str()
}
#[must_use]
pub(in crate::db::executor) const fn field_slot(&self) -> FieldSlot {
self.field_slot
}
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) struct PreparedAggregateSpec {
kind: AggregateKind,
target_field: Option<PreparedAggregateTargetField>,
}
impl PreparedAggregateSpec {
#[must_use]
pub(in crate::db::executor) const fn terminal(kind: AggregateKind) -> Self {
Self {
kind,
target_field: None,
}
}
#[must_use]
pub(in crate::db::executor) const fn field_target(
kind: AggregateKind,
target_field: PreparedAggregateTargetField,
) -> Self {
Self {
kind,
target_field: Some(target_field),
}
}
#[must_use]
pub(in crate::db::executor) const fn kind(&self) -> AggregateKind {
self.kind
}
#[must_use]
pub(in crate::db::executor) const fn target_field(
&self,
) -> Option<&PreparedAggregateTargetField> {
self.target_field.as_ref()
}
#[must_use]
pub(in crate::db::executor) fn route_shape(&self) -> AggregateRouteShape<'_> {
let Some(target_field) = self.target_field.as_ref() else {
return AggregateRouteShape::new_resolved(self.kind, None, true, false, false);
};
AggregateRouteShape::new_resolved(
self.kind,
Some(target_field.target_field_name()),
target_field.target_field_known,
target_field.target_field_orderable,
target_field.target_field_is_primary_key,
)
}
}
#[derive(Clone)]
pub(in crate::db::executor) struct AggregateExecutionDescriptor {
pub(in crate::db::executor) aggregate: PreparedAggregateSpec,
pub(in crate::db::executor) direction: Direction,
pub(in crate::db::executor) route_plan: ExecutionPlan,
}
pub(in crate::db::executor) struct PreparedAggregateExecutionState<'ctx> {
pub(in crate::db::executor) descriptor: AggregateExecutionDescriptor,
pub(in crate::db::executor) prepared: PreparedAggregateStreamingInputs<'ctx>,
}
pub(in crate::db::executor) struct PreparedAggregateStreamingInputs<'ctx> {
pub(in crate::db::executor) store_resolver: StoreResolver<'ctx>,
pub(in crate::db::executor) authority: EntityAuthority,
pub(in crate::db::executor) store: StoreHandle,
pub(in crate::db::executor) logical_plan: AccessPlannedQuery,
pub(in crate::db::executor) execution_preparation: ExecutionPreparation,
pub(in crate::db::executor) index_prefix_specs: Vec<LoweredIndexPrefixSpec>,
pub(in crate::db::executor) index_range_specs: Vec<LoweredIndexRangeSpec>,
}
impl PreparedAggregateStreamingInputs<'_> {
#[must_use]
pub(in crate::db::executor) fn window_is_provably_empty(&self) -> bool {
self.page_spec().is_some_and(|page| page.limit == Some(0))
|| self
.logical_plan
.access
.resolve_strategy()
.as_path()
.is_some_and(|path| path.capabilities().is_by_keys_empty())
}
#[must_use]
pub(in crate::db::executor) const fn order_spec(&self) -> Option<&OrderSpec> {
self.logical_plan.scalar_plan().order.as_ref()
}
#[must_use]
pub(in crate::db::executor) const fn page_spec(&self) -> Option<&PageSpec> {
self.logical_plan.scalar_plan().page.as_ref()
}
#[must_use]
pub(in crate::db::executor) const fn has_predicate(&self) -> bool {
self.logical_plan.scalar_plan().predicate.is_some()
}
#[must_use]
pub(in crate::db::executor) const fn consistency(&self) -> MissingRowPolicy {
row_read_consistency_for_plan(&self.logical_plan)
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarNumericOp {
Sum,
Avg,
}
impl PreparedScalarNumericOp {
fn invariant(message: impl Into<String>) -> InternalError {
InternalError::query_executor_invariant(message)
}
#[must_use]
pub(in crate::db::executor) const fn aggregate_kind(self) -> AggregateKind {
match self {
Self::Sum => AggregateKind::Sum,
Self::Avg => AggregateKind::Avg,
}
}
#[must_use]
pub(in crate::db::executor) const fn aggregate_name(self) -> &'static str {
match self {
Self::Sum => "SUM",
Self::Avg => "AVG",
}
}
pub(in crate::db::executor) fn avg_divisor_conversion_invariant(self) -> InternalError {
let message = match self {
Self::Avg => "numeric field AVG divisor conversion overflowed decimal bounds",
Self::Sum => "AVG divisor conversion invariant is only valid for AVG numeric ops",
};
Self::invariant(message)
}
pub(in crate::db::executor) fn grouped_distinct_output_type_mismatch(
self,
value: &Value,
) -> InternalError {
Self::invariant(format!(
"global {}(DISTINCT field) grouped output type mismatch: {value:?}",
self.aggregate_name(),
))
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarNumericAggregateStrategy {
Streaming,
Materialized,
}
pub(in crate::db::executor) enum PreparedScalarNumericPayload<'ctx> {
Aggregate {
strategy: PreparedScalarNumericAggregateStrategy,
prepared: Box<PreparedAggregateStreamingInputs<'ctx>>,
},
GlobalDistinct {
route: Box<GroupedRouteStage>,
},
}
pub(in crate::db::executor) struct PreparedScalarNumericBoundary<'ctx> {
pub(in crate::db::executor) target_field_name: String,
pub(in crate::db::executor) field_slot: FieldSlot,
pub(in crate::db::executor) op: PreparedScalarNumericOp,
pub(in crate::db::executor) payload: PreparedScalarNumericPayload<'ctx>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedCoveringDistinctStrategy {
Adjacent,
PreserveFirst,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarProjectionOp {
Values,
DistinctValues,
CountDistinct,
ValuesWithIds,
TerminalValue { terminal_kind: AggregateKind },
}
impl PreparedScalarProjectionOp {
fn invariant(message: impl Into<String>) -> InternalError {
InternalError::query_executor_invariant(message)
}
pub(in crate::db::executor) fn covering_distinct_strategy_required(self) -> InternalError {
let message = match self {
Self::DistinctValues => {
"covering DISTINCT projection requires prepared distinct strategy"
}
Self::CountDistinct => {
"covering COUNT DISTINCT projection requires prepared distinct strategy"
}
Self::Values | Self::ValuesWithIds | Self::TerminalValue { .. } => {
"covering DISTINCT strategy requirement is only valid for DISTINCT projection ops"
}
};
Self::invariant(message)
}
pub(in crate::db::executor) fn constant_covering_strategy_unsupported(self) -> InternalError {
let message = match self {
Self::ValuesWithIds => {
"values-with-ids projection cannot execute constant covering strategy"
}
Self::Values
| Self::DistinctValues
| Self::CountDistinct
| Self::TerminalValue { .. } => {
"constant covering projection rejection is only valid for values-with-ids"
}
};
Self::invariant(message)
}
pub(in crate::db::executor) fn materialized_branch_unreachable(self) -> InternalError {
let message = match self {
Self::TerminalValue { .. } => {
"terminal value projection materialized branch must execute before row materialization"
}
Self::Values | Self::DistinctValues | Self::CountDistinct | Self::ValuesWithIds => {
"materialized branch terminal-value invariant is only valid for terminal-value projection ops"
}
};
Self::invariant(message)
}
pub(in crate::db::executor) fn validate_terminal_value_kind(self) -> Result<(), InternalError> {
match self {
Self::TerminalValue { terminal_kind }
if !matches!(terminal_kind, AggregateKind::First | AggregateKind::Last) =>
{
Err(Self::invariant(
"terminal value projection requires FIRST/LAST aggregate kind",
))
}
Self::Values
| Self::DistinctValues
| Self::CountDistinct
| Self::ValuesWithIds
| Self::TerminalValue { .. } => Ok(()),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) struct ScalarProjectionWindow {
pub(in crate::db::executor) offset: usize,
pub(in crate::db::executor) limit: Option<usize>,
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) enum PreparedScalarProjectionStrategy {
Materialized,
CoveringIndex {
context: CoveringProjectionContext,
window: ScalarProjectionWindow,
distinct: Option<PreparedCoveringDistinctStrategy>,
},
CoveringConstant {
value: Value,
},
}
pub(in crate::db::executor) struct PreparedScalarProjectionBoundary<'ctx> {
pub(in crate::db::executor) target_field_name: String,
pub(in crate::db::executor) field_slot: FieldSlot,
pub(in crate::db::executor) op: PreparedScalarProjectionOp,
pub(in crate::db::executor) strategy: PreparedScalarProjectionStrategy,
pub(in crate::db::executor) prepared: PreparedAggregateStreamingInputs<'ctx>,
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) enum PreparedScalarTerminalOp {
Count,
Exists,
IdTerminal {
kind: AggregateKind,
},
IdBySlot {
kind: AggregateKind,
target_field_name: String,
field_slot: FieldSlot,
},
}
impl PreparedScalarTerminalOp {
fn invariant(message: impl Into<String>) -> InternalError {
InternalError::query_executor_invariant(message)
}
#[must_use]
pub(in crate::db::executor) const fn aggregate_kind(&self) -> AggregateKind {
match self {
Self::Count => AggregateKind::Count,
Self::Exists => AggregateKind::Exists,
Self::IdTerminal { kind } | Self::IdBySlot { kind, .. } => *kind,
}
}
pub(in crate::db::executor) fn validate_kernel_request_kind(
&self,
) -> Result<(), InternalError> {
match self {
Self::Count
| Self::Exists
| Self::IdTerminal {
kind:
AggregateKind::Min | AggregateKind::Max | AggregateKind::First | AggregateKind::Last,
}
| Self::IdBySlot {
kind: AggregateKind::Min | AggregateKind::Max,
..
} => Ok(()),
Self::IdTerminal { .. } => Err(Self::invariant(
"id terminal aggregate request requires MIN/MAX/FIRST/LAST kind",
)),
Self::IdBySlot { .. } => Err(Self::invariant(
"id-by-slot aggregate request requires MIN/MAX kind",
)),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum PreparedScalarTerminalStrategy {
KernelAggregate,
CountPrimaryKeyCardinality,
ExistingRows { direction: Direction },
}
pub(in crate::db::executor) struct PreparedScalarTerminalBoundary<'ctx> {
pub(in crate::db::executor) op: PreparedScalarTerminalOp,
pub(in crate::db::executor) strategy: PreparedScalarTerminalStrategy,
pub(in crate::db::executor) prepared: PreparedAggregateStreamingInputs<'ctx>,
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) enum PreparedOrderSensitiveTerminalOp {
ResponseOrder {
kind: AggregateKind,
},
FieldOrder {
target_field_name: String,
field_slot: FieldSlot,
op: PreparedFieldOrderSensitiveTerminalOp,
},
}
pub(in crate::db::executor) struct PreparedOrderSensitiveTerminalBoundary<'ctx> {
pub(in crate::db::executor) op: PreparedOrderSensitiveTerminalOp,
pub(in crate::db::executor) prepared: PreparedAggregateStreamingInputs<'ctx>,
}
#[derive(Clone, Debug)]
pub(in crate::db::executor) enum PreparedFieldOrderSensitiveTerminalOp {
Nth { nth: usize },
Median,
MinMax,
}
impl PreparedScalarProjectionOp {
pub(in crate::db::executor) const fn from_request(
request: ScalarProjectionBoundaryRequest,
) -> Self {
match request {
ScalarProjectionBoundaryRequest::Values => Self::Values,
ScalarProjectionBoundaryRequest::DistinctValues => Self::DistinctValues,
ScalarProjectionBoundaryRequest::CountDistinct => Self::CountDistinct,
ScalarProjectionBoundaryRequest::ValuesWithIds => Self::ValuesWithIds,
ScalarProjectionBoundaryRequest::TerminalValue { terminal_kind } => {
Self::TerminalValue { terminal_kind }
}
}
}
}