icydb-core 0.69.1

IcyDB — A type-safe, embedded ORM and schema system for the Internet Computer
Documentation
//! Module: executor::route::terminal
//! Responsibility: route-owned terminal fast-path contracts.
//! Does not own: terminal execution mechanics.
//! Boundary: canonical terminal eligibility derivation consumed by load/aggregate terminals.

use crate::{
    db::{
        access::single_path_capabilities,
        direction::Direction,
        executor::{ExecutionPreparation, preparation::slot_map_for_model_plan},
        predicate::IndexPredicateCapability,
        query::plan::{
            AccessPlannedQuery, CoveringExistingRowMode, CoveringProjectionOrder,
            CoveringReadExecutionPlan, CoveringReadFieldSource, covering_read_execution_plan,
            index_covering_existing_rows_terminal_eligible,
        },
        registry::StoreHandle,
    },
    model::entity::EntityModel,
};
///
/// BytesTerminalFastPathContract
///
/// Route-owned `bytes()` fast-path contract.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum BytesTerminalFastPathContract {
    PrimaryKeyWindow(Direction),
    OrderedKeyStreamWindow(Direction),
}

///
/// CountTerminalFastPathContract
///
/// Route-owned `count()` fast-path contract.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum CountTerminalFastPathContract {
    PrimaryKeyCardinality,
    PrimaryKeyExistingRows(Direction),
    IndexCoveringExistingRows(Direction),
}

///
/// ExistsTerminalFastPathContract
///
/// Route-owned `exists()` fast-path contract.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum ExistsTerminalFastPathContract {
    IndexCoveringExistingRows(Direction),
}

///
/// LoadTerminalFastPathContract
///
/// Route-owned scalar load terminal fast-path contract.
/// This keeps planner-selected covering-read eligibility explicit so EXPLAIN
/// and later runtime consumers do not rediscover it ad hoc.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::executor) enum LoadTerminalFastPathContract {
    CoveringRead(CoveringReadExecutionPlan),
}

///
/// SecondaryWitnessValidatedCoveringCohort
///
/// Route-owned classifier for the explicit secondary witness-backed covering
/// cohorts.
/// Each variant names one admitted single-field family so widening stays
/// centralized in one owner instead of growing more structural booleans.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SecondaryWitnessValidatedCoveringCohort {
    OrderOnlySingleField,
    EqualityPrefixPrimaryKeyOrder,
    BoundedRangeSingleField,
    CompositeEqualityPrefixSuffixOrder,
}

impl SecondaryWitnessValidatedCoveringCohort {
    // Return whether one planner-owned covering-order contract matches this
    // explicit witness-backed secondary cohort.
    const fn matches_order_contract(self, order_contract: CoveringProjectionOrder) -> bool {
        matches!(
            (self, order_contract),
            (
                Self::OrderOnlySingleField | Self::BoundedRangeSingleField,
                CoveringProjectionOrder::IndexOrder(_)
            ) | (
                Self::CompositeEqualityPrefixSuffixOrder,
                CoveringProjectionOrder::IndexOrder(Direction::Asc)
            ) | (
                Self::EqualityPrefixPrimaryKeyOrder,
                CoveringProjectionOrder::PrimaryKeyOrder(_)
            )
        )
    }

    // Return whether one covering field-source layout matches this explicit
    // witness-backed secondary cohort.
    const fn matches_field_source_counts(
        self,
        field_count: usize,
        component_field_count: usize,
        constant_field_count: usize,
        expected_component_field_present: bool,
    ) -> bool {
        match self {
            Self::OrderOnlySingleField | Self::BoundedRangeSingleField => {
                field_count == 2
                    && component_field_count == 1
                    && constant_field_count == 0
                    && expected_component_field_present
            }
            Self::EqualityPrefixPrimaryKeyOrder => {
                field_count == 2
                    && component_field_count == 0
                    && constant_field_count == 1
                    && !expected_component_field_present
            }
            Self::CompositeEqualityPrefixSuffixOrder => {
                field_count == 3
                    && component_field_count == 1
                    && constant_field_count == 1
                    && expected_component_field_present
            }
        }
    }
}

