mod covering;
mod decode;
use crate::{
db::{
cursor::IndexScanContinuationInput,
cursor::{ContinuationRuntime, LoopAction},
data::{DataKey, DataRow},
direction::Direction,
executor::{
AccessScanContinuationInput, AccessStreamBindings, ExecutableAccess, ExecutablePlan,
ExecutionKernel, ExecutionPreparation, KeyStreamLoopControl, PreparedAggregatePlan,
TraversalRuntime,
aggregate::{
AggregateKind, PreparedAggregateStreamingInputs, PreparedCoveringDistinctStrategy,
PreparedScalarProjectionExecutionState, PreparedScalarProjectionOp,
PreparedScalarProjectionStrategy, ScalarProjectionWindow,
field::{
AggregateFieldValueError, FieldSlot,
extract_orderable_field_value_with_slot_reader,
resolve_any_aggregate_target_slot_from_planner_slot_with_model,
resolve_orderable_aggregate_target_slot_from_planner_slot_with_model,
},
materialized_distinct::insert_materialized_distinct_value,
projection::{
covering::{
CoveringProjectionValues, covering_index_adjacent_distinct_eligible,
covering_index_projection_context, dedup_adjacent_values,
dedup_values_preserving_first, scalar_window_for_covering_projection,
},
decode::decode_covering_projection_component,
},
},
group::GroupKeySet,
pipeline::contracts::LoadExecutor,
plan_metrics::record_rows_scanned_for_path,
preparation::resolved_index_slots_for_access_path,
read_row_with_consistency_from_store,
terminal::{RowDecoder, RowLayout},
},
predicate::MissingRowPolicy,
query::{
builder::AggregateExpr,
plan::{
CoveringProjectionContext, CoveringProjectionOrder, FieldSlot as PlannedFieldSlot,
constant_covering_projection_value_from_access,
},
},
},
error::InternalError,
traits::{EntityKind, EntityValue},
types::Id,
value::Value,
};
use std::cell::RefCell;
type ValueProjection = Vec<(DataKey, Value)>;
type CoveringProjectionPairRows = Vec<(DataKey, Value)>;
type CoveringProjectionPairsResolution = Result<Option<CoveringProjectionPairRows>, InternalError>;
type CoveringProjectionComponentRows = Vec<(DataKey, Vec<u8>)>;
pub(in crate::db) enum ScalarProjectionBoundaryRequest {
Values,
DistinctValues,
CountNonNull,
CountDistinct,
ValuesWithIds,
TerminalValue { terminal_kind: AggregateKind },
}
pub(in crate::db) enum ScalarProjectionBoundaryOutput {
Count(u32),
Values(Vec<Value>),
ValuesWithDataKeys(ValueProjection),
TerminalValue(Option<Value>),
}
impl ScalarProjectionBoundaryOutput {
fn output_kind_mismatch(message: &'static str) -> InternalError {
InternalError::query_executor_invariant(message)
}
pub(in crate::db) fn into_values(self) -> Result<Vec<Value>, InternalError> {
match self {
Self::Values(values) => Ok(values),
_ => Err(Self::output_kind_mismatch(
"scalar projection boundary values output kind mismatch",
)),
}
}
pub(in crate::db) fn into_count(self) -> Result<u32, InternalError> {
match self {
Self::Count(value) => Ok(value),
_ => Err(Self::output_kind_mismatch(
"scalar projection boundary count output kind mismatch",
)),
}
}
pub(in crate::db) fn into_values_with_ids<E>(self) -> Result<Vec<(Id<E>, Value)>, InternalError>
where
E: EntityKind + EntityValue,
{
match self {
Self::ValuesWithDataKeys(values) => values
.into_iter()
.map(|(data_key, value)| Ok((Id::from_key(data_key.try_key::<E>()?), value)))
.collect(),
_ => Err(Self::output_kind_mismatch(
"scalar projection boundary values-with-ids output kind mismatch",
)),
}
}
pub(in crate::db) fn into_terminal_value(self) -> Result<Option<Value>, InternalError> {
match self {
Self::TerminalValue(value) => Ok(value),
_ => Err(Self::output_kind_mismatch(
"scalar projection boundary terminal-value output kind mismatch",
)),
}
}
}
impl<E> LoadExecutor<E>
where
E: EntityKind + EntityValue,
{
pub(in crate::db) fn execute_scalar_extrema_value_boundary(
&self,
plan: ExecutablePlan<E>,
target_field: PlannedFieldSlot,
terminal_kind: AggregateKind,
) -> Result<Option<Value>, InternalError> {
if !terminal_kind.is_extrema() {
return Err(InternalError::query_executor_invariant(
"scalar extrema value boundary requires MIN/MAX aggregate kind",
));
}
let plan = plan.into_prepared_aggregate_plan();
let authority = plan.authority();
let field_slot = resolve_orderable_aggregate_target_slot_from_planner_slot_with_model(
authority.model(),
&target_field,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
let prepared = self.prepare_scalar_aggregate_boundary(plan)?;
self.execute_selected_value_field_projection_with_slot(
prepared,
target_field.field(),
field_slot,
terminal_kind,
)
}
pub(in crate::db) fn execute_scalar_projection_boundary(
&self,
plan: ExecutablePlan<E>,
target_field: PlannedFieldSlot,
request: ScalarProjectionBoundaryRequest,
) -> Result<ScalarProjectionBoundaryOutput, InternalError> {
let prepared = self.prepare_scalar_projection_boundary(
plan.into_prepared_aggregate_plan(),
target_field,
request,
)?;
self.execute_prepared_scalar_projection_boundary(prepared)
}
fn prepare_scalar_projection_boundary(
&self,
plan: PreparedAggregatePlan,
target_field: PlannedFieldSlot,
request: ScalarProjectionBoundaryRequest,
) -> Result<PreparedScalarProjectionExecutionState<'_>, InternalError> {
let target_field_name = target_field.field().to_string();
let authority = plan.authority();
let field_slot = resolve_any_aggregate_target_slot_from_planner_slot_with_model(
authority.model(),
&target_field,
)
.map_err(AggregateFieldValueError::into_internal_error)?;
let prepared = self.prepare_scalar_aggregate_boundary(plan)?;
let op = PreparedScalarProjectionOp::from_request(request);
op.validate_terminal_value_kind()?;
let strategy = Self::prepare_scalar_projection_strategy(&prepared, &target_field_name, op);
Ok(PreparedScalarProjectionExecutionState {
boundary: crate::db::executor::aggregate::PreparedScalarProjectionBoundary {
target_field_name,
field_slot,
op,
strategy,
},
prepared,
})
}
fn execute_prepared_scalar_projection_boundary(
&self,
prepared_state: PreparedScalarProjectionExecutionState<'_>,
) -> Result<ScalarProjectionBoundaryOutput, InternalError> {
let PreparedScalarProjectionExecutionState { boundary, prepared } = prepared_state;
let row_layout = RowLayout::from_model(prepared.authority.model());
match boundary.strategy.clone() {
PreparedScalarProjectionStrategy::Materialized => self
.execute_materialized_scalar_projection_boundary(boundary, prepared, &row_layout),
PreparedScalarProjectionStrategy::StreamingCountNonNull { direction } => {
Self::execute_streaming_count_non_null_scalar_projection_boundary(
boundary,
prepared,
&row_layout,
direction,
)
}
PreparedScalarProjectionStrategy::CoveringIndex {
context,
window,
distinct,
} => self.execute_covering_scalar_projection_boundary(
boundary, prepared, context, window, distinct,
),
PreparedScalarProjectionStrategy::CoveringConstant { value } => {
self.execute_constant_scalar_projection_boundary(boundary, prepared, value)
}
}
}
fn prepare_scalar_projection_strategy(
prepared: &PreparedAggregateStreamingInputs<'_>,
target_field: &str,
op: PreparedScalarProjectionOp,
) -> PreparedScalarProjectionStrategy {
if !prepared.has_predicate()
&& let Some(context) = covering_index_projection_context(
&prepared.logical_plan.access,
prepared.order_spec(),
target_field,
prepared.authority.model().primary_key.name,
)
{
let window = ScalarProjectionWindow {
offset: scalar_window_for_covering_projection(prepared.page_spec()).0,
limit: scalar_window_for_covering_projection(prepared.page_spec()).1,
};
let distinct = match op {
PreparedScalarProjectionOp::DistinctValues
| PreparedScalarProjectionOp::CountDistinct => {
Some(if covering_index_adjacent_distinct_eligible(context) {
PreparedCoveringDistinctStrategy::Adjacent
} else {
PreparedCoveringDistinctStrategy::PreserveFirst
})
}
_ => None,
};
return PreparedScalarProjectionStrategy::CoveringIndex {
context,
window,
distinct,
};
}
match op {
PreparedScalarProjectionOp::Values
| PreparedScalarProjectionOp::DistinctValues
| PreparedScalarProjectionOp::CountDistinct
| PreparedScalarProjectionOp::TerminalValue { .. } => {
if let Some(value) =
Self::constant_covering_projection_value_if_eligible(prepared, target_field)
{
return PreparedScalarProjectionStrategy::CoveringConstant { value };
}
}
PreparedScalarProjectionOp::CountNonNull => {
if let Some(value) =
Self::constant_covering_projection_value_if_eligible(prepared, target_field)
{
return PreparedScalarProjectionStrategy::CoveringConstant { value };
}
if prepared.supports_streaming_existing_row_field_fold() {
return PreparedScalarProjectionStrategy::StreamingCountNonNull {
direction: prepared.streaming_existing_row_field_direction(),
};
}
}
PreparedScalarProjectionOp::ValuesWithIds => {}
}
PreparedScalarProjectionStrategy::Materialized
}
fn execute_covering_scalar_projection_boundary(
&self,
boundary: crate::db::executor::aggregate::PreparedScalarProjectionBoundary,
prepared: PreparedAggregateStreamingInputs<'_>,
context: CoveringProjectionContext,
window: ScalarProjectionWindow,
distinct: Option<PreparedCoveringDistinctStrategy>,
) -> Result<ScalarProjectionBoundaryOutput, InternalError> {
match boundary.op {
PreparedScalarProjectionOp::Values => {
if let Some(covering_projection) =
Self::covering_index_projection_values_with_context_from_prepared(
&prepared, context, window,
)?
{
return Ok(ScalarProjectionBoundaryOutput::Values(
covering_projection.values,
));
}
}
PreparedScalarProjectionOp::DistinctValues => {
if let Some(covering_projection) =
Self::covering_index_projection_values_with_context_from_prepared(
&prepared, context, window,
)?
{
let values = match distinct {
Some(PreparedCoveringDistinctStrategy::Adjacent) => {
dedup_adjacent_values(covering_projection.values)
}
Some(PreparedCoveringDistinctStrategy::PreserveFirst) => {
dedup_values_preserving_first(covering_projection.values)?
}
None => {
return Err(boundary.op.covering_distinct_strategy_required());
}
};
return Ok(ScalarProjectionBoundaryOutput::Values(values));
}
}
PreparedScalarProjectionOp::CountNonNull => {
if let Some(covering_projection) =
Self::covering_index_projection_values_with_context_from_prepared(
&prepared, context, window,
)?
{
let count = covering_projection
.values
.into_iter()
.filter(|value| !matches!(value, Value::Null))
.count();
return Ok(ScalarProjectionBoundaryOutput::Count(
u32::try_from(count).unwrap_or(u32::MAX),
));
}
}
PreparedScalarProjectionOp::CountDistinct => {
if let Some(covering_projection) =
Self::covering_index_projection_values_with_context_from_prepared(
&prepared, context, window,
)?
{
let values = match distinct {
Some(PreparedCoveringDistinctStrategy::Adjacent) => {
dedup_adjacent_values(covering_projection.values)
}
Some(PreparedCoveringDistinctStrategy::PreserveFirst) => {
dedup_values_preserving_first(covering_projection.values)?
}
None => {
return Err(boundary.op.covering_distinct_strategy_required());
}
};
return Ok(ScalarProjectionBoundaryOutput::Count(
u32::try_from(values.len()).unwrap_or(u32::MAX),
));
}
}
PreparedScalarProjectionOp::ValuesWithIds => {
if let Some(values) =
Self::covering_index_projection_values_from_context_structural(
&prepared, context, window,
)?
{
return Ok(ScalarProjectionBoundaryOutput::ValuesWithDataKeys(values));
}
}
PreparedScalarProjectionOp::TerminalValue { terminal_kind } => {
if let Some(covering_projection) =
Self::covering_index_projection_values_with_context_from_prepared(
&prepared, context, window,
)?
{
PreparedScalarProjectionOp::TerminalValue { terminal_kind }
.validate_terminal_value_kind()?;
let value = match terminal_kind {
AggregateKind::First => covering_projection.values.first().cloned(),
AggregateKind::Last => covering_projection.values.last().cloned(),
_ => unreachable!(),
};
return Ok(ScalarProjectionBoundaryOutput::TerminalValue(value));
}
}
}
let row_layout = RowLayout::from_model(prepared.authority.model());
self.execute_materialized_scalar_projection_boundary(boundary, prepared, &row_layout)
}
fn execute_constant_scalar_projection_boundary(
&self,
boundary: crate::db::executor::aggregate::PreparedScalarProjectionBoundary,
prepared: PreparedAggregateStreamingInputs<'_>,
value: Value,
) -> Result<ScalarProjectionBoundaryOutput, InternalError> {
match boundary.op {
PreparedScalarProjectionOp::Values => {
let row_count = self.aggregate_count_from_prepared(prepared)?;
let output_len = usize::try_from(row_count).unwrap_or(usize::MAX);
Ok(ScalarProjectionBoundaryOutput::Values(vec![
value;
output_len
]))
}
PreparedScalarProjectionOp::DistinctValues => {
let has_rows = self.aggregate_exists_from_prepared(prepared)?;
Ok(ScalarProjectionBoundaryOutput::Values(if has_rows {
vec![value]
} else {
Vec::new()
}))
}
PreparedScalarProjectionOp::CountNonNull => Ok(ScalarProjectionBoundaryOutput::Count(
if matches!(value, Value::Null) {
0
} else {
self.aggregate_count_from_prepared(prepared)?
},
)),
PreparedScalarProjectionOp::CountDistinct => {
let has_rows = self.aggregate_exists_from_prepared(prepared)?;
Ok(ScalarProjectionBoundaryOutput::Count(u32::from(has_rows)))
}
PreparedScalarProjectionOp::TerminalValue { .. } => {
let has_rows = self.aggregate_exists_from_prepared(prepared)?;
Ok(ScalarProjectionBoundaryOutput::TerminalValue(
has_rows.then_some(value),
))
}
PreparedScalarProjectionOp::ValuesWithIds => {
Err(boundary.op.constant_covering_strategy_unsupported())
}
}
}
fn execute_materialized_scalar_projection_boundary(
&self,
boundary: crate::db::executor::aggregate::PreparedScalarProjectionBoundary,
prepared: PreparedAggregateStreamingInputs<'_>,
row_layout: &RowLayout,
) -> Result<ScalarProjectionBoundaryOutput, InternalError> {
if let PreparedScalarProjectionOp::TerminalValue { terminal_kind } = boundary.op {
return self
.execute_selected_value_field_projection_with_slot(
prepared,
&boundary.target_field_name,
boundary.field_slot,
terminal_kind,
)
.map(ScalarProjectionBoundaryOutput::TerminalValue);
}
let page = self.execute_scalar_materialized_page_stage(prepared)?;
let (rows, _) = page.into_parts();
match boundary.op {
PreparedScalarProjectionOp::Values => {
let projected_values = Self::project_field_values_from_materialized_structural(
rows,
row_layout,
&boundary.target_field_name,
boundary.field_slot,
)?;
Ok(ScalarProjectionBoundaryOutput::Values(
Self::field_values_from_projection(projected_values),
))
}
PreparedScalarProjectionOp::DistinctValues => {
Self::project_field_values_from_materialized_structural(
rows,
row_layout,
&boundary.target_field_name,
boundary.field_slot,
)
.and_then(Self::project_distinct_field_values_from_materialized)
.map(ScalarProjectionBoundaryOutput::Values)
}
PreparedScalarProjectionOp::CountNonNull => {
Self::count_non_null_field_values_from_materialized_structural(
rows,
row_layout,
&boundary.target_field_name,
boundary.field_slot,
)
.map(ScalarProjectionBoundaryOutput::Count)
}
PreparedScalarProjectionOp::CountDistinct => {
Self::project_field_values_from_materialized_structural(
rows,
row_layout,
&boundary.target_field_name,
boundary.field_slot,
)
.and_then(Self::project_distinct_field_values_from_materialized)
.map(|values| {
ScalarProjectionBoundaryOutput::Count(
u32::try_from(values.len()).unwrap_or(u32::MAX),
)
})
}
PreparedScalarProjectionOp::ValuesWithIds => {
Self::project_field_values_from_materialized_structural(
rows,
row_layout,
&boundary.target_field_name,
boundary.field_slot,
)
.map(ScalarProjectionBoundaryOutput::ValuesWithDataKeys)
}
PreparedScalarProjectionOp::TerminalValue { .. } => {
Err(boundary.op.materialized_branch_unreachable())
}
}
}
fn execute_streaming_count_non_null_scalar_projection_boundary(
boundary: crate::db::executor::aggregate::PreparedScalarProjectionBoundary,
prepared: PreparedAggregateStreamingInputs<'_>,
row_layout: &RowLayout,
direction: Direction,
) -> Result<ScalarProjectionBoundaryOutput, InternalError> {
let consistency = prepared.consistency();
let PreparedAggregateStreamingInputs {
authority,
store,
logical_plan,
index_prefix_specs,
index_range_specs,
..
} = prepared;
let execution_preparation = ExecutionPreparation::from_strict_runtime_plan(
authority.model(),
&logical_plan,
resolved_index_slots_for_access_path(
authority.model(),
logical_plan.access.resolve_strategy().executable(),
),
);
let continuation = RefCell::new(ContinuationRuntime::from_window(
ExecutionKernel::window_cursor_contract(&logical_plan, None),
));
let index_predicate_execution = execution_preparation.strict_mode().map(|program| {
crate::db::index::predicate::IndexPredicateExecution {
program,
rejected_keys_counter: None,
}
});
let access = ExecutableAccess::new(
&logical_plan.access,
AccessStreamBindings::new(
index_prefix_specs.as_slice(),
index_range_specs.as_slice(),
AccessScanContinuationInput::new(None, direction),
),
None,
index_predicate_execution,
);
let runtime = TraversalRuntime::new(store, authority.entity_tag());
let mut key_stream = runtime.ordered_key_stream_from_runtime_access(access)?;
let mut rows_scanned = 0usize;
let mut count = 0u32;
let mut pre_key = || {
Self::loop_control_from_projection_continuation(continuation.borrow_mut().pre_fetch())
};
let mut on_key = |_data_key,
row: Option<crate::db::executor::terminal::page::KernelRow>|
-> Result<KeyStreamLoopControl, InternalError> {
let Some(row) = row else {
return Ok(KeyStreamLoopControl::Emit);
};
rows_scanned = rows_scanned.saturating_add(1);
match continuation.borrow_mut().accept_row() {
LoopAction::Skip => return Ok(KeyStreamLoopControl::Skip),
LoopAction::Emit => {}
LoopAction::Stop => return Ok(KeyStreamLoopControl::Stop),
}
let value = extract_orderable_field_value_with_slot_reader(
&boundary.target_field_name,
boundary.field_slot,
&mut |index| row.slot(index),
)
.map_err(AggregateFieldValueError::into_internal_error)?;
if !matches!(value, Value::Null) {
count = count.saturating_add(1);
}
Ok(KeyStreamLoopControl::Emit)
};
Self::drive_field_row_stream(
store,
row_layout,
consistency,
key_stream.as_mut(),
&mut pre_key,
&mut on_key,
)?;
record_rows_scanned_for_path(authority.entity_path(), rows_scanned);
Ok(ScalarProjectionBoundaryOutput::Count(count))
}
fn execute_selected_value_field_projection_with_slot(
&self,
prepared: PreparedAggregateStreamingInputs<'_>,
target_field: &str,
field_slot: FieldSlot,
terminal_kind: AggregateKind,
) -> Result<Option<Value>, InternalError> {
let consistency = prepared.consistency();
let store = prepared.store;
let entity_tag = prepared.authority.entity_tag();
let row_layout = RowLayout::from_model(prepared.authority.model());
let aggregate = if terminal_kind.is_extrema() {
AggregateExpr::field_target_extrema_for_kind(terminal_kind, target_field)
} else {
AggregateExpr::terminal_for_kind(terminal_kind)
};
let state =
ExecutionKernel::prepare_aggregate_execution_state_from_prepared(prepared, aggregate);
let selected_key = ExecutionKernel::execute_prepared_aggregate_state(self, state)?
.into_optional_id_terminal(
terminal_kind,
"terminal value projection result kind mismatch",
)?;
let Some(selected_key) = selected_key else {
return Ok(None);
};
let key = DataKey::new(entity_tag, selected_key);
let Some(value) = Self::read_field_value_for_aggregate(
store,
&row_layout,
consistency,
&key,
target_field,
field_slot,
)?
else {
return Ok(None);
};
Ok(Some(value))
}
fn field_values_from_projection(projected_values: ValueProjection) -> Vec<Value> {
projected_values
.into_iter()
.map(|(_, value)| value)
.collect()
}
fn project_distinct_field_values_from_materialized(
projected_values: ValueProjection,
) -> Result<Vec<Value>, InternalError> {
project_distinct_field_values_from_structural_projection(projected_values)
}
fn count_non_null_field_values_from_materialized_structural(
rows: Vec<DataRow>,
row_layout: &RowLayout,
target_field: &str,
field_slot: FieldSlot,
) -> Result<u32, InternalError> {
let row_decoder = RowDecoder::structural();
let mut count = 0_u32;
for (data_key, raw_row) in rows {
let row = row_decoder.decode(row_layout, (data_key, raw_row))?;
let value = extract_orderable_field_value_with_slot_reader(
target_field,
field_slot,
&mut |index| row.slot(index),
)
.map_err(AggregateFieldValueError::into_internal_error)?;
if !matches!(value, Value::Null) {
count = count.saturating_add(1);
}
}
Ok(count)
}
fn project_field_values_from_materialized_structural(
rows: Vec<DataRow>,
row_layout: &RowLayout,
target_field: &str,
field_slot: FieldSlot,
) -> Result<ValueProjection, InternalError> {
let row_decoder = RowDecoder::structural();
rows.into_iter()
.map(|(data_key, raw_row)| {
let row = row_decoder.decode(row_layout, (data_key.clone(), raw_row))?;
extract_orderable_field_value_with_slot_reader(
target_field,
field_slot,
&mut |index| row.slot(index),
)
.map(|value| (data_key, value))
.map_err(AggregateFieldValueError::into_internal_error)
})
.collect()
}
fn constant_covering_projection_value_if_eligible(
prepared: &PreparedAggregateStreamingInputs<'_>,
target_field: &str,
) -> Option<Value> {
if !matches!(prepared.consistency(), MissingRowPolicy::Ignore) {
return None;
}
constant_covering_projection_value_from_access(&prepared.logical_plan.access, target_field)
}
fn covering_index_projection_values_with_context_from_prepared(
prepared: &PreparedAggregateStreamingInputs<'_>,
context: CoveringProjectionContext,
window: ScalarProjectionWindow,
) -> Result<Option<CoveringProjectionValues>, InternalError> {
let Some(projected_pairs) =
Self::covering_index_projection_pairs_from_context(prepared, context, window)?
else {
return Ok(None);
};
let values = projected_pairs
.into_iter()
.map(|(_, value)| value)
.collect();
Ok(Some(CoveringProjectionValues { values }))
}
fn covering_index_projection_values_from_context_structural(
prepared: &PreparedAggregateStreamingInputs<'_>,
context: CoveringProjectionContext,
window: ScalarProjectionWindow,
) -> Result<Option<ValueProjection>, InternalError> {
Self::covering_index_projection_pairs_from_context(prepared, context, window)
}
fn covering_index_projection_pairs_from_context(
prepared: &PreparedAggregateStreamingInputs<'_>,
context: CoveringProjectionContext,
window: ScalarProjectionWindow,
) -> CoveringProjectionPairsResolution {
let scan_direction = match context.order_contract {
CoveringProjectionOrder::IndexOrder(direction) => direction,
CoveringProjectionOrder::PrimaryKeyOrder(_) => Direction::Asc,
};
let raw_pairs = Self::read_covering_projection_component_pairs(
prepared,
context.component_index,
scan_direction,
)?;
let mut projected_pairs = Vec::with_capacity(raw_pairs.len());
for (data_key, component_bytes) in raw_pairs {
if read_row_with_consistency_from_store(
prepared.store,
&data_key,
prepared.consistency(),
)?
.is_none()
{
continue;
}
let Some(value) = decode_covering_projection_component(&component_bytes)? else {
return Ok(None);
};
projected_pairs.push((data_key, value));
}
match context.order_contract {
CoveringProjectionOrder::PrimaryKeyOrder(Direction::Asc) => {
projected_pairs.sort_by(|left, right| left.0.cmp(&right.0));
}
CoveringProjectionOrder::PrimaryKeyOrder(Direction::Desc) => {
projected_pairs.sort_by(|left, right| right.0.cmp(&left.0));
}
CoveringProjectionOrder::IndexOrder(Direction::Asc | Direction::Desc) => {}
}
let windowed_pairs = match window.limit {
Some(limit) => projected_pairs
.into_iter()
.skip(window.offset)
.take(limit)
.collect(),
None => projected_pairs.into_iter().skip(window.offset).collect(),
};
Ok(Some(windowed_pairs))
}
fn read_covering_projection_component_pairs(
prepared: &PreparedAggregateStreamingInputs<'_>,
component_index: usize,
direction: Direction,
) -> Result<CoveringProjectionComponentRows, InternalError> {
let continuation = IndexScanContinuationInput::new(None, direction);
let prefix_specs = prepared.index_prefix_specs.as_slice();
if let [spec] = prefix_specs {
return Self::read_covering_projection_component_pairs_for_index_bounds(
prepared.store_resolver,
prepared.authority.entity_tag(),
spec.index(),
(spec.lower(), spec.upper()),
continuation,
component_index,
);
}
if !prefix_specs.is_empty() {
return Err(InternalError::query_executor_invariant(
"covering projection index-prefix path requires one lowered prefix spec",
));
}
let range_specs = prepared.index_range_specs.as_slice();
if let [spec] = range_specs {
return Self::read_covering_projection_component_pairs_for_index_bounds(
prepared.store_resolver,
prepared.authority.entity_tag(),
spec.index(),
(spec.lower(), spec.upper()),
continuation,
component_index,
);
}
if !range_specs.is_empty() {
return Err(InternalError::query_executor_invariant(
"covering projection index-range path requires one lowered range spec",
));
}
Err(InternalError::query_executor_invariant(
"covering projection component scans require index-backed access paths",
))
}
fn aggregate_count_from_prepared(
&self,
prepared: PreparedAggregateStreamingInputs<'_>,
) -> Result<u32, InternalError> {
let state = ExecutionKernel::prepare_aggregate_execution_state_from_prepared(
prepared,
AggregateExpr::terminal_for_kind(AggregateKind::Count),
);
ExecutionKernel::execute_prepared_aggregate_state(self, state)?
.into_count("projection COUNT helper result kind mismatch")
}
fn aggregate_exists_from_prepared(
&self,
prepared: PreparedAggregateStreamingInputs<'_>,
) -> Result<bool, InternalError> {
let state = ExecutionKernel::prepare_aggregate_execution_state_from_prepared(
prepared,
AggregateExpr::terminal_for_kind(AggregateKind::Exists),
);
ExecutionKernel::execute_prepared_aggregate_state(self, state)?
.into_exists("projection EXISTS helper result kind mismatch")
}
fn read_covering_projection_component_pairs_for_index_bounds(
store_resolver: crate::db::executor::StoreResolver<'_>,
entity_tag: crate::types::EntityTag,
index: &crate::model::index::IndexModel,
bounds: (
&std::ops::Bound<crate::db::index::RawIndexKey>,
&std::ops::Bound<crate::db::index::RawIndexKey>,
),
continuation: IndexScanContinuationInput,
component_index: usize,
) -> Result<CoveringProjectionComponentRows, InternalError> {
let store = store_resolver.try_get_store(index.store())?;
store.with_index(|index_store| {
index_store.resolve_data_values_with_component_in_raw_range_limited(
entity_tag,
index,
bounds,
continuation,
usize::MAX,
component_index,
None,
)
})
}
const fn loop_control_from_projection_continuation(action: LoopAction) -> KeyStreamLoopControl {
match action {
LoopAction::Skip => KeyStreamLoopControl::Skip,
LoopAction::Emit => KeyStreamLoopControl::Emit,
LoopAction::Stop => KeyStreamLoopControl::Stop,
}
}
}
fn project_distinct_field_values_from_structural_projection(
projected_values: ValueProjection,
) -> Result<Vec<Value>, InternalError> {
let mut distinct_values = GroupKeySet::default();
let mut distinct_projected_values = Vec::new();
for (_, value) in projected_values {
if !insert_materialized_distinct_value(&mut distinct_values, &value)? {
continue;
}
distinct_projected_values.push(value);
}
Ok(distinct_projected_values)
}