use crate::{
db::{
access::single_path_capabilities,
direction::Direction,
executor::{ExecutionPreparation, preparation::slot_map_for_model_plan},
predicate::IndexPredicateCapability,
query::plan::{
AccessPlannedQuery, CoveringExistingRowMode, CoveringProjectionOrder,
CoveringReadExecutionPlan, CoveringReadFieldSource, covering_read_execution_plan,
index_covering_existing_rows_terminal_eligible,
},
registry::StoreHandle,
},
model::entity::EntityModel,
};
use std::ops::Bound;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum BytesTerminalFastPathContract {
PrimaryKeyWindow(Direction),
OrderedKeyStreamWindow(Direction),
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum CountTerminalFastPathContract {
PrimaryKeyCardinality,
PrimaryKeyExistingRows(Direction),
IndexCoveringExistingRows(Direction),
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum ExistsTerminalFastPathContract {
IndexCoveringExistingRows(Direction),
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum LoadTerminalFastPathContract {
CoveringRead(CoveringReadExecutionPlan),
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SecondaryWitnessValidatedCoveringCohort {
OrderOnlySingleField,
CompositeOrderOnly,
EqualityPrefixPrimaryKeyOrder,
BoundedRangeSingleField,
CompositeEqualityPrefixSuffixOrder,
CompositeBoundedRangeSuffixOrder,
}
impl SecondaryWitnessValidatedCoveringCohort {
const fn matches_order_contract(self, order_contract: CoveringProjectionOrder) -> bool {
matches!(
(self, order_contract),
(
Self::OrderOnlySingleField | Self::BoundedRangeSingleField,
CoveringProjectionOrder::IndexOrder(_)
) | (
Self::CompositeOrderOnly
| Self::CompositeEqualityPrefixSuffixOrder
| Self::CompositeBoundedRangeSuffixOrder,
CoveringProjectionOrder::IndexOrder(Direction::Asc | Direction::Desc)
) | (
Self::EqualityPrefixPrimaryKeyOrder,
CoveringProjectionOrder::PrimaryKeyOrder(_)
)
)
}
const fn matches_field_source_counts(
self,
field_count: usize,
component_field_count: usize,
constant_field_count: usize,
) -> bool {
if field_count == 0 {
return false;
}
match self {
Self::OrderOnlySingleField | Self::BoundedRangeSingleField => {
component_field_count <= 1
&& constant_field_count == 0
&& component_field_count <= field_count
}
Self::CompositeOrderOnly => {
component_field_count <= 2
&& constant_field_count == 0
&& component_field_count <= field_count
}
Self::EqualityPrefixPrimaryKeyOrder => {
component_field_count == 0 && constant_field_count <= 1
}
Self::CompositeEqualityPrefixSuffixOrder | Self::CompositeBoundedRangeSuffixOrder => {
component_field_count <= 1
&& constant_field_count <= 1
&& component_field_count.saturating_add(constant_field_count) <= field_count
}
}
}
const fn component_index_supported(self, component_index: usize) -> bool {
match self {
Self::OrderOnlySingleField | Self::BoundedRangeSingleField => component_index == 0,
Self::CompositeOrderOnly => component_index <= 1,
Self::EqualityPrefixPrimaryKeyOrder => false,
Self::CompositeEqualityPrefixSuffixOrder | Self::CompositeBoundedRangeSuffixOrder => {
component_index == 1
}
}
}
}
pub(in crate::db::executor) fn promote_load_terminal_fast_path_with_secondary_authority_witness(
store: StoreHandle,
model: &'static EntityModel,
plan: &AccessPlannedQuery,
load_terminal_fast_path: &mut Option<LoadTerminalFastPathContract>,
) {
let Some(LoadTerminalFastPathContract::CoveringRead(covering)) = load_terminal_fast_path else {
return;
};
if !store.secondary_covering_authoritative()
|| !secondary_witness_validated_covering_eligible(model, plan, covering)
{
return;
}
covering.existing_row_mode = CoveringExistingRowMode::WitnessValidated;
}
pub(in crate::db::executor) fn promote_load_terminal_fast_path_with_storage_existence_witness(
store: StoreHandle,
plan: &AccessPlannedQuery,
load_terminal_fast_path: &mut Option<LoadTerminalFastPathContract>,
) {
let Some(LoadTerminalFastPathContract::CoveringRead(covering)) = load_terminal_fast_path else {
return;
};
if store.secondary_covering_authoritative()
|| !store.secondary_existence_witness_authoritative()
|| !secondary_storage_existence_witness_covering_eligible(plan, covering)
{
return;
}
covering.existing_row_mode = CoveringExistingRowMode::StorageExistenceWitness;
}
fn plan_has_predicate(plan: &AccessPlannedQuery) -> bool {
plan.has_residual_predicate()
}
const fn plan_has_no_distinct(plan: &AccessPlannedQuery) -> bool {
!plan.scalar_plan().distinct
}
fn secondary_storage_existence_witness_covering_eligible(
plan: &AccessPlannedQuery,
covering: &CoveringReadExecutionPlan,
) -> bool {
if !plan_has_no_distinct(plan)
|| plan_has_predicate(plan)
|| covering.existing_row_mode != CoveringExistingRowMode::RequiresRowPresenceCheck
{
return false;
}
if secondary_storage_existence_witness_equality_prefix_covering_eligible(plan, covering) {
return true;
}
let Some(index_field_count) = storage_existence_witness_index_field_count(plan) else {
return false;
};
let mut component_zero_count = 0usize;
let mut component_one_count = 0usize;
let mut primary_key_count = 0usize;
for field in &covering.fields {
match field.source {
CoveringReadFieldSource::IndexComponent { component_index: 0 } => {
component_zero_count = component_zero_count.saturating_add(1);
}
CoveringReadFieldSource::IndexComponent { component_index: 1 } => {
component_one_count = component_one_count.saturating_add(1);
}
CoveringReadFieldSource::PrimaryKey => {
primary_key_count = primary_key_count.saturating_add(1);
}
_ => return false,
}
}
match index_field_count {
1 => {
matches!(
covering.order_contract,
CoveringProjectionOrder::IndexOrder(_)
) && component_zero_count == 1
&& component_one_count == 0
&& primary_key_count <= 1
&& covering.fields.len() == component_zero_count + primary_key_count
}
2 => {
let full_composite = matches!(
covering.order_contract,
CoveringProjectionOrder::IndexOrder(Direction::Asc | Direction::Desc)
) && component_zero_count == 1
&& component_one_count == 1
&& primary_key_count <= 1
&& covering.fields.len()
== component_zero_count + component_one_count + primary_key_count;
let leading_component_plus_pk = matches!(
covering.order_contract,
CoveringProjectionOrder::IndexOrder(Direction::Asc | Direction::Desc)
) && component_zero_count == 1
&& component_one_count == 0
&& primary_key_count == 1
&& covering.fields.len() == component_zero_count + primary_key_count;
full_composite || leading_component_plus_pk
}
_ => false,
}
}
fn secondary_storage_existence_witness_equality_prefix_covering_eligible(
plan: &AccessPlannedQuery,
covering: &CoveringReadExecutionPlan,
) -> bool {
let Some((index, prefix_values)) = plan.access.as_index_prefix_path() else {
return false;
};
if index.fields().len() != 2 || prefix_values.len() != 1 {
return false;
}
let mut component_zero_count = 0usize;
let mut component_one_count = 0usize;
let mut constant_count = 0usize;
let mut primary_key_count = 0usize;
for field in &covering.fields {
match field.source {
CoveringReadFieldSource::IndexComponent { component_index: 0 } => {
component_zero_count = component_zero_count.saturating_add(1);
}
CoveringReadFieldSource::IndexComponent { component_index: 1 } => {
component_one_count = component_one_count.saturating_add(1);
}
CoveringReadFieldSource::IndexComponent { component_index: _ } => {
return false;
}
CoveringReadFieldSource::Constant(_) => {
constant_count = constant_count.saturating_add(1);
}
CoveringReadFieldSource::PrimaryKey => {
primary_key_count = primary_key_count.saturating_add(1);
}
}
}
let suffix_order = matches!(
covering.order_contract,
CoveringProjectionOrder::IndexOrder(Direction::Asc | Direction::Desc)
);
let equality_prefix_suffix_order = suffix_order
&& component_zero_count == 0
&& component_one_count == 1
&& constant_count == 1
&& primary_key_count == 1
&& covering.fields.len()
== component_one_count
.saturating_add(constant_count)
.saturating_add(primary_key_count);
let equality_prefix_constant_plus_pk = matches!(
covering.order_contract,
CoveringProjectionOrder::IndexOrder(Direction::Asc | Direction::Desc)
) && component_zero_count == 0
&& component_one_count == 0
&& constant_count == 1
&& primary_key_count == 1
&& covering.fields.len() == constant_count.saturating_add(primary_key_count);
equality_prefix_suffix_order || equality_prefix_constant_plus_pk
}
fn storage_existence_witness_index_field_count(plan: &AccessPlannedQuery) -> Option<usize> {
match plan.access.as_index_prefix_path() {
Some((index, [])) => Some(index.fields().len()),
_ => match plan.access.as_index_range_path() {
Some((index, [], Bound::Unbounded, Bound::Unbounded)) => Some(index.fields().len()),
_ => None,
},
}
}
fn unordered_or_primary_key_order_direction_for_model(
model: &EntityModel,
plan: &AccessPlannedQuery,
) -> Option<Direction> {
let Some(order) = plan.scalar_plan().order.as_ref() else {
return Some(Direction::Asc);
};
order
.primary_key_only_direction(model.primary_key().name)
.map(|direction| match direction {
crate::db::query::plan::OrderDirection::Asc => Direction::Asc,
crate::db::query::plan::OrderDirection::Desc => Direction::Desc,
})
}
pub(in crate::db::executor) fn derive_count_terminal_fast_path_contract_for_model(
model: &EntityModel,
plan: &AccessPlannedQuery,
strict_predicate_compatible: bool,
) -> Option<CountTerminalFastPathContract> {
let access_strategy = plan.access.resolve_strategy();
let capabilities = access_strategy.as_path().map(single_path_capabilities)?;
(plan_has_no_distinct(plan)
&& !plan_has_predicate(plan)
&& capabilities.supports_count_terminal_primary_key_cardinality())
.then_some(CountTerminalFastPathContract::PrimaryKeyCardinality)
.or_else(|| {
let direction = unordered_or_primary_key_order_direction_for_model(model, plan)?;
(!plan_has_predicate(plan)
&& capabilities.supports_count_terminal_primary_key_existing_rows())
.then_some(CountTerminalFastPathContract::PrimaryKeyExistingRows(
direction,
))
})
.or_else(|| {
index_covering_existing_rows_terminal_eligible(plan, strict_predicate_compatible).then_some(
CountTerminalFastPathContract::IndexCoveringExistingRows(Direction::Asc),
)
})
}
pub(in crate::db::executor) fn derive_exists_terminal_fast_path_contract_for_model(
plan: &AccessPlannedQuery,
strict_predicate_compatible: bool,
) -> Option<ExistsTerminalFastPathContract> {
index_covering_existing_rows_terminal_eligible(plan, strict_predicate_compatible).then_some(
ExistsTerminalFastPathContract::IndexCoveringExistingRows(Direction::Asc),
)
}
pub(in crate::db::executor) fn derive_load_terminal_fast_path_contract_for_model(
model: &EntityModel,
plan: &AccessPlannedQuery,
strict_predicate_compatible: bool,
) -> Option<LoadTerminalFastPathContract> {
covering_read_execution_plan(
model,
plan,
model.primary_key.name,
strict_predicate_compatible,
)
.map(LoadTerminalFastPathContract::CoveringRead)
}
pub(in crate::db::executor) fn derive_load_terminal_fast_path_contract_for_model_plan(
model: &'static EntityModel,
plan: &AccessPlannedQuery,
) -> Option<LoadTerminalFastPathContract> {
if !plan.scalar_plan().mode.is_load() {
return None;
}
let execution_preparation =
ExecutionPreparation::from_plan(model, plan, slot_map_for_model_plan(model, plan));
let strict_predicate_compatible = !plan.has_residual_predicate()
|| execution_preparation
.predicate_capability_profile()
.is_some_and(|profile| profile.index() == IndexPredicateCapability::FullyIndexable);
derive_load_terminal_fast_path_contract_for_model(model, plan, strict_predicate_compatible)
}
fn secondary_witness_validated_covering_eligible(
model: &'static EntityModel,
plan: &AccessPlannedQuery,
covering: &CoveringReadExecutionPlan,
) -> bool {
if !plan.scalar_plan().mode.is_load()
|| !plan_predicate_is_absent_or_fully_indexable(model, plan)
|| plan.scalar_plan().distinct
|| covering.existing_row_mode != CoveringExistingRowMode::RequiresRowPresenceCheck
{
return false;
}
let Some(cohort) = secondary_witness_validated_covering_cohort(plan, covering) else {
return false;
};
let Some(primary_key_slot) = model
.fields
.iter()
.position(|field| field.name == model.primary_key().name)
else {
return false;
};
let mut component_field_count = 0usize;
let mut constant_field_count = 0usize;
for field in &covering.fields {
match field.source {
CoveringReadFieldSource::PrimaryKey => {
if field.field_slot.index != primary_key_slot {
return false;
}
}
CoveringReadFieldSource::IndexComponent { component_index } => {
if !cohort.component_index_supported(component_index) {
return false;
}
component_field_count = component_field_count.saturating_add(1);
}
CoveringReadFieldSource::Constant(_) => {
constant_field_count = constant_field_count.saturating_add(1);
}
}
}
cohort.matches_field_source_counts(
covering.fields.len(),
component_field_count,
constant_field_count,
)
}
fn plan_predicate_is_absent_or_fully_indexable(
model: &'static EntityModel,
plan: &AccessPlannedQuery,
) -> bool {
if plan.scalar_plan().predicate.is_none() {
return true;
}
let execution_preparation =
ExecutionPreparation::from_plan(model, plan, slot_map_for_model_plan(model, plan));
execution_preparation
.predicate_capability_profile()
.is_some_and(|profile| profile.index() == IndexPredicateCapability::FullyIndexable)
}
fn secondary_witness_validated_covering_cohort(
plan: &AccessPlannedQuery,
covering: &CoveringReadExecutionPlan,
) -> Option<SecondaryWitnessValidatedCoveringCohort> {
if let Some((index, prefix_values)) = plan.access.as_index_prefix_path() {
let cohort = match prefix_values.len() {
0 if index.fields().len() == 1 => {
Some(SecondaryWitnessValidatedCoveringCohort::OrderOnlySingleField)
}
0 if index.fields().len() == 2 => {
Some(SecondaryWitnessValidatedCoveringCohort::CompositeOrderOnly)
}
1 if index.fields().len() == 1 => {
Some(SecondaryWitnessValidatedCoveringCohort::EqualityPrefixPrimaryKeyOrder)
}
1 if index.fields().len() == 2 => {
Some(SecondaryWitnessValidatedCoveringCohort::CompositeEqualityPrefixSuffixOrder)
}
_ => None,
};
return cohort.filter(|cohort| cohort.matches_order_contract(covering.order_contract));
}
if let Some((index, prefix_values, _, _)) = plan.access.as_index_range_path() {
let cohort = match (index.fields().len(), prefix_values.len()) {
(1, 0) => Some(SecondaryWitnessValidatedCoveringCohort::BoundedRangeSingleField),
(2, 0) => Some(SecondaryWitnessValidatedCoveringCohort::CompositeOrderOnly),
(2, 1) => {
Some(SecondaryWitnessValidatedCoveringCohort::CompositeBoundedRangeSuffixOrder)
}
_ => None,
}?;
return cohort
.matches_order_contract(covering.order_contract)
.then_some(cohort);
}
None
}