use crate::{
db::{
access::single_path_capabilities,
direction::Direction,
executor::{ExecutionPreparation, preparation::slot_map_for_model_plan},
predicate::IndexPredicateCapability,
query::plan::{
AccessPlannedQuery, CoveringExistingRowMode, CoveringProjectionOrder,
CoveringReadExecutionPlan, CoveringReadFieldSource, covering_read_execution_plan,
index_covering_existing_rows_terminal_eligible,
},
registry::StoreHandle,
},
model::entity::EntityModel,
};
/// Fast-path contract for the bytes terminal.
///
/// Both variants carry the `Direction` associated with the window.
/// This section of the file only declares the contract; the code that
/// selects and consumes these variants is not visible here.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum BytesTerminalFastPathContract {
/// NOTE(review): presumably a window resolved directly over primary keys;
/// inferred from the variant name only, confirm against the consumer.
PrimaryKeyWindow(Direction),
/// NOTE(review): presumably a window resolved over an ordered key stream;
/// inferred from the variant name only, confirm against the consumer.
OrderedKeyStreamWindow(Direction),
}
/// Fast-path contract for the count terminal.
///
/// Produced by `derive_count_terminal_fast_path_contract_for_model`, which
/// tries the variants in declaration order and returns the first that applies.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum CountTerminalFastPathContract {
/// Selected when the plan has no residual predicate, is not distinct, and
/// the single access path reports primary-key cardinality count support.
PrimaryKeyCardinality,
/// Selected when the plan is unordered or ordered by primary key only, has
/// no residual predicate, and the access path reports primary-key
/// existing-rows count support. Carries the resolved scan direction.
PrimaryKeyExistingRows(Direction),
/// Selected when the plan passes
/// `index_covering_existing_rows_terminal_eligible`. The derivation in this
/// file always supplies `Direction::Asc`.
IndexCoveringExistingRows(Direction),
}
/// Fast-path contract for the exists terminal.
///
/// Produced by `derive_exists_terminal_fast_path_contract_for_model`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum ExistsTerminalFastPathContract {
/// Selected when the plan passes
/// `index_covering_existing_rows_terminal_eligible`. The derivation in this
/// file always supplies `Direction::Asc`.
IndexCoveringExistingRows(Direction),
}
/// Fast-path contract for the load terminal.
///
/// Not `Copy`, unlike the other terminal contracts, because it owns a
/// `CoveringReadExecutionPlan`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum LoadTerminalFastPathContract {
/// Covering-read plan returned by `covering_read_execution_plan`. Its
/// `existing_row_mode` may later be upgraded in place by
/// `promote_load_terminal_fast_path_with_secondary_authority_witness`.
CoveringRead(CoveringReadExecutionPlan),
}
/// Closed set of secondary-index covering-read shapes that may be promoted to
/// witness-validated existing-row handling.
///
/// Each cohort pins two things: which projection order it accepts
/// (`matches_order_contract`) and which mix of projected field sources it
/// accepts (`matches_field_source_counts`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SecondaryWitnessValidatedCoveringCohort {
    /// Single-field index, index-prefix access with no prefix values.
    OrderOnlySingleField,
    /// Single-field index, index-prefix access with one prefix value.
    EqualityPrefixPrimaryKeyOrder,
    /// Single-field index, index-range access with no prefix values.
    BoundedRangeSingleField,
    /// Two-field index, index-prefix access with one prefix value.
    CompositeEqualityPrefixSuffixOrder,
}

impl SecondaryWitnessValidatedCoveringCohort {
    /// Returns `true` when `order_contract` is an ordering this cohort admits.
    ///
    /// * order-only and bounded-range cohorts: index order, either direction;
    /// * composite equality-prefix cohort: ascending index order only;
    /// * equality-prefix cohort: primary-key order, either direction.
    const fn matches_order_contract(self, order_contract: CoveringProjectionOrder) -> bool {
        match self {
            Self::OrderOnlySingleField | Self::BoundedRangeSingleField => {
                matches!(order_contract, CoveringProjectionOrder::IndexOrder(_))
            }
            Self::CompositeEqualityPrefixSuffixOrder => matches!(
                order_contract,
                CoveringProjectionOrder::IndexOrder(Direction::Asc)
            ),
            Self::EqualityPrefixPrimaryKeyOrder => {
                matches!(order_contract, CoveringProjectionOrder::PrimaryKeyOrder(_))
            }
        }
    }

