use crate::{
db::{
data::{DataKey, DataRow},
direction::Direction,
executor::{
AccessScanContinuationInput, AccessStreamBindings, ExecutionKernel, ExecutionPlan,
KeyStreamLoopControl,
aggregate::{
AggregateKind, PreparedAggregateStreamingInputs, ScalarAggregateOutput,
field::{
AggregateFieldValueError, FieldSlot, apply_aggregate_direction,
compare_orderable_field_values_with_slot,
extract_orderable_field_value_from_decoded_slot,
},
},
pipeline::contracts::{
ExecutionInputs, ExecutionRuntimeAdapter, PreparedExecutionProjection,
ProjectionMaterializationMode,
},
plan_metrics::record_rows_scanned_for_path,
read_data_row_with_consistency_from_store,
route::aggregate_extrema_direction,
terminal::{RowDecoder, RowLayout},
},
index::IndexCompilePolicy,
predicate::MissingRowPolicy,
registry::StoreHandle,
},
error::InternalError,
value::{StorageKey, Value},
};
use std::cmp::Ordering;
#[derive(Clone, Copy)]
struct FieldExtremaFoldSpec<'a> {
target_field: &'a str,
field_slot: FieldSlot,
kind: AggregateKind,
direction: Direction,
}
impl FieldExtremaFoldSpec<'_> {
fn materialized_reduction_requires_extrema() -> InternalError {
InternalError::query_executor_invariant(
"materialized field-extrema reduction requires MIN/MAX terminal",
)
}
fn materialized_reduction_reached_non_extrema() -> InternalError {
InternalError::query_executor_invariant(
"materialized field-extrema reduction reached non-extrema terminal",
)
}
fn execution_requires_extrema() -> InternalError {
InternalError::query_executor_invariant(
"field-target aggregate execution requires MIN/MAX terminal",
)
}
fn route_fast_path_required() -> InternalError {
InternalError::query_executor_invariant(
"field-target aggregate streaming requires route-eligible field-extrema fast path",
)
}
fn direction_requires_extrema() -> InternalError {
InternalError::query_executor_invariant(
"field-target aggregate direction requires MIN/MAX terminal",
)
}
fn fold_reached_non_extrema() -> InternalError {
InternalError::query_executor_invariant("field-extrema fold reached non-extrema terminal")
}
fn fold_direction_mismatch() -> InternalError {
InternalError::query_executor_invariant(
"field-extrema fold direction must match aggregate terminal semantics",
)
}
fn extrema_direction(&self) -> Result<Direction, InternalError> {
aggregate_extrema_direction(self.kind).ok_or_else(Self::direction_requires_extrema)
}
fn finalize_output(
&self,
selected_key: Option<StorageKey>,
) -> Result<ScalarAggregateOutput, InternalError> {
self.kind
.extrema_output(selected_key)
.ok_or_else(Self::fold_reached_non_extrema)
}
}
impl ExecutionKernel {
pub(in crate::db::executor::aggregate) fn aggregate_field_extrema_from_materialized(
rows: Vec<DataRow>,
row_layout: &RowLayout,
kind: AggregateKind,
target_field: &str,
field_slot: FieldSlot,
) -> Result<ScalarAggregateOutput, InternalError> {
if !kind.is_extrema() {
return Err(FieldExtremaFoldSpec::materialized_reduction_requires_extrema());
}
let compare_direction = aggregate_extrema_direction(kind)
.ok_or_else(FieldExtremaFoldSpec::materialized_reduction_reached_non_extrema)?;
let mut selected: Option<(StorageKey, Value)> = None;
for (data_key, raw_row) in rows {
let candidate_key = data_key.storage_key();
let candidate_value = RowDecoder::decode_required_slot_value(
row_layout,
candidate_key,
&raw_row,
field_slot.index,
)?;
let candidate_value = extract_orderable_field_value_from_decoded_slot(
target_field,
field_slot,
candidate_value,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
let should_replace = match selected.as_ref() {
Some((current_key, current_value)) => {
let field_order = compare_orderable_field_values_with_slot(
target_field,
field_slot,
&candidate_value,
current_value,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
let directional_field_order =
apply_aggregate_direction(field_order, compare_direction);
directional_field_order == Ordering::Less
|| (directional_field_order == Ordering::Equal
&& candidate_key < *current_key)
}
None => true,
};
if should_replace {
selected = Some((candidate_key, candidate_value));
}
}
let selected_key = selected.map(|(key, _)| key);
kind.extrema_output(selected_key)
.ok_or_else(FieldExtremaFoldSpec::materialized_reduction_reached_non_extrema)
}
pub(in crate::db::executor::aggregate) fn execute_field_target_extrema_aggregate(
prepared: &PreparedAggregateStreamingInputs<'_>,
kind: AggregateKind,
target_field: &str,
field_slot: crate::db::executor::aggregate::field::FieldSlot,
direction: Direction,
route_plan: &crate::db::executor::ExecutionPlan,
) -> Result<ScalarAggregateOutput, InternalError> {
let field_fast_path_eligible = if kind == AggregateKind::Min {
route_plan.field_min_fast_path_eligible()
} else if kind == AggregateKind::Max {
route_plan.field_max_fast_path_eligible()
} else {
return Err(FieldExtremaFoldSpec::execution_requires_extrema());
};
if !field_fast_path_eligible {
return Err(FieldExtremaFoldSpec::route_fast_path_required());
}
let spec = FieldExtremaFoldSpec {
target_field,
field_slot,
kind,
direction,
};
let consistency = prepared.consistency();
let (probe_output, probe_rows_scanned) = Self::fold_field_target_extrema_for_route_plan(
prepared,
consistency,
route_plan,
&spec,
)?;
if !Self::field_extrema_probe_may_be_inconclusive(
consistency,
spec.kind,
route_plan.aggregate_seek_fetch_hint(),
&probe_output,
probe_rows_scanned,
) {
record_rows_scanned_for_path(prepared.authority.entity_path(), probe_rows_scanned);
return Ok(probe_output);
}
let mut fallback_route_plan = route_plan.clone();
fallback_route_plan.scan_hints.physical_fetch_hint = None;
fallback_route_plan.index_range_limit_spec = None;
fallback_route_plan.aggregate_seek_spec = None;
let (fallback_output, fallback_rows_scanned) =
Self::fold_field_target_extrema_for_route_plan(
prepared,
consistency,
&fallback_route_plan,
&spec,
)?;
let total_rows_scanned = probe_rows_scanned.saturating_add(fallback_rows_scanned);
record_rows_scanned_for_path(prepared.authority.entity_path(), total_rows_scanned);
Ok(fallback_output)
}
fn fold_field_target_extrema_for_route_plan(
prepared: &PreparedAggregateStreamingInputs<'_>,
consistency: MissingRowPolicy,
route_plan: &ExecutionPlan,
spec: &FieldExtremaFoldSpec<'_>,
) -> Result<(ScalarAggregateOutput, usize), InternalError> {
let row_layout = prepared.authority.row_layout();
let runtime = ExecutionRuntimeAdapter::from_stream_runtime_parts(
&prepared.logical_plan.access,
crate::db::executor::TraversalRuntime::new(
prepared.store,
prepared.authority.entity_tag(),
),
);
let execution_inputs = ExecutionInputs::new_prepared(
&runtime,
&prepared.logical_plan,
AccessStreamBindings {
index_prefix_specs: prepared.index_prefix_specs.as_slice(),
index_range_specs: prepared.index_range_specs.as_slice(),
continuation: AccessScanContinuationInput::new(None, spec.direction),
},
&prepared.execution_preparation,
ProjectionMaterializationMode::SharedValidation,
PreparedExecutionProjection::empty(),
false,
);
let mut resolved = execution_inputs
.resolve_execution_key_stream(route_plan, IndexCompilePolicy::StrictAllOrNone)?;
let (aggregate_output, keys_scanned) = Self::fold_streaming_field_extrema(
prepared.store,
&row_layout,
consistency,
resolved.key_stream_mut(),
spec,
)?;
let rows_scanned = resolved.rows_scanned_override().unwrap_or(keys_scanned);
Ok((aggregate_output, rows_scanned))
}
fn fold_streaming_field_extrema<S>(
store: StoreHandle,
row_layout: &RowLayout,
consistency: MissingRowPolicy,
key_stream: &mut S,
spec: &FieldExtremaFoldSpec<'_>,
) -> Result<(ScalarAggregateOutput, usize), InternalError>
where
S: crate::db::executor::OrderedKeyStream + ?Sized,
{
if spec.direction != spec.extrema_direction()? {
return Err(FieldExtremaFoldSpec::fold_direction_mismatch());
}
let mut keys_scanned = 0usize;
let mut selected: Option<(StorageKey, Value)> = None;
loop {
let Some(key) = key_stream.next_key()? else {
break;
};
match Self::fold_streaming_field_extrema_key(
store,
row_layout,
consistency,
key,
spec,
&mut keys_scanned,
&mut selected,
)? {
KeyStreamLoopControl::Skip | KeyStreamLoopControl::Emit => {}
KeyStreamLoopControl::Stop => break,
}
}
let selected_key = selected.map(|(key, _)| key);
let output = spec.finalize_output(selected_key)?;
Ok((output, keys_scanned))
}
fn fold_streaming_field_extrema_key(
store: StoreHandle,
row_layout: &RowLayout,
consistency: MissingRowPolicy,
data_key: DataKey,
spec: &FieldExtremaFoldSpec<'_>,
keys_scanned: &mut usize,
selected: &mut Option<(StorageKey, Value)>,
) -> Result<KeyStreamLoopControl, InternalError> {
*keys_scanned = keys_scanned.saturating_add(1);
let Some(row) = read_data_row_with_consistency_from_store(store, &data_key, consistency)?
else {
return Ok(KeyStreamLoopControl::Emit);
};
let key = data_key.storage_key();
let value =
RowDecoder::decode_required_slot_value(row_layout, key, &row.1, spec.field_slot.index)?;
let value = extract_orderable_field_value_from_decoded_slot(
spec.target_field,
spec.field_slot,
value,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
let selected_was_empty = selected.is_none();
let candidate_replaces = match selected.as_ref() {
Some((current_key, current_value)) => {
let field_order = compare_orderable_field_values_with_slot(
spec.target_field,
spec.field_slot,
&value,
current_value,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
let directional_field_order =
apply_aggregate_direction(field_order, spec.direction);
directional_field_order == Ordering::Less
|| (directional_field_order == Ordering::Equal && key < *current_key)
}
None => true,
};
if candidate_replaces {
*selected = Some((key, value));
if selected_was_empty && matches!(spec.kind, AggregateKind::Min) {
return Ok(KeyStreamLoopControl::Stop);
}
return Ok(KeyStreamLoopControl::Emit);
}
let Some((_, current_value)) = selected.as_ref() else {
return Ok(KeyStreamLoopControl::Emit);
};
let field_order = compare_orderable_field_values_with_slot(
spec.target_field,
spec.field_slot,
&value,
current_value,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
let directional_field_order = apply_aggregate_direction(field_order, spec.direction);
if directional_field_order == Ordering::Greater {
return Ok(KeyStreamLoopControl::Stop);
}
Ok(KeyStreamLoopControl::Emit)
}
const fn field_extrema_probe_may_be_inconclusive(
consistency: MissingRowPolicy,
kind: AggregateKind,
probe_fetch_hint: Option<usize>,
probe_output: &ScalarAggregateOutput,
probe_rows_scanned: usize,
) -> bool {
if !matches!(consistency, MissingRowPolicy::Ignore) {
return false;
}
if !kind.is_extrema() {
return false;
}
let Some(fetch) = probe_fetch_hint else {
return false;
};
if fetch == 0 || probe_rows_scanned < fetch {
return false;
}
kind.is_unresolved_extrema_output(probe_output)
}
}