// Promote one narrow secondary covering cohort onto witness-backed authority
// when the resolved store pair is synchronized and the route contract is
// otherwise already explicit covering-read.
pub(in crate::db::executor) fn promote_load_terminal_fast_path_with_secondary_authority_witness(
    store: StoreHandle,
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
    load_terminal_fast_path: &mut Option<LoadTerminalFastPathContract>,
) {
    let Some(LoadTerminalFastPathContract::CoveringRead(covering)) = load_terminal_fast_path else {
        return;
    };
    if !store.secondary_covering_authoritative()
        || !secondary_witness_validated_covering_eligible(model, plan, covering)
    {
        return;
    }

    covering.existing_row_mode = CoveringExistingRowMode::WitnessValidated;
}

// Return whether the structural plan still carries a residual predicate.
fn plan_has_predicate(plan: &AccessPlannedQuery) -> bool {
    plan.has_residual_predicate()
}

// Return whether the structural plan clears both residual-predicate and DISTINCT gates.
fn plan_has_no_predicate_or_distinct(plan: &AccessPlannedQuery) -> bool {
    !plan_has_predicate(plan) && !plan.scalar_plan().distinct
}

// Return one canonical scan direction for unordered plans or primary-key-only ordering.
fn unordered_or_primary_key_order_direction_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> Option<Direction> {
    let Some(order) = plan.scalar_plan().order.as_ref() else {
        return Some(Direction::Asc);
    };

    order
        .primary_key_only_direction(model.primary_key().name)
        .map(|direction| match direction {
            crate::db::query::plan::OrderDirection::Asc => Direction::Asc,
            crate::db::query::plan::OrderDirection::Desc => Direction::Desc,
        })
}

/// Derive one route-owned `count()` terminal fast-path contract from structural plan state.
pub(in crate::db::executor) fn derive_count_terminal_fast_path_contract_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
    strict_predicate_compatible: bool,
) -> Option<CountTerminalFastPathContract> {
    let access_strategy = plan.access.resolve_strategy();
    let capabilities = access_strategy.as_path().map(single_path_capabilities)?;

    (plan_has_no_predicate_or_distinct(plan)
        && capabilities.supports_count_terminal_primary_key_cardinality())
    .then_some(CountTerminalFastPathContract::PrimaryKeyCardinality)
    .or_else(|| {
        let direction = unordered_or_primary_key_order_direction_for_model(model, plan)?;
        (!plan_has_predicate(plan)
            && capabilities.supports_count_terminal_primary_key_existing_rows())
        .then_some(CountTerminalFastPathContract::PrimaryKeyExistingRows(
            direction,
        ))
    })
    .or_else(|| {
        index_covering_existing_rows_terminal_eligible(plan, strict_predicate_compatible).then_some(
            CountTerminalFastPathContract::IndexCoveringExistingRows(Direction::Asc),
        )
    })
}

/// Derive one route-owned `exists()` terminal fast-path contract from structural plan state.
pub(in crate::db::executor) fn derive_exists_terminal_fast_path_contract_for_model(
    plan: &AccessPlannedQuery,
    strict_predicate_compatible: bool,
) -> Option<ExistsTerminalFastPathContract> {
    index_covering_existing_rows_terminal_eligible(plan, strict_predicate_compatible).then_some(
        ExistsTerminalFastPathContract::IndexCoveringExistingRows(Direction::Asc),
    )
}

/// Derive one route-owned scalar load terminal fast-path contract from the
/// planner-owned covering-read contract.
pub(in crate::db::executor) fn derive_load_terminal_fast_path_contract_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
    strict_predicate_compatible: bool,
) -> Option<LoadTerminalFastPathContract> {
    covering_read_execution_plan(
        model,
        plan,
        model.primary_key.name,
        strict_predicate_compatible,
    )
    .map(LoadTerminalFastPathContract::CoveringRead)
}

/// Derive one route-owned scalar load terminal fast-path contract directly from
/// one structural model + plan boundary.
pub(in crate::db::executor) fn derive_load_terminal_fast_path_contract_for_model_plan(
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
) -> Option<LoadTerminalFastPathContract> {
    if !plan.scalar_plan().mode.is_load() {
        return None;
    }

    let execution_preparation =
        ExecutionPreparation::from_plan(model, plan, slot_map_for_model_plan(model, plan));
    let strict_predicate_compatible = !plan.has_residual_predicate()
        || execution_preparation
            .predicate_capability_profile()
            .is_some_and(|profile| profile.index() == IndexPredicateCapability::FullyIndexable);

    derive_load_terminal_fast_path_contract_for_model(model, plan, strict_predicate_compatible)
}

