use crate::db::executor::terminal::page::{
KernelRow, KernelRowScanStrategy, ResidualPredicateScanMode, ScalarRowRuntimeHandle,
};
use crate::{
db::{
data::{DataKey, DataRow},
executor::{
BudgetedOrderedKeyStream, OrderedKeyStream, ScalarContinuationContext,
exact_output_key_count_hint, key_stream_budget_is_redundant,
route::LoadOrderRouteContract, terminal::page::RetainedSlotLayout,
},
predicate::{MissingRowPolicy, PredicateProgram},
},
error::InternalError,
value::Value,
};
#[cfg(feature = "diagnostics")]
use super::metrics::{
measure_direct_data_row_phase, record_direct_data_row_key_stream_local_instructions,
record_direct_data_row_row_read_local_instructions,
};
#[cfg(any(test, feature = "diagnostics"))]
use super::metrics::{
record_kernel_data_row_path_hit, record_kernel_full_row_retained_path_hit,
record_kernel_slots_only_path_hit,
};
/// Request bundle consumed by `execute_scalar_page_kernel_dyn`.
///
/// It carries the inputs of a kernel-row scan plus the route contract and
/// continuation context that are used to validate `scan_budget_hint` before the
/// scan is started.
pub(super) struct ScalarPageKernelRequest<'a, 'r> {
/// Ordered key source the scan pulls keys from.
pub(super) key_stream: &'a mut dyn OrderedKeyStream,
/// Optional scan budget; when present and not redundant for the stream, the
/// stream is wrapped in a `BudgetedOrderedKeyStream` for the scan.
pub(super) scan_budget_hint: Option<usize>,
/// Route contract handed to `validate_load_scan_budget_hint` together with the
/// budget hint.
pub(super) load_order_route_contract: LoadOrderRouteContract,
/// Missing-row policy forwarded to every row read.
pub(super) consistency: MissingRowPolicy,
/// Selects which row-read path the kernel scan uses.
pub(super) scan_strategy: KernelRowScanStrategy<'a>,
/// Continuation context that validates the budget hint.
pub(super) continuation: &'a ScalarContinuationContext,
/// Row reader handle used to materialize rows for the pulled keys.
pub(super) row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
}
/// Request bundle consumed by `execute_kernel_row_scan`.
pub(in crate::db::executor) struct KernelRowScanRequest<'a, 'r> {
/// Ordered key source the scan pulls keys from.
pub(in crate::db::executor) key_stream: &'a mut dyn OrderedKeyStream,
/// Optional scan budget; when present and not redundant for the stream, the
/// stream is wrapped in a `BudgetedOrderedKeyStream` for the scan.
pub(in crate::db::executor) scan_budget_hint: Option<usize>,
/// Missing-row policy forwarded to every row read.
pub(in crate::db::executor) consistency: MissingRowPolicy,
/// Selects which row-read path the scan uses.
pub(in crate::db::executor) scan_strategy: KernelRowScanStrategy<'a>,
/// Optional cap on kept rows; the scan loop stops once the number of kept rows
/// reaches it.
pub(in crate::db::executor) row_keep_cap: Option<usize>,
/// Row reader handle used to materialize rows for the pulled keys.
pub(in crate::db::executor) row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
}
/// Runs one kernel-row scan, choosing the row-read path from `scan_strategy`.
///
/// Every arm pulls keys from `key_stream` (wrapped in a budget when
/// `scan_budget_hint` is present and not redundant) and reads rows through
/// `row_runtime` under the `consistency` policy, stopping once the number of
/// kept rows reaches `row_keep_cap`.
///
/// Returns the kept rows together with the number of keys pulled from the
/// stream.
///
/// # Errors
/// Propagates any `InternalError` raised by the key stream or the row reader.
#[expect(clippy::too_many_lines)]
pub(in crate::db::executor) fn execute_kernel_row_scan(
request: KernelRowScanRequest<'_, '_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
let KernelRowScanRequest {
key_stream,
scan_budget_hint,
consistency,
scan_strategy,
row_keep_cap,
row_runtime,
} = request;
// Each arm records its path-hit counter (test/diagnostics builds only) and then
// runs the matching scan function under the optional scan budget.
match scan_strategy {
// Plain data rows: no retained-slot layout is involved.
KernelRowScanStrategy::DataRows => {
#[cfg(any(test, feature = "diagnostics"))]
record_kernel_data_row_path_hit();
execute_scalar_page_read_loop(key_stream, scan_budget_hint, |key_stream| {
scan_data_rows_only_into_kernel(key_stream, consistency, row_keep_cap, row_runtime)
})
}
// Full rows read with a retained-slot layout and no predicate program.
KernelRowScanStrategy::RetainedFullRows {
retained_slot_layout,
} => {
#[cfg(any(test, feature = "diagnostics"))]
record_kernel_full_row_retained_path_hit();
// The layout is always present here; it is wrapped in `Some` only because
// `execute_retained_kernel_scan` accepts an `Option`.
execute_retained_kernel_scan(
key_stream,
scan_budget_hint,
Some(retained_slot_layout),
"retained full-row kernel rows require one retained-slot layout",
|key_stream, retained_slot_layout| {
scan_full_retained_rows_into_kernel(
key_stream,
consistency,
retained_slot_layout,
row_keep_cap,
row_runtime,
)
},
)
}
// Full retained rows with the predicate program handed to the row reader.
// Records the same path-hit counter as the unfiltered full-row arm.
KernelRowScanStrategy::RetainedFullRowsFiltered {
predicate_program,
retained_slot_layout,
} => {
#[cfg(any(test, feature = "diagnostics"))]
record_kernel_full_row_retained_path_hit();
execute_retained_kernel_scan(
key_stream,
scan_budget_hint,
Some(retained_slot_layout),
"retained full-row kernel rows require one retained-slot layout",
|key_stream, retained_slot_layout| {
scan_full_retained_rows_into_kernel_with_predicate(
key_stream,
consistency,
predicate_program,
retained_slot_layout,
row_keep_cap,
row_runtime,
)
},
)
}
// Slot-only rows read with a retained-slot layout and no predicate program.
KernelRowScanStrategy::SlotOnlyRows {
retained_slot_layout,
} => {
#[cfg(any(test, feature = "diagnostics"))]
record_kernel_slots_only_path_hit();
execute_retained_kernel_scan(
key_stream,
scan_budget_hint,
Some(retained_slot_layout),
"slot-only kernel rows require one retained-slot layout",
|key_stream, retained_slot_layout| {
scan_slot_rows_into_kernel(
key_stream,
consistency,
retained_slot_layout,
row_keep_cap,
row_runtime,
)
},
)
}
// Slot-only rows with the predicate program handed to the row reader.
// Records the same path-hit counter as the unfiltered slot-only arm.
KernelRowScanStrategy::SlotOnlyRowsFiltered {
predicate_program,
retained_slot_layout,
} => {
#[cfg(any(test, feature = "diagnostics"))]
record_kernel_slots_only_path_hit();
execute_retained_kernel_scan(
key_stream,
scan_budget_hint,
Some(retained_slot_layout),
"slot-only kernel rows require one retained-slot layout",
|key_stream, retained_slot_layout| {
scan_slot_rows_into_kernel_with_predicate(
key_stream,
consistency,
predicate_program,
retained_slot_layout,
row_keep_cap,
row_runtime,
)
},
)
}
}
}
/// Runs a retained-layout scan under the optional scan budget.
///
/// `scan_rows` is invoked with the (possibly budget-wrapped) key stream and the
/// resolved retained-slot layout.
///
/// # Errors
/// Returns a query-executor invariant error carrying `missing_layout_message`
/// when `retained_slot_layout` is `None`; otherwise propagates whatever
/// `scan_rows` returns.
fn execute_retained_kernel_scan(
    key_stream: &mut dyn OrderedKeyStream,
    scan_budget_hint: Option<usize>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
    missing_layout_message: &'static str,
    mut scan_rows: impl FnMut(
        &mut dyn OrderedKeyStream,
        &RetainedSlotLayout,
    ) -> Result<(Vec<KernelRow>, usize), InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    // Resolve the layout before any key is pulled from the stream.
    let Some(layout) = retained_slot_layout else {
        return Err(InternalError::query_executor_invariant(
            missing_layout_message,
        ));
    };

    execute_scalar_page_read_loop(key_stream, scan_budget_hint, |stream| {
        scan_rows(stream, layout)
    })
}
/// Validates the scan budget hint against the continuation context, then runs
/// the kernel-row scan with no row-keep cap.
///
/// # Errors
/// Propagates the error from `validate_load_scan_budget_hint`, or any error
/// returned by `execute_kernel_row_scan`.
pub(super) fn execute_scalar_page_kernel_dyn(
    request: ScalarPageKernelRequest<'_, '_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    // The hint is validated before any key is pulled from the stream.
    request.continuation.validate_load_scan_budget_hint(
        request.scan_budget_hint,
        request.load_order_route_contract,
    )?;

    let scan_request = KernelRowScanRequest {
        key_stream: request.key_stream,
        scan_budget_hint: request.scan_budget_hint,
        consistency: request.consistency,
        scan_strategy: request.scan_strategy,
        // This entry point never caps the number of kept rows.
        row_keep_cap: None,
        row_runtime: request.row_runtime,
    };
    execute_kernel_row_scan(scan_request)
}
/// Invokes `scan_rows` exactly once, over either the raw key stream or a
/// budget-limited wrapper around it.
///
/// The stream is wrapped in a `BudgetedOrderedKeyStream` only when a budget hint
/// is present and `key_stream_budget_is_redundant` reports it as not redundant.
fn execute_scalar_page_read_loop(
    key_stream: &mut dyn OrderedKeyStream,
    scan_budget_hint: Option<usize>,
    mut scan_rows: impl FnMut(
        &mut dyn OrderedKeyStream,
    ) -> Result<(Vec<KernelRow>, usize), InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    match scan_budget_hint {
        Some(scan_budget) if !key_stream_budget_is_redundant(key_stream, scan_budget) => {
            let mut budgeted_stream = BudgetedOrderedKeyStream::new(key_stream, scan_budget);
            scan_rows(&mut budgeted_stream)
        }
        // No hint, or a hint the stream already satisfies: scan the stream directly.
        _ => scan_rows(key_stream),
    }
}
/// Data-row counterpart of the page read loop: invokes `scan_rows` at most once,
/// over either the raw key stream or a budget-limited wrapper around it.
///
/// A `row_keep_cap` of `Some(0)` short-circuits to an empty result with zero
/// rows scanned, without touching the key stream or calling `scan_rows`.
fn execute_scalar_data_row_read_loop(
    key_stream: &mut dyn OrderedKeyStream,
    scan_budget_hint: Option<usize>,
    row_keep_cap: Option<usize>,
    mut scan_rows: impl FnMut(
        &mut dyn OrderedKeyStream,
        Option<usize>,
    ) -> Result<(Vec<DataRow>, usize), InternalError>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
    if matches!(row_keep_cap, Some(0)) {
        return Ok((Vec::new(), 0));
    }

    match scan_budget_hint {
        Some(scan_budget) if !key_stream_budget_is_redundant(key_stream, scan_budget) => {
            let mut budgeted_stream = BudgetedOrderedKeyStream::new(key_stream, scan_budget);
            scan_rows(&mut budgeted_stream, row_keep_cap)
        }
        // No hint, or a hint the stream already satisfies: scan the stream directly.
        _ => scan_rows(key_stream, row_keep_cap),
    }
}
/// Shared kernel-row scan loop: pulls keys from `key_stream`, reads each one
/// through `read_row`, and collects the rows that come back as `Some`.
///
/// Returns `(rows, rows_scanned)`, where `rows_scanned` counts every key pulled
/// from the stream, including keys whose read returned `None`.
///
/// When `row_keep_cap` is `Some(cap)`, the loop stops as soon as `cap` rows have
/// been kept. A cap of `Some(0)` returns an empty result with zero rows scanned
/// and does not pull from the stream, matching the zero-cap handling of
/// `execute_scalar_data_row_read_loop`.
///
/// # Errors
/// Propagates any `InternalError` from `key_stream.next_key()` or `read_row`.
fn scan_kernel_rows_with(
    key_stream: &mut dyn OrderedKeyStream,
    row_keep_cap: Option<usize>,
    mut read_row: impl FnMut(DataKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    // The cap check below only runs after a row has been pushed, so without this
    // guard a zero cap would still pull keys and could keep one row.
    if row_keep_cap == Some(0) {
        return Ok((Vec::new(), 0));
    }

    // Pre-size the output: use the exact key-count hint when the stream offers
    // one (clamped to the cap), otherwise fall back to the cap itself or zero.
    let staged_capacity = exact_output_key_count_hint(key_stream, None).map_or_else(
        || row_keep_cap.unwrap_or(0),
        |hint| row_keep_cap.map_or(hint, |cap| usize::min(hint, cap)),
    );
    let mut rows = Vec::with_capacity(staged_capacity);
    let mut rows_scanned = 0usize;

    while let Some(key) = key_stream.next_key()? {
        rows_scanned = rows_scanned.saturating_add(1);

        // A `None` read still counts as scanned but contributes no row.
        let Some(row) = read_row(key)? else {
            continue;
        };
        rows.push(row);

        if row_keep_cap.is_some_and(|cap| rows.len() >= cap) {
            break;
        }
    }

    Ok((rows, rows_scanned))
}
/// Evaluates `predicate_program` against a row's retained values.
///
/// The slot reader handed to the program maps a slot to its value index through
/// `retained_slot_layout` and yields `None` when the layout has no index for the
/// slot, the index is outside `retained_values`, or the stored value is `None`.
pub(super) fn predicate_matches_retained_values(
    predicate_program: &PredicateProgram,
    retained_slot_layout: &RetainedSlotLayout,
    retained_values: &[Option<Value>],
) -> bool {
    predicate_program.eval_with_slot_value_ref_reader(&mut |slot| {
        retained_slot_layout
            .value_index_for_slot(slot)
            .and_then(|value_index| retained_values.get(value_index))
            .and_then(Option::as_ref)
    })
}
fn scan_data_rows_direct(
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
scan_data_rows_direct_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_data_row(consistency, key)
})
}
/// Core loop of the direct data-row scan: pulls keys from `key_stream`, reads
/// each one through `read_data_row`, and collects the rows that come back as
/// `Some`.
///
/// Returns `(data_rows, rows_scanned)`, where `rows_scanned` counts every key
/// pulled from the stream, including keys whose read returned `None`.
///
/// With the `diagnostics` feature enabled, the key-stream pull and the row read
/// are each wrapped in `measure_direct_data_row_phase`, and the measurements are
/// passed to the matching `record_direct_data_row_*` function.
///
/// NOTE(review): a `row_keep_cap` of `Some(0)` is not special-cased here, so the
/// loop would still keep one row. The call chain visible in this file goes
/// through `execute_scalar_data_row_read_loop`, which returns early for that
/// case — confirm no other caller bypasses it.
///
/// # Errors
/// Propagates any `InternalError` from `key_stream.next_key()` or
/// `read_data_row`.
fn scan_data_rows_direct_with_reader(
key_stream: &mut dyn OrderedKeyStream,
row_keep_cap: Option<usize>,
mut read_data_row: impl FnMut(DataKey) -> Result<Option<DataRow>, InternalError>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
let mut rows_scanned = 0usize;
// Pre-size the output: use the exact key-count hint when available (clamped to
// the cap), otherwise fall back to the cap itself or zero.
let staged_capacity = exact_output_key_count_hint(key_stream, None).map_or_else(
|| row_keep_cap.unwrap_or(0),
|hint| row_keep_cap.map_or(hint, |cap| usize::min(hint, cap)),
);
let mut data_rows = Vec::with_capacity(staged_capacity);
loop {
// Pull the next key, measured when diagnostics are enabled.
#[cfg(feature = "diagnostics")]
let (key_stream_local_instructions, read_result) =
measure_direct_data_row_phase(|| key_stream.next_key());
#[cfg(not(feature = "diagnostics"))]
let read_result = key_stream.next_key();
let Some(key) = read_result? else {
break;
};
// The key-stream measurement is recorded only once a key was produced; the
// final exhausted pull and failed pulls are not recorded.
#[cfg(feature = "diagnostics")]
record_direct_data_row_key_stream_local_instructions(key_stream_local_instructions);
rows_scanned = rows_scanned.saturating_add(1);
// Read the row for this key, measured when diagnostics are enabled.
#[cfg(feature = "diagnostics")]
let (row_read_local_instructions, row_read_result) =
measure_direct_data_row_phase(|| read_data_row(key));
#[cfg(not(feature = "diagnostics"))]
let row_read_result = read_data_row(key);
// The row-read measurement is recorded before the result is inspected, so
// failed and `None` reads are recorded too.
#[cfg(feature = "diagnostics")]
record_direct_data_row_row_read_local_instructions(row_read_local_instructions);
// A `None` read still counts as scanned but contributes no row.
let Some(data_row) = row_read_result? else {
continue;
};
data_rows.push(data_row);
// Stop as soon as the kept-row cap is reached.
if row_keep_cap.is_some_and(|cap| data_rows.len() >= cap) {
break;
}
}
Ok((data_rows, rows_scanned))
}
fn scan_data_rows_direct_with_predicate(
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
predicate_program: &PredicateProgram,
retained_slot_layout: &RetainedSlotLayout,
) -> Result<(Vec<DataRow>, usize), InternalError> {
scan_data_rows_direct_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_data_row_with_predicate(
consistency,
key,
predicate_program,
retained_slot_layout,
)
})
}
/// Direct data-row scan for the materialized-order path.
///
/// Delegates to `scan_direct_data_rows_with_residual_policy` with no row-keep
/// cap and a path-specific message for the deferred-filtering invariant error.
pub(super) fn scan_materialized_order_direct_data_rows(
    key_stream: &mut dyn OrderedKeyStream,
    scan_budget_hint: Option<usize>,
    consistency: MissingRowPolicy,
    residual_predicate_scan_mode: ResidualPredicateScanMode,
    row_runtime: &ScalarRowRuntimeHandle<'_>,
    predicate_slots: Option<&PredicateProgram>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
    const DEFERRED_FILTERING_MESSAGE: &str =
        "materialized-order direct data-row path cannot defer residual filtering";

    // This path never caps the number of kept rows.
    let row_keep_cap = None;

    scan_direct_data_rows_with_residual_policy(
        key_stream,
        scan_budget_hint,
        row_keep_cap,
        consistency,
        residual_predicate_scan_mode,
        row_runtime,
        predicate_slots,
        retained_slot_layout,
        DEFERRED_FILTERING_MESSAGE,
    )
}
/// Direct data-row scan that picks its reader from `residual_predicate_scan_mode`.
///
/// * `Absent` — rows are read without a predicate.
/// * `AppliedDuringScan` — rows are read with the predicate program and
///   retained-slot layout, both of which must be present.
/// * `DeferredPostAccess` — rejected with `deferred_filtering_message`.
///
/// The mode is inspected inside the read loop's callback, so a `row_keep_cap` of
/// `Some(0)` returns an empty result before any of the checks below run.
///
/// # Errors
/// Returns a query-executor invariant error when the mode is
/// `DeferredPostAccess`, or when `AppliedDuringScan` is requested without a
/// predicate program or without a retained-slot layout. Otherwise propagates
/// errors from the underlying scan.
#[expect(clippy::too_many_arguments)]
pub(super) fn scan_direct_data_rows_with_residual_policy(
    key_stream: &mut dyn OrderedKeyStream,
    scan_budget_hint: Option<usize>,
    row_keep_cap: Option<usize>,
    consistency: MissingRowPolicy,
    residual_predicate_scan_mode: ResidualPredicateScanMode,
    row_runtime: &ScalarRowRuntimeHandle<'_>,
    predicate_program: Option<&PredicateProgram>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
    deferred_filtering_message: &'static str,
) -> Result<(Vec<DataRow>, usize), InternalError> {
    execute_scalar_data_row_read_loop(
        key_stream,
        scan_budget_hint,
        row_keep_cap,
        |stream, keep_cap| match residual_predicate_scan_mode {
            ResidualPredicateScanMode::Absent => {
                scan_data_rows_direct(stream, consistency, keep_cap, row_runtime)
            }
            ResidualPredicateScanMode::AppliedDuringScan => {
                // The program is checked before the layout, so a request missing
                // both reports the missing program.
                let Some(program) = predicate_program else {
                    return Err(InternalError::query_executor_invariant(
                        "scan-time residual filtering requires one compiled predicate program",
                    ));
                };
                let Some(layout) = retained_slot_layout else {
                    return Err(InternalError::query_executor_invariant(
                        "scan-time residual filtering requires one retained-slot layout",
                    ));
                };

                scan_data_rows_direct_with_predicate(
                    stream,
                    consistency,
                    keep_cap,
                    row_runtime,
                    program,
                    layout,
                )
            }
            ResidualPredicateScanMode::DeferredPostAccess => Err(
                InternalError::query_executor_invariant(deferred_filtering_message),
            ),
        },
    )
}
fn scan_data_rows_only_into_kernel(
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_kernel_rows_with(key_stream, row_keep_cap, |key| {
row_runtime.read_data_row_only(consistency, key)
})
}
fn scan_full_retained_rows_into_kernel(
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_full_retained_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_full_row_retained(consistency, key, retained_slot_layout)
})
}
/// Scans full retained rows using a caller-supplied row reader.
///
/// Forwards its arguments unchanged to `scan_kernel_rows_with` and returns that
/// function's `(rows, rows_scanned)` result.
fn scan_full_retained_rows_into_kernel_with_reader(
key_stream: &mut dyn OrderedKeyStream,
row_keep_cap: Option<usize>,
read_row: impl FnMut(DataKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_kernel_rows_with(key_stream, row_keep_cap, read_row)
}
fn scan_full_retained_rows_into_kernel_with_predicate(
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
predicate_program: &PredicateProgram,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_full_retained_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_full_row_retained_with_predicate(
consistency,
key,
predicate_program,
retained_slot_layout,
)
})
}
fn scan_slot_rows_into_kernel(
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_slot_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_slot_only(consistency, &key, retained_slot_layout)
})
}
/// Scans slot-only rows using a caller-supplied row reader.
///
/// Forwards its arguments unchanged to `scan_kernel_rows_with` and returns that
/// function's `(rows, rows_scanned)` result.
fn scan_slot_rows_into_kernel_with_reader(
key_stream: &mut dyn OrderedKeyStream,
row_keep_cap: Option<usize>,
read_row: impl FnMut(DataKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_kernel_rows_with(key_stream, row_keep_cap, read_row)
}
fn scan_slot_rows_into_kernel_with_predicate(
key_stream: &mut dyn OrderedKeyStream,
consistency: MissingRowPolicy,
predicate_program: &PredicateProgram,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_slot_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_slot_only_with_predicate(
consistency,
&key,
predicate_program,
retained_slot_layout,
)
})
}