use crate::{
db::{
cursor::CursorBoundary,
executor::{
OrderedKeyStream, ScalarContinuationContext,
pipeline::contracts::{CursorEmissionMode, MaterializedExecutionPayload, PageCursor},
projection::PreparedSlotProjectionValidation,
route::{LoadOrderRouteContract, access_order_satisfied_by_route_contract},
terminal::page::{
KernelRow, KernelRowPayloadMode, ResidualPredicateScanMode, RetainedSlotLayout,
ScalarMaterializationCapabilities, ScalarRowRuntimeHandle,
post_scan::{
FinalPayloadStrategy, StructuralPostScanPageWindowStrategy,
StructuralPostScanTailStrategy, required_prepared_projection_validation,
},
resolved_order_required,
scan::{KernelRowScanRequest, ScalarPageKernelRequest},
},
window::compute_page_keep_count,
},
predicate::{MissingRowPolicy, PredicateProgram},
query::plan::{AccessPlannedQuery, ResolvedOrder},
},
error::InternalError,
};
pub(super) struct ScalarMaterializationPlan<'a> {
direct_data_row_path: Option<DirectDataRowPath<'a>>,
kernel_row_scan_strategy: KernelRowScanStrategy<'a>,
post_access_strategy: PostAccessStrategy<'a>,
post_scan_tail: StructuralPostScanTailStrategy<'a>,
cursor_emission: CursorEmissionMode,
}
impl<'a> ScalarMaterializationPlan<'a> {
pub(super) const fn direct_data_row_path(&self) -> Option<DirectDataRowPath<'a>> {
self.direct_data_row_path
}
pub(super) fn kernel_request<'r>(
&self,
key_stream: &'a mut dyn OrderedKeyStream,
scan_budget_hint: Option<usize>,
load_order_route_contract: LoadOrderRouteContract,
consistency: MissingRowPolicy,
continuation: &'a ScalarContinuationContext,
row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
) -> ScalarPageKernelRequest<'a, 'r> {
ScalarPageKernelRequest {
key_stream,
scan_budget_hint,
load_order_route_contract,
consistency,
scan_strategy: self.kernel_row_scan_strategy,
continuation,
row_runtime,
}
}
pub(super) const fn post_access_strategy(&self) -> PostAccessStrategy<'a> {
self.post_access_strategy
}
pub(super) const fn cursor_emission(&self) -> CursorEmissionMode {
self.cursor_emission
}
pub(super) fn apply_post_scan_tail(
&self,
plan: &AccessPlannedQuery,
rows: &mut Vec<KernelRow>,
) -> Result<(), InternalError> {
self.post_scan_tail.apply(plan, rows)
}
pub(super) fn finalize_payload(
&self,
rows: Vec<KernelRow>,
next_cursor: Option<PageCursor>,
) -> Result<MaterializedExecutionPayload, InternalError> {
self.post_scan_tail.finalize_payload(rows, next_cursor)
}
}
pub(in crate::db::executor) struct CursorlessShortPathPlan<'a> {
scan_strategy: KernelRowScanStrategy<'a>,
row_keep_cap: Option<usize>,
post_scan_tail: StructuralPostScanTailStrategy<'a>,
}
impl<'a> CursorlessShortPathPlan<'a> {
pub(in crate::db::executor) fn scan_request<'r>(
&self,
key_stream: &'a mut dyn OrderedKeyStream,
scan_budget_hint: Option<usize>,
consistency: MissingRowPolicy,
row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
) -> KernelRowScanRequest<'a, 'r> {
KernelRowScanRequest {
key_stream,
scan_budget_hint,
consistency,
scan_strategy: self.scan_strategy,
row_keep_cap: self.row_keep_cap,
row_runtime,
}
}
pub(in crate::db::executor) fn materialize_rows(
&self,
plan: &AccessPlannedQuery,
mut rows: Vec<KernelRow>,
) -> Result<(MaterializedExecutionPayload, usize), InternalError> {
self.post_scan_tail.apply(plan, &mut rows)?;
let post_access_rows = rows.len();
let payload = self.post_scan_tail.finalize_payload(rows, None)?;
Ok((payload, post_access_rows))
}
}
struct ResolvedScalarStructuralPolicy<'a> {
residual_predicate_scan_mode: ResidualPredicateScanMode,
kernel_row_scan_strategy: KernelRowScanStrategy<'a>,
projection_validation: Option<&'a PreparedSlotProjectionValidation>,
final_payload_strategy: FinalPayloadStrategy,
}
impl<'a> ResolvedScalarStructuralPolicy<'a> {
const fn residual_predicate_scan_mode(&self) -> ResidualPredicateScanMode {
self.residual_predicate_scan_mode
}
const fn kernel_row_scan_strategy(&self) -> KernelRowScanStrategy<'a> {
self.kernel_row_scan_strategy
}
const fn post_scan_tail(
&self,
page_window_strategy: StructuralPostScanPageWindowStrategy,
) -> StructuralPostScanTailStrategy<'a> {
StructuralPostScanTailStrategy::new(
page_window_strategy,
self.projection_validation,
self.final_payload_strategy,
)
}
}
pub(super) fn resolve_scalar_materialization_plan<'a>(
plan: &'a AccessPlannedQuery,
capabilities: ScalarMaterializationCapabilities<'a>,
) -> Result<ScalarMaterializationPlan<'a>, InternalError> {
let structural_policy = resolve_scalar_structural_policy(
plan,
capabilities,
select_kernel_row_payload_mode(
capabilities.retain_slot_rows,
capabilities.cursor_emission,
capabilities.retained_slot_layout,
),
)?;
let direct_data_row_path = resolve_direct_data_row_path(
plan,
capabilities.validate_projection,
capabilities.retain_slot_rows,
capabilities.retained_slot_layout,
capabilities.predicate_slots,
capabilities.cursor_emission,
structural_policy.residual_predicate_scan_mode(),
)?;
let post_access_strategy = resolve_post_access_strategy(
plan,
capabilities.predicate_slots,
structural_policy.residual_predicate_scan_mode(),
capabilities.cursor_emission,
capabilities.retain_slot_rows,
)?;
Ok(ScalarMaterializationPlan {
direct_data_row_path,
kernel_row_scan_strategy: structural_policy.kernel_row_scan_strategy(),
post_access_strategy,
post_scan_tail: structural_policy
.post_scan_tail(StructuralPostScanPageWindowStrategy::NotPresent),
cursor_emission: capabilities.cursor_emission,
})
}
pub(in crate::db::executor) fn resolve_cursorless_short_path_plan<'a>(
plan: &'a AccessPlannedQuery,
cursor_boundary: Option<&CursorBoundary>,
capabilities: ScalarMaterializationCapabilities<'a>,
) -> Result<Option<CursorlessShortPathPlan<'a>>, InternalError> {
let logical = plan.scalar_plan();
let generic_short_path = logical.mode.is_load()
&& cursor_boundary.is_none()
&& logical.predicate.is_none()
&& logical.order.is_none()
&& logical.page.is_none();
let retained_slot_short_path = logical.mode.is_load()
&& capabilities.retain_slot_rows
&& cursor_boundary.is_none()
&& !logical.distinct
&& (logical.order.is_none() || access_order_satisfied_by_route_contract(plan));
if !(generic_short_path || retained_slot_short_path) {
return Ok(None);
}
let structural_policy = resolve_scalar_structural_policy(
plan,
capabilities,
select_cursorless_short_path_payload_mode(
capabilities.retain_slot_rows,
cursor_boundary,
capabilities.retained_slot_layout,
),
)?;
Ok(Some(CursorlessShortPathPlan {
scan_strategy: structural_policy.kernel_row_scan_strategy(),
row_keep_cap: cursorless_short_path_keep_cap(
plan,
cursor_boundary,
capabilities.retain_slot_rows,
),
post_scan_tail: structural_policy
.post_scan_tail(StructuralPostScanPageWindowStrategy::CursorlessRetainedWindow),
}))
}
fn resolve_scalar_structural_policy<'a>(
plan: &AccessPlannedQuery,
capabilities: ScalarMaterializationCapabilities<'a>,
kernel_payload_mode: KernelRowPayloadMode,
) -> Result<ResolvedScalarStructuralPolicy<'a>, InternalError> {
let residual_predicate_scan_mode = ResidualPredicateScanMode::from_plan_and_layout(
plan.has_residual_predicate(),
capabilities.retained_slot_layout,
);
let kernel_row_scan_strategy = resolve_kernel_row_scan_strategy(
kernel_payload_mode,
capabilities.predicate_slots,
residual_predicate_scan_mode,
capabilities.retained_slot_layout,
)?;
Ok(ResolvedScalarStructuralPolicy {
residual_predicate_scan_mode,
kernel_row_scan_strategy,
projection_validation: if capabilities.validate_projection {
Some(required_prepared_projection_validation(
capabilities.prepared_projection_validation,
)?)
} else {
None
},
final_payload_strategy: FinalPayloadStrategy::from_retain_slot_rows(
capabilities.retain_slot_rows,
),
})
}
#[derive(Clone, Copy)]
pub(super) enum DirectDataRowPath<'a> {
Plain {
row_keep_cap: Option<usize>,
},
Filtered {
row_keep_cap: Option<usize>,
predicate_program: &'a PredicateProgram,
retained_slot_layout: &'a RetainedSlotLayout,
},
MaterializedOrder {
residual_predicate_scan_mode: ResidualPredicateScanMode,
resolved_order: &'a ResolvedOrder,
predicate_program: Option<&'a PredicateProgram>,
retained_slot_layout: Option<&'a RetainedSlotLayout>,
},
}
#[derive(Clone, Copy)]
pub(in crate::db::executor) enum KernelRowScanStrategy<'a> {
DataRows,
RetainedFullRows {
retained_slot_layout: &'a RetainedSlotLayout,
},
RetainedFullRowsFiltered {
predicate_program: &'a PredicateProgram,
retained_slot_layout: &'a RetainedSlotLayout,
},
SlotOnlyRows {
retained_slot_layout: &'a RetainedSlotLayout,
},
SlotOnlyRowsFiltered {
predicate_program: &'a PredicateProgram,
retained_slot_layout: &'a RetainedSlotLayout,
},
}
#[derive(Clone, Copy)]
pub(super) enum PostAccessPredicateStrategy<'a> {
NotPresent,
AppliedDuringScan,
Deferred {
predicate_program: &'a PredicateProgram,
},
}
impl PostAccessPredicateStrategy<'_> {
pub(super) const fn requires_post_access_filtering(self) -> bool {
matches!(self, Self::Deferred { .. })
}
}
#[derive(Clone, Copy)]
pub(super) struct PostAccessStrategy<'a> {
pub(super) predicate_strategy: PostAccessPredicateStrategy<'a>,
pub(super) defer_retained_slot_distinct_window: bool,
}
fn resolve_direct_data_row_path<'a>(
plan: &'a AccessPlannedQuery,
validate_projection: bool,
retain_slot_rows: bool,
retained_slot_layout: Option<&'a RetainedSlotLayout>,
predicate_slots: Option<&'a PredicateProgram>,
cursor_emission: CursorEmissionMode,
residual_predicate_scan_mode: ResidualPredicateScanMode,
) -> Result<Option<DirectDataRowPath<'a>>, InternalError> {
let logical = plan.scalar_plan();
let direct_load_surface_eligible = logical.mode.is_load()
&& !logical.distinct
&& !validate_projection
&& !retain_slot_rows
&& !cursor_emission.enabled();
if !direct_load_surface_eligible {
return Ok(None);
}
if access_order_satisfied_by_route_contract(plan) {
return Ok(match residual_predicate_scan_mode {
ResidualPredicateScanMode::Absent if retained_slot_layout.is_none() => {
Some(DirectDataRowPath::Plain {
row_keep_cap: direct_data_row_keep_cap(plan),
})
}
ResidualPredicateScanMode::AppliedDuringScan => {
predicate_slots.zip(retained_slot_layout).map(
|(predicate_program, retained_slot_layout)| DirectDataRowPath::Filtered {
row_keep_cap: direct_data_row_keep_cap(plan),
predicate_program,
retained_slot_layout,
},
)
}
ResidualPredicateScanMode::Absent | ResidualPredicateScanMode::DeferredPostAccess => {
None
}
});
}
let materialized_order_direct_eligible = logical
.order
.as_ref()
.is_some_and(|order| !order.fields.is_empty())
&& retained_slot_layout.is_some()
&& (matches!(
residual_predicate_scan_mode,
ResidualPredicateScanMode::Absent
) || (matches!(
residual_predicate_scan_mode,
ResidualPredicateScanMode::AppliedDuringScan
) && predicate_slots.is_some()));
if !materialized_order_direct_eligible {
return Ok(None);
}
Ok(Some(DirectDataRowPath::MaterializedOrder {
residual_predicate_scan_mode,
resolved_order: resolved_order_required(plan)?,
predicate_program: predicate_slots,
retained_slot_layout,
}))
}
fn resolve_kernel_row_scan_strategy<'a>(
payload_mode: KernelRowPayloadMode,
predicate_slots: Option<&'a PredicateProgram>,
residual_predicate_scan_mode: ResidualPredicateScanMode,
retained_slot_layout: Option<&'a RetainedSlotLayout>,
) -> Result<KernelRowScanStrategy<'a>, InternalError> {
match (payload_mode, residual_predicate_scan_mode) {
(
KernelRowPayloadMode::DataRowOnly,
ResidualPredicateScanMode::Absent | ResidualPredicateScanMode::DeferredPostAccess,
) => Ok(KernelRowScanStrategy::DataRows),
(KernelRowPayloadMode::DataRowOnly, ResidualPredicateScanMode::AppliedDuringScan) => {
Err(InternalError::query_executor_invariant(
"data-row-only kernel rows must not apply residual predicates during scan",
))
}
(KernelRowPayloadMode::FullRowRetained, ResidualPredicateScanMode::Absent) => {
Ok(KernelRowScanStrategy::RetainedFullRows {
retained_slot_layout: retained_slot_layout.ok_or_else(|| {
InternalError::query_executor_invariant(
"retained full-row kernel rows require one retained-slot layout",
)
})?,
})
}
(KernelRowPayloadMode::FullRowRetained, ResidualPredicateScanMode::AppliedDuringScan) => {
Ok(KernelRowScanStrategy::RetainedFullRowsFiltered {
predicate_program: predicate_slots
.ok_or_else(InternalError::scalar_page_predicate_slots_required)?,
retained_slot_layout: retained_slot_layout.ok_or_else(|| {
InternalError::query_executor_invariant(
"retained full-row kernel rows require one retained-slot layout",
)
})?,
})
}
(KernelRowPayloadMode::FullRowRetained, ResidualPredicateScanMode::DeferredPostAccess) => {
Err(InternalError::query_executor_invariant(
"retained full-row kernel rows must apply residual predicates during scan",
))
}
(KernelRowPayloadMode::SlotsOnly, ResidualPredicateScanMode::Absent) => {
Ok(KernelRowScanStrategy::SlotOnlyRows {
retained_slot_layout: retained_slot_layout.ok_or_else(|| {
InternalError::query_executor_invariant(
"slot-only kernel rows require one retained-slot layout",
)
})?,
})
}
(KernelRowPayloadMode::SlotsOnly, ResidualPredicateScanMode::AppliedDuringScan) => {
Ok(KernelRowScanStrategy::SlotOnlyRowsFiltered {
predicate_program: predicate_slots
.ok_or_else(InternalError::scalar_page_predicate_slots_required)?,
retained_slot_layout: retained_slot_layout.ok_or_else(|| {
InternalError::query_executor_invariant(
"slot-only kernel rows require one retained-slot layout",
)
})?,
})
}
(KernelRowPayloadMode::SlotsOnly, ResidualPredicateScanMode::DeferredPostAccess) => {
Err(InternalError::query_executor_invariant(
"slot-only kernel rows must apply residual predicates during scan",
))
}
}
}
fn resolve_post_access_strategy<'a>(
plan: &AccessPlannedQuery,
predicate_slots: Option<&'a PredicateProgram>,
residual_predicate_scan_mode: ResidualPredicateScanMode,
cursor_emission: CursorEmissionMode,
retain_slot_rows: bool,
) -> Result<PostAccessStrategy<'a>, InternalError> {
let predicate_strategy = match residual_predicate_scan_mode {
ResidualPredicateScanMode::Absent => PostAccessPredicateStrategy::NotPresent,
ResidualPredicateScanMode::AppliedDuringScan => {
PostAccessPredicateStrategy::AppliedDuringScan
}
ResidualPredicateScanMode::DeferredPostAccess => PostAccessPredicateStrategy::Deferred {
predicate_program: predicate_slots
.ok_or_else(InternalError::scalar_page_predicate_slots_required)?,
},
};
Ok(PostAccessStrategy {
predicate_strategy,
defer_retained_slot_distinct_window: plan.scalar_plan().distinct
&& !cursor_emission.enabled()
&& retain_slot_rows,
})
}
const fn select_kernel_row_payload_mode(
retain_slot_rows: bool,
cursor_emission: CursorEmissionMode,
retained_slot_layout: Option<&RetainedSlotLayout>,
) -> KernelRowPayloadMode {
select_scalar_structural_payload_mode(
retain_slot_rows,
!cursor_emission.enabled(),
retained_slot_layout,
)
}
const fn select_scalar_structural_payload_mode(
retain_slot_rows: bool,
suppress_cursor: bool,
retained_slot_layout: Option<&RetainedSlotLayout>,
) -> KernelRowPayloadMode {
if retain_slot_rows && suppress_cursor {
KernelRowPayloadMode::SlotsOnly
} else if retained_slot_layout.is_some() {
KernelRowPayloadMode::FullRowRetained
} else {
KernelRowPayloadMode::DataRowOnly
}
}
fn cursorless_short_path_keep_cap(
plan: &AccessPlannedQuery,
cursor_boundary: Option<&CursorBoundary>,
retain_slot_rows: bool,
) -> Option<usize> {
if !retain_slot_rows || cursor_boundary.is_some() {
return None;
}
let page = plan.scalar_plan().page.as_ref()?;
let limit = page.limit?;
let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
Some(offset.saturating_add(limit))
}
const fn select_cursorless_short_path_payload_mode(
retain_slot_rows: bool,
cursor_boundary: Option<&CursorBoundary>,
retained_slot_layout: Option<&RetainedSlotLayout>,
) -> KernelRowPayloadMode {
select_scalar_structural_payload_mode(
retain_slot_rows,
cursor_boundary.is_none(),
retained_slot_layout,
)
}
fn direct_data_row_keep_cap(plan: &AccessPlannedQuery) -> Option<usize> {
let page = plan.scalar_plan().page.as_ref()?;
let limit = page.limit?;
Some(compute_page_keep_count(page.offset, limit))
}