    /// Returns `true` when the projected field sources have exactly the shape
    /// this cohort requires.
    ///
    /// `field_count` is the total number of projected fields,
    /// `component_field_count` and `constant_field_count` are the number of
    /// index-component and constant sourced fields, and
    /// `expected_component_field_present` reports whether the expected index
    /// component was seen.
    const fn matches_field_source_counts(
        self,
        field_count: usize,
        component_field_count: usize,
        constant_field_count: usize,
        expected_component_field_present: bool,
    ) -> bool {
        // (total fields, index-component fields, constant fields) per cohort.
        let (required_fields, required_components, required_constants) = match self {
            Self::OrderOnlySingleField | Self::BoundedRangeSingleField => (2, 1, 0),
            Self::EqualityPrefixPrimaryKeyOrder => (2, 0, 1),
            Self::CompositeEqualityPrefixSuffixOrder => (3, 1, 1),
        };

        // The presence flag must be set exactly when the cohort requires a
        // component-sourced field.
        field_count == required_fields
            && component_field_count == required_components
            && constant_field_count == required_constants
            && expected_component_field_present == (required_components != 0)
    }
}
/// Upgrades an already-derived covering-read load contract to
/// witness-validated existing-row handling, in place.
///
/// The upgrade happens only when all of the following hold:
/// * `load_terminal_fast_path` currently holds a `CoveringRead` contract;
/// * `store` reports its secondary covering data as authoritative;
/// * the plan and covering shape pass
///   `secondary_witness_validated_covering_eligible`.
///
/// In every other case the contract is left untouched.
pub(in crate::db::executor) fn promote_load_terminal_fast_path_with_secondary_authority_witness(
    store: StoreHandle,
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
    load_terminal_fast_path: &mut Option<LoadTerminalFastPathContract>,
) {
    if let Some(LoadTerminalFastPathContract::CoveringRead(covering_plan)) = load_terminal_fast_path
    {
        // Store authority is consulted first so the shape analysis is skipped
        // entirely for non-authoritative stores.
        let promotable = store.secondary_covering_authoritative()
            && secondary_witness_validated_covering_eligible(model, plan, covering_plan);

        if promotable {
            covering_plan.existing_row_mode = CoveringExistingRowMode::WitnessValidated;
        }
    }
}
/// Returns `true` when `plan` reports a residual predicate.
///
/// Thin wrapper over `AccessPlannedQuery::has_residual_predicate`, giving the
/// fast-path derivations in this file a single spelling for the check.
fn plan_has_predicate(plan: &AccessPlannedQuery) -> bool {
plan.has_residual_predicate()
}
/// Returns `true` when `plan` has neither a residual predicate nor a
/// `distinct` flag set on its scalar plan.
fn plan_has_no_predicate_or_distinct(plan: &AccessPlannedQuery) -> bool {
    let filtered_or_distinct = plan_has_predicate(plan) || plan.scalar_plan().distinct;

    !filtered_or_distinct
}
/// Resolves the scan direction for plans that are unordered or ordered by the
/// primary key alone.
///
/// * No `order` on the scalar plan: `Some(Direction::Asc)`.
/// * `order` is primary-key-only (as reported by
///   `primary_key_only_direction`): the matching `Direction`.
/// * Any other ordering: `None`.
fn unordered_or_primary_key_order_direction_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> Option<Direction> {
    use crate::db::query::plan::OrderDirection;

    match plan.scalar_plan().order.as_ref() {
        // Unordered plans default to ascending.
        None => Some(Direction::Asc),
        Some(order_spec) => {
            let requested = order_spec.primary_key_only_direction(model.primary_key().name)?;

            Some(match requested {
                OrderDirection::Asc => Direction::Asc,
                OrderDirection::Desc => Direction::Desc,
            })
        }
    }
}
/// Derives the count-terminal fast-path contract for `plan`, if any.
///
/// Returns `None` immediately when the resolved access strategy is not a
/// single path. Otherwise the candidates are tried in priority order:
///
/// 1. `PrimaryKeyCardinality`: no residual predicate, not distinct, and the
///    path supports primary-key cardinality counting.
/// 2. `PrimaryKeyExistingRows(direction)`: the plan is unordered or
///    primary-key ordered, has no residual predicate, and the path supports
///    primary-key existing-rows counting.
/// 3. `IndexCoveringExistingRows(Direction::Asc)`: the plan passes
///    `index_covering_existing_rows_terminal_eligible`.
pub(in crate::db::executor) fn derive_count_terminal_fast_path_contract_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
    strict_predicate_compatible: bool,
) -> Option<CountTerminalFastPathContract> {
    let access_strategy = plan.access.resolve_strategy();
    let capabilities = access_strategy.as_path().map(single_path_capabilities)?;

    if plan_has_no_predicate_or_distinct(plan)
        && capabilities.supports_count_terminal_primary_key_cardinality()
    {
        return Some(CountTerminalFastPathContract::PrimaryKeyCardinality);
    }

    // The direction is resolved before the predicate/capability checks; an
    // unsupported ordering simply falls through to the index-covering check.
    match unordered_or_primary_key_order_direction_for_model(model, plan) {
        Some(direction)
            if !plan_has_predicate(plan)
                && capabilities.supports_count_terminal_primary_key_existing_rows() =>
        {
            return Some(CountTerminalFastPathContract::PrimaryKeyExistingRows(
                direction,
            ));
        }
        _ => {}
    }

    if index_covering_existing_rows_terminal_eligible(plan, strict_predicate_compatible) {
        Some(CountTerminalFastPathContract::IndexCoveringExistingRows(
            Direction::Asc,
        ))
    } else {
        None
    }
}
/// Derives the exists-terminal fast-path contract for `plan`, if any.
///
/// Yields `IndexCoveringExistingRows(Direction::Asc)` when the plan passes
/// `index_covering_existing_rows_terminal_eligible`, and `None` otherwise.
pub(in crate::db::executor) fn derive_exists_terminal_fast_path_contract_for_model(
    plan: &AccessPlannedQuery,
    strict_predicate_compatible: bool,
) -> Option<ExistsTerminalFastPathContract> {
    if index_covering_existing_rows_terminal_eligible(plan, strict_predicate_compatible) {
        Some(ExistsTerminalFastPathContract::IndexCoveringExistingRows(
            Direction::Asc,
        ))
    } else {
        None
    }
}
/// Derives the load-terminal fast-path contract for `plan`, if any.
///
/// Asks `covering_read_execution_plan` for a covering-read plan keyed on the
/// model's primary-key field name and wraps a successful result in
/// `LoadTerminalFastPathContract::CoveringRead`.
pub(in crate::db::executor) fn derive_load_terminal_fast_path_contract_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
    strict_predicate_compatible: bool,
) -> Option<LoadTerminalFastPathContract> {
    let primary_key_name = model.primary_key.name;
    let covering_plan =
        covering_read_execution_plan(model, plan, primary_key_name, strict_predicate_compatible)?;

    Some(LoadTerminalFastPathContract::CoveringRead(covering_plan))
}
/// Derives the load-terminal fast-path contract directly from a model plan.
///
/// Returns `None` for plans whose scalar mode is not a load. Otherwise the
/// plan is treated as strictly predicate compatible when it has no residual
/// predicate, or when its predicate capability profile reports
/// `IndexPredicateCapability::FullyIndexable`; the result is then forwarded
/// to `derive_load_terminal_fast_path_contract_for_model`.
pub(in crate::db::executor) fn derive_load_terminal_fast_path_contract_for_model_plan(
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
) -> Option<LoadTerminalFastPathContract> {
    if !plan.scalar_plan().mode.is_load() {
        return None;
    }

    // The execution preparation (and its slot map) is only needed to classify
    // a residual predicate, so it is built on the right-hand side of `||` and
    // skipped entirely for plans without one.
    let strict_predicate_compatible = !plan.has_residual_predicate()
        || ExecutionPreparation::from_plan(model, plan, slot_map_for_model_plan(model, plan))
            .predicate_capability_profile()
            .is_some_and(|profile| profile.index() == IndexPredicateCapability::FullyIndexable);

    derive_load_terminal_fast_path_contract_for_model(model, plan, strict_predicate_compatible)
}
/// Decides whether a covering-read plan may switch from per-row presence
/// checks to witness-validated existing-row handling.
///
/// The plan is eligible only when all of the following hold:
/// * it is a load, is not distinct, and has no page or a page with offset 0;
/// * `covering` currently requires a row presence check;
/// * the access path and projection order map to one of the
///   `SecondaryWitnessValidatedCoveringCohort` shapes;
/// * the predicate is absent or fully indexable;
/// * every projected field is the primary key in its own slot, the index
///   component expected for the cohort, or a constant, and the resulting
///   counts satisfy `matches_field_source_counts` for that cohort.
fn secondary_witness_validated_covering_eligible(
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
    covering: &CoveringReadExecutionPlan,
) -> bool {
    // Cheap structural rejections run first so the predicate capability
    // analysis below, which builds an execution preparation, is reached only
    // by plans that could still qualify.
    if !plan.scalar_plan().mode.is_load()
        || plan.scalar_plan().distinct
        || plan
            .scalar_plan()
            .page
            .as_ref()
            .is_some_and(|page| page.offset != 0)
        || covering.existing_row_mode != CoveringExistingRowMode::RequiresRowPresenceCheck
    {
        return false;
    }

    let Some(cohort) = secondary_witness_validated_covering_cohort(plan, covering) else {
        return false;
    };

    if !plan_predicate_is_absent_or_fully_indexable(model, plan) {
        return false;
    }

    let Some(primary_key_slot) = model
        .fields
        .iter()
        .position(|field| field.name == model.primary_key().name)
    else {
        return false;
    };

    // Index component a projected field must read from. Listed exhaustively
    // so that adding a cohort forces an explicit decision here, and resolved
    // once rather than on every loop iteration.
    let expected_component_index = match cohort {
        SecondaryWitnessValidatedCoveringCohort::OrderOnlySingleField
        | SecondaryWitnessValidatedCoveringCohort::EqualityPrefixPrimaryKeyOrder
        | SecondaryWitnessValidatedCoveringCohort::BoundedRangeSingleField => 0,
        SecondaryWitnessValidatedCoveringCohort::CompositeEqualityPrefixSuffixOrder => 1,
    };

    let mut component_field_count = 0usize;
    let mut constant_field_count = 0usize;

    for field in &covering.fields {
        match field.source {
            CoveringReadFieldSource::PrimaryKey => {
                // A primary-key sourced field must target the primary-key slot.
                if field.field_slot.index != primary_key_slot {
                    return false;
                }
            }
            CoveringReadFieldSource::IndexComponent { component_index } => {
                if component_index != expected_component_index {
                    return false;
                }
                component_field_count = component_field_count.saturating_add(1);
            }
            CoveringReadFieldSource::Constant(_) => {
                constant_field_count = constant_field_count.saturating_add(1);
            }
        }
    }

    // Any counted component already matched the expected index, so "expected
    // component present" is the same as "at least one component was counted".
    cohort.matches_field_source_counts(
        covering.fields.len(),
        component_field_count,
        constant_field_count,
        component_field_count > 0,
    )
}
/// Returns `true` when the scalar plan carries no predicate, or when the
/// plan's predicate capability profile reports
/// `IndexPredicateCapability::FullyIndexable`.
///
/// The execution preparation is built only when a predicate is present.
///
/// NOTE(review): this tests `scalar_plan().predicate`, while the other helpers
/// in this file test `has_residual_predicate()`. Confirm the difference is
/// intentional.
fn plan_predicate_is_absent_or_fully_indexable(
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
) -> bool {
    let predicate_absent = plan.scalar_plan().predicate.is_none();

    predicate_absent
        || ExecutionPreparation::from_plan(model, plan, slot_map_for_model_plan(model, plan))
            .predicate_capability_profile()
            .is_some_and(|profile| profile.index() == IndexPredicateCapability::FullyIndexable)
}
/// Classifies the plan's access path into a witness-validated covering
/// cohort, or returns `None` when no cohort applies.
///
/// Index-prefix access, keyed by (index field count, prefix value count):
/// * `(1, 0)`: `OrderOnlySingleField`
/// * `(1, 1)`: `EqualityPrefixPrimaryKeyOrder`
/// * `(2, 1)`: `CompositeEqualityPrefixSuffixOrder`
///
/// Index-range access over a single-field index with no prefix values maps to
/// `BoundedRangeSingleField`. A candidate is returned only if it also accepts
/// `covering.order_contract`.
fn secondary_witness_validated_covering_cohort(
    plan: &AccessPlannedQuery,
    covering: &CoveringReadExecutionPlan,
) -> Option<SecondaryWitnessValidatedCoveringCohort> {
    // An index-prefix path takes precedence; the range path is consulted only
    // when the access is not an index-prefix path at all.
    let candidate = if let Some((index, prefix_values)) = plan.access.as_index_prefix_path() {
        match (index.fields().len(), prefix_values.len()) {
            (1, 0) => Some(SecondaryWitnessValidatedCoveringCohort::OrderOnlySingleField),
            (1, 1) => Some(SecondaryWitnessValidatedCoveringCohort::EqualityPrefixPrimaryKeyOrder),
            (2, 1) => {
                Some(SecondaryWitnessValidatedCoveringCohort::CompositeEqualityPrefixSuffixOrder)
            }
            _ => None,
        }
    } else if let Some((index, prefix_values, _, _)) = plan.access.as_index_range_path() {
        let single_field_without_prefix = index.fields().len() == 1 && prefix_values.is_empty();

        single_field_without_prefix
            .then_some(SecondaryWitnessValidatedCoveringCohort::BoundedRangeSingleField)
    } else {
        None
    };

    candidate.filter(|cohort| cohort.matches_order_contract(covering.order_contract))
}