// Return whether one covering-read contract matches the first explicit
// witness-backed secondary authority cohort.
fn secondary_witness_validated_covering_eligible(
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
    covering: &CoveringReadExecutionPlan,
) -> bool {
    // Phase 1: keep the first witness-backed cohort extremely narrow and
    // single-field so the authority upgrade stays explicit.
    if !plan.scalar_plan().mode.is_load()
        || !plan_predicate_is_absent_or_fully_indexable(model, plan)
        || plan.scalar_plan().distinct
        || plan
            .scalar_plan()
            .page
            .as_ref()
            .is_some_and(|page| page.offset != 0)
        || covering.existing_row_mode != CoveringExistingRowMode::RequiresRowPresenceCheck
    {
        return false;
    }

    // Phase 2: classify one explicit single-field secondary witness cohort.
    // The classifier is the policy owner; the checks below only validate that
    // the current covering contract actually matches the admitted cohort.
    let Some(cohort) = secondary_witness_validated_covering_cohort(plan, covering) else {
        return false;
    };

    // Phase 3: require the narrow two-field layout that current runtime
    // covering execution knows how to emit under witness-backed authority.
    let Some(primary_key_slot) = model
        .fields
        .iter()
        .position(|field| field.name == model.primary_key().name)
    else {
        return false;
    };
    let mut component_field_count = 0usize;
    let mut constant_field_count = 0usize;
    let mut expected_component_field_present = false;
    for field in &covering.fields {
        match field.source {
            CoveringReadFieldSource::PrimaryKey => {
                if field.field_slot.index != primary_key_slot {
                    return false;
                }
            }
            CoveringReadFieldSource::IndexComponent { component_index } => {
                if component_index != match cohort {
                    SecondaryWitnessValidatedCoveringCohort::CompositeEqualityPrefixSuffixOrder => {
                        1
                    }
                    _ => 0,
                } {
                    return false;
                }
                component_field_count = component_field_count.saturating_add(1);
                expected_component_field_present = true;
            }
            CoveringReadFieldSource::Constant(_) => {
                constant_field_count = constant_field_count.saturating_add(1);
            }
        }
    }

    cohort.matches_field_source_counts(
        covering.fields.len(),
        component_field_count,
        constant_field_count,
        expected_component_field_present,
    )
}

// Return whether the current scalar predicate is either absent or fully
// index-compatible on the chosen access route.
fn plan_predicate_is_absent_or_fully_indexable(
    model: &'static EntityModel,
    plan: &AccessPlannedQuery,
) -> bool {
    if plan.scalar_plan().predicate.is_none() {
        return true;
    }

    let execution_preparation =
        ExecutionPreparation::from_plan(model, plan, slot_map_for_model_plan(model, plan));

    execution_preparation
        .predicate_capability_profile()
        .is_some_and(|profile| profile.index() == IndexPredicateCapability::FullyIndexable)
}

// Classify one explicit secondary witness-backed covering cohort from the
// structural access route plus planner-owned covering order contract.
fn secondary_witness_validated_covering_cohort(
    plan: &AccessPlannedQuery,
    covering: &CoveringReadExecutionPlan,
) -> Option<SecondaryWitnessValidatedCoveringCohort> {
    if let Some((index, prefix_values)) = plan.access.as_index_prefix_path() {
        let cohort = match prefix_values.len() {
            0 if index.fields().len() == 1 => {
                Some(SecondaryWitnessValidatedCoveringCohort::OrderOnlySingleField)
            }
            1 if index.fields().len() == 1 => {
                Some(SecondaryWitnessValidatedCoveringCohort::EqualityPrefixPrimaryKeyOrder)
            }
            1 if index.fields().len() == 2 => {
                Some(SecondaryWitnessValidatedCoveringCohort::CompositeEqualityPrefixSuffixOrder)
            }
            _ => None,
        };

        return cohort.filter(|cohort| cohort.matches_order_contract(covering.order_contract));
    }

    if let Some((index, prefix_values, _, _)) = plan.access.as_index_range_path() {
        if index.fields().len() != 1 || !prefix_values.is_empty() {
            return None;
        }

        let cohort = SecondaryWitnessValidatedCoveringCohort::BoundedRangeSingleField;

        return cohort
            .matches_order_contract(covering.order_contract)
            .then_some(cohort);
    }

    None
}