use crate::{
db::{
Db,
access::lower_access,
data::{DataStore, DecodedDataStoreKey},
executor::projection::covering::{
CoveringProjectionMetricsRecorder,
contracts::{
AccessPlannedQuery, CoveringExistingRowMode, CoveringHybridReadExecutionPlan,
CoveringReadField, CoveringReadFieldSource,
},
shared::{
access_preserves_primary_key_order_for_covering_window, apply_covering_page_window,
covering_projection_component_indices, covering_scan_window,
decode_hybrid_covering_components,
},
},
executor::{
CoveringProjectionComponentRows, EntityAuthority, ExecutorPlanError,
resolve_covering_projection_components_from_lowered_specs, terminal::RowLayout,
},
index::predicate::IndexPredicateExecution,
},
error::InternalError,
traits::CanisterKind,
value::Value,
};
use std::collections::BTreeMap;
pub(super) fn try_execute_hybrid_covering_projection_rows_with_plan_for_canister<C>(
db: &Db<C>,
authority: EntityAuthority,
plan: &AccessPlannedQuery,
metrics: CoveringProjectionMetricsRecorder,
hybrid: &CoveringHybridReadExecutionPlan,
index_predicate_execution: Option<IndexPredicateExecution<'_>>,
) -> Result<Option<Vec<Vec<Value>>>, InternalError>
where
C: CanisterKind,
{
if plan.has_residual_filter_expr() {
return Ok(None);
}
if plan.has_residual_filter_predicate()
&& (!hybrid.strict_predicate_compatible || index_predicate_execution.is_none())
{
return Ok(None);
}
if plan.access.as_index_prefix_contract_path().is_none()
&& plan.access.as_index_branch_set_spec_path().is_none()
&& plan.access.as_index_range_path().is_none()
{
return Ok(None);
}
let component_indices = covering_projection_component_indices(hybrid.fields.as_slice());
let row_field_slots = hybrid_projection_row_field_slots(hybrid.fields.as_slice());
let store = db.recovered_store(authority.store_path())?;
let lowered_access =
lower_access(authority.entity_tag(), &plan.access).map_err(|err| match err {
crate::db::access::LoweredAccessError::IndexPrefix => {
ExecutorPlanError::lowered_index_prefix_spec_invalid().into_internal_error()
}
crate::db::access::LoweredAccessError::IndexRange => {
ExecutorPlanError::lowered_index_range_spec_invalid().into_internal_error()
}
})?;
let index_prefix_specs = lowered_access.index_prefix_specs();
let index_range_specs = lowered_access.index_range_specs();
let row_presence_proven = hybrid.existing_row_mode == CoveringExistingRowMode::ProvenByPlanner;
let scan_window = covering_scan_window(
hybrid.order_contract,
access_preserves_primary_key_order_for_covering_window(plan, hybrid.order_contract),
row_presence_proven,
plan.scalar_plan().distinct,
plan.scalar_plan().page.as_ref(),
);
let raw_pairs = resolve_covering_projection_components_from_lowered_specs(
authority.entity_tag(),
index_prefix_specs,
index_range_specs,
scan_window.direction,
scan_window.limit,
component_indices.as_slice(),
index_predicate_execution,
|store_path| db.recovered_store(store_path),
)?;
metrics.record_hybrid_path_hit();
let row_layout = authority.row_layout();
store.with_data(|data_store| {
let projected_rows = if row_presence_proven {
execute_hybrid_covering_projection_with_proven_rows(
&row_layout,
data_store,
plan,
hybrid,
component_indices.as_slice(),
row_field_slots.as_slice(),
scan_window.page_skip_count,
scan_window.page_window_applied,
raw_pairs,
metrics,
)?
} else {
execute_hybrid_covering_projection_with_checked_rows(
&row_layout,
data_store,
plan,
hybrid,
component_indices.as_slice(),
row_field_slots.as_slice(),
scan_window.page_skip_count,
scan_window.page_window_applied,
raw_pairs,
metrics,
)?
};
Ok(Some(projected_rows))
})
}
#[expect(clippy::too_many_arguments)]
fn execute_hybrid_covering_projection_with_proven_rows(
row_layout: &RowLayout,
data_store: &DataStore,
plan: &AccessPlannedQuery,
hybrid: &CoveringHybridReadExecutionPlan,
component_indices: &[usize],
row_field_slots: &[usize],
page_skip_count: usize,
page_window_applied: bool,
raw_pairs: CoveringProjectionComponentRows,
metrics: CoveringProjectionMetricsRecorder,
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut keyed_components = Vec::with_capacity(raw_pairs.len().saturating_sub(page_skip_count));
for (data_key, _existence_witness, components) in raw_pairs.into_iter().skip(page_skip_count) {
keyed_components.push((data_key, components));
}
crate::db::executor::reorder_covering_projection_pairs(
hybrid.order_contract,
keyed_components.as_mut_slice(),
);
apply_covering_page_window(
plan.scalar_plan().distinct,
plan.scalar_plan().page.as_ref(),
page_window_applied,
&mut keyed_components,
);
let mut projected_rows = Vec::with_capacity(keyed_components.len());
for (data_key, components) in keyed_components {
let sparse_row_fields = read_hybrid_projection_row_fields_from_store(
row_layout,
data_store,
&data_key,
row_field_slots,
)?
.ok_or_else(InternalError::query_executor_invariant)?;
let decoded_components = decode_hybrid_covering_components(component_indices, components)?;
let projected_row = project_hybrid_covering_row(
&data_key,
hybrid.fields.as_slice(),
decoded_components,
sparse_row_fields,
metrics,
)?;
projected_rows.push(projected_row);
}
Ok(projected_rows)
}
#[expect(clippy::too_many_arguments)]
fn execute_hybrid_covering_projection_with_checked_rows(
row_layout: &RowLayout,
data_store: &DataStore,
plan: &AccessPlannedQuery,
hybrid: &CoveringHybridReadExecutionPlan,
component_indices: &[usize],
row_field_slots: &[usize],
page_skip_count: usize,
page_window_applied: bool,
raw_pairs: CoveringProjectionComponentRows,
metrics: CoveringProjectionMetricsRecorder,
) -> Result<Vec<Vec<Value>>, InternalError> {
let mut projected_rows = Vec::with_capacity(raw_pairs.len().saturating_sub(page_skip_count));
let mut projected_row_count = 0usize;
for (data_key, _existence_witness, components) in raw_pairs {
let sparse_row_fields = read_hybrid_projection_row_fields_from_store(
row_layout,
data_store,
&data_key,
row_field_slots,
)?;
let Some(sparse_row_fields) = sparse_row_fields else {
continue;
};
if projected_row_count < page_skip_count {
projected_row_count = projected_row_count.saturating_add(1);
continue;
}
let decoded_components = decode_hybrid_covering_components(component_indices, components)?;
let projected_row = project_hybrid_covering_row(
&data_key,
hybrid.fields.as_slice(),
decoded_components,
sparse_row_fields,
metrics,
)?;
projected_rows.push((data_key, projected_row));
projected_row_count = projected_row_count.saturating_add(1);
}
crate::db::executor::reorder_covering_projection_pairs(
hybrid.order_contract,
projected_rows.as_mut_slice(),
);
apply_covering_page_window(
plan.scalar_plan().distinct,
plan.scalar_plan().page.as_ref(),
page_window_applied,
&mut projected_rows,
);
Ok(projected_rows
.into_iter()
.map(|(_data_key, row)| row)
.collect())
}
fn hybrid_projection_row_field_slots(fields: &[CoveringReadField]) -> Vec<usize> {
let mut row_field_slots = Vec::with_capacity(fields.len());
for field in fields {
if !matches!(field.source, CoveringReadFieldSource::RowField) {
continue;
}
if row_field_slots.contains(&field.field_slot.index()) {
continue;
}
row_field_slots.push(field.field_slot.index());
}
row_field_slots
}
fn read_hybrid_projection_row_fields_from_store(
row_layout: &RowLayout,
data_store: &DataStore,
data_key: &DecodedDataStoreKey,
row_field_slots: &[usize],
) -> Result<Option<BTreeMap<usize, Value>>, InternalError> {
if row_field_slots.is_empty() {
return Ok(Some(BTreeMap::new()));
}
let raw_key = data_key.to_raw()?;
let Some(raw_row) = data_store.get(&raw_key) else {
return Ok(None);
};
if let [required_slot] = row_field_slots {
let Some(value) =
row_layout.decode_required_value_from_data_key(&raw_row, data_key, *required_slot)?
else {
return Err(InternalError::query_executor_invariant());
};
let mut row_fields = BTreeMap::new();
row_fields.insert(*required_slot, value);
return Ok(Some(row_fields));
}
let decoded =
row_layout.decode_indexed_values_from_data_key(&raw_row, data_key, row_field_slots)?;
let mut row_fields = BTreeMap::new();
for (slot, value) in row_field_slots.iter().copied().zip(decoded) {
let Some(value) = value else {
return Err(InternalError::query_executor_invariant());
};
row_fields.insert(slot, value);
}
Ok(Some(row_fields))
}
fn project_hybrid_covering_row(
data_key: &DecodedDataStoreKey,
fields: &[CoveringReadField],
mut decoded_components: BTreeMap<usize, Value>,
mut row_fields: BTreeMap<usize, Value>,
metrics: CoveringProjectionMetricsRecorder,
) -> Result<Vec<Value>, InternalError> {
let mut projected = Vec::with_capacity(fields.len());
let mut remaining_index_component_uses = covering_index_component_use_counts(fields);
let mut remaining_row_field_uses = covering_row_field_use_counts(fields);
for field in fields {
let value = match &field.source {
CoveringReadFieldSource::IndexComponent { component_index }
| CoveringReadFieldSource::IndexExpressionComponent { component_index } => {
metrics.record_hybrid_index_field_access();
take_or_clone_last_covering_value(
&mut decoded_components,
&mut remaining_index_component_uses,
*component_index,
)?
}
CoveringReadFieldSource::PrimaryKey { component_index } => {
data_key.primary_key_component_runtime_value(*component_index)?
}
CoveringReadFieldSource::Constant(value) => value.clone(),
CoveringReadFieldSource::RowField => {
metrics.record_hybrid_row_field_access();
take_or_clone_last_covering_value(
&mut row_fields,
&mut remaining_row_field_uses,
field.field_slot.index(),
)?
}
};
projected.push(value);
}
Ok(projected)
}
fn covering_index_component_use_counts(fields: &[CoveringReadField]) -> BTreeMap<usize, usize> {
let mut counts = BTreeMap::new();
for field in fields {
let component_index = match &field.source {
CoveringReadFieldSource::IndexComponent { component_index }
| CoveringReadFieldSource::IndexExpressionComponent { component_index } => {
component_index
}
CoveringReadFieldSource::PrimaryKey { .. }
| CoveringReadFieldSource::Constant(_)
| CoveringReadFieldSource::RowField => continue,
};
*counts.entry(*component_index).or_insert(0) += 1;
}
counts
}
fn covering_row_field_use_counts(fields: &[CoveringReadField]) -> BTreeMap<usize, usize> {
let mut counts = BTreeMap::new();
for field in fields {
if !matches!(field.source, CoveringReadFieldSource::RowField) {
continue;
}
*counts.entry(field.field_slot.index()).or_insert(0) += 1;
}
counts
}
fn take_or_clone_last_covering_value(
values: &mut BTreeMap<usize, Value>,
remaining_uses: &mut BTreeMap<usize, usize>,
slot: usize,
) -> Result<Value, InternalError> {
let Some(remaining) = remaining_uses.get_mut(&slot) else {
return Err(InternalError::query_executor_invariant());
};
*remaining = remaining.saturating_sub(1);
if *remaining == 0 {
return values
.remove(&slot)
.ok_or_else(InternalError::query_executor_invariant);
}
values
.get(&slot)
.cloned()
.ok_or_else(InternalError::query_executor_invariant)
}