use crate::{
db::{
data::{DataKey, DataRow},
executor::{
OrderedKeyStreamBox, ScalarContinuationContext, exact_output_key_count_hint,
key_stream_budget_is_redundant, measure_execution_stats_phase,
record_key_stream_micros, record_key_stream_yield,
route::LoadOrderRouteContract,
terminal::page::{
KernelRow, KernelRowScanStrategy, ResidualFilterScanMode, RetainedSlotLayout,
ScalarRowRuntimeHandle,
},
},
predicate::MissingRowPolicy,
query::plan::EffectiveRuntimeFilterProgram,
},
error::InternalError,
};
#[cfg(feature = "diagnostics")]
use super::metrics::{
measure_direct_data_row_phase, record_direct_data_row_key_stream_local_instructions,
record_direct_data_row_row_read_local_instructions,
};
#[cfg(any(test, feature = "diagnostics"))]
use super::metrics::{
record_kernel_data_row_path_hit, record_kernel_full_row_retained_path_hit,
record_kernel_slots_only_path_hit,
};
/// Inputs for one scalar-page kernel scan, plus the continuation state that
/// validates the scan budget before the scan starts
/// (see `execute_scalar_page_kernel_dyn`).
pub(super) struct ScalarPageKernelRequest<'a, 'r> {
    /// Ordered key stream the scan pulls keys from. It may be replaced in place
    /// by a budgeted wrapper when a non-redundant scan budget is applied.
    pub(super) key_stream: &'a mut OrderedKeyStreamBox,
    /// Optional scan budget; validated against `continuation` and then used to
    /// bound `key_stream`.
    pub(super) scan_budget_hint: Option<usize>,
    /// Route contract passed to `validate_load_scan_budget_hint` together with
    /// `scan_budget_hint`.
    pub(super) load_order_route_contract: LoadOrderRouteContract,
    /// Missing-row policy forwarded to every per-key row read.
    pub(super) consistency: MissingRowPolicy,
    /// Selects which kernel row shape is read (data rows, retained full rows,
    /// or slot-only rows, optionally with a filter program).
    pub(super) scan_strategy: KernelRowScanStrategy<'a>,
    /// Continuation context used to validate `scan_budget_hint`.
    pub(super) continuation: &'a ScalarContinuationContext,
    /// Runtime handle used to read each row.
    pub(super) row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
}
/// Inputs for one kernel-row scan (see `execute_kernel_row_scan`).
pub(in crate::db::executor) struct KernelRowScanRequest<'a, 'r> {
    /// Ordered key stream the scan pulls keys from. It may be replaced in place
    /// by a budgeted wrapper when a non-redundant scan budget is applied.
    pub(in crate::db::executor) key_stream: &'a mut OrderedKeyStreamBox,
    /// Optional scan budget used to bound `key_stream`; skipped when
    /// `key_stream_budget_is_redundant` reports it has no effect.
    pub(in crate::db::executor) scan_budget_hint: Option<usize>,
    /// Missing-row policy forwarded to every per-key row read.
    pub(in crate::db::executor) consistency: MissingRowPolicy,
    /// Selects which kernel row shape is read (data rows, retained full rows,
    /// or slot-only rows, optionally with a filter program).
    pub(in crate::db::executor) scan_strategy: KernelRowScanStrategy<'a>,
    /// Optional cap on the number of rows kept. With `None`, the scan runs
    /// until the key stream is exhausted.
    pub(in crate::db::executor) row_keep_cap: Option<usize>,
    /// Runtime handle used to read each row.
    pub(in crate::db::executor) row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
}
/// Runs one kernel-row scan described by `request` and returns
/// `(rows, rows_scanned)`.
///
/// Dispatches on `scan_strategy`:
/// - `DataRows` reads each key with `read_data_row_only`.
/// - `RetainedFullRows` / `RetainedFullRowsFiltered` read full rows with their
///   retained-slot layout. The filtered variant also passes its filter program.
/// - `SlotOnlyRows` / `SlotOnlyRowsFiltered` read slot-only rows with their
///   retained-slot layout. The filtered variant also passes its filter program.
///
/// In test and `diagnostics` builds, each arm first records which kernel path
/// was taken. Every arm goes through the shared scalar-page read loop. That loop
/// wraps `key_stream` in a budgeted stream when `scan_budget_hint` is set and
/// is not redundant.
///
/// # Errors
///
/// Propagates key-stream and row-read errors from the selected scan.
#[expect(clippy::too_many_lines)]
pub(in crate::db::executor) fn execute_kernel_row_scan(
    request: KernelRowScanRequest<'_, '_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    let KernelRowScanRequest {
        key_stream,
        scan_budget_hint,
        consistency,
        scan_strategy,
        row_keep_cap,
        row_runtime,
    } = request;
    match scan_strategy {
        KernelRowScanStrategy::DataRows => {
            #[cfg(any(test, feature = "diagnostics"))]
            record_kernel_data_row_path_hit();
            execute_scalar_page_read_loop(key_stream, scan_budget_hint, |key_stream| {
                scan_data_rows_only_into_kernel(key_stream, consistency, row_keep_cap, row_runtime)
            })
        }
        KernelRowScanStrategy::RetainedFullRows {
            retained_slot_layout,
        } => {
            #[cfg(any(test, feature = "diagnostics"))]
            record_kernel_full_row_retained_path_hit();
            // The variant carries its layout, so the `Some` below is always
            // present and the missing-layout message is defensive only.
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                "retained full-row kernel rows require one retained-slot layout",
                |key_stream, retained_slot_layout| {
                    scan_full_retained_rows_into_kernel(
                        key_stream,
                        consistency,
                        retained_slot_layout,
                        row_keep_cap,
                        row_runtime,
                    )
                },
            )
        }
        KernelRowScanStrategy::RetainedFullRowsFiltered {
            filter_program,
            retained_slot_layout,
        } => {
            // Shares the full-row diagnostics counter with the unfiltered variant.
            #[cfg(any(test, feature = "diagnostics"))]
            record_kernel_full_row_retained_path_hit();
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                "retained full-row kernel rows require one retained-slot layout",
                |key_stream, retained_slot_layout| {
                    scan_full_retained_rows_into_kernel_with_filter_program(
                        key_stream,
                        consistency,
                        filter_program,
                        retained_slot_layout,
                        row_keep_cap,
                        row_runtime,
                    )
                },
            )
        }
        KernelRowScanStrategy::SlotOnlyRows {
            retained_slot_layout,
        } => {
            #[cfg(any(test, feature = "diagnostics"))]
            record_kernel_slots_only_path_hit();
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                "slot-only kernel rows require one retained-slot layout",
                |key_stream, retained_slot_layout| {
                    scan_slot_rows_into_kernel(
                        key_stream,
                        consistency,
                        retained_slot_layout,
                        row_keep_cap,
                        row_runtime,
                    )
                },
            )
        }
        KernelRowScanStrategy::SlotOnlyRowsFiltered {
            filter_program,
            retained_slot_layout,
        } => {
            // Shares the slot-only diagnostics counter with the unfiltered variant.
            #[cfg(any(test, feature = "diagnostics"))]
            record_kernel_slots_only_path_hit();
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                "slot-only kernel rows require one retained-slot layout",
                |key_stream, retained_slot_layout| {
                    scan_slot_rows_into_kernel_with_filter_program(
                        key_stream,
                        consistency,
                        filter_program,
                        retained_slot_layout,
                        row_keep_cap,
                        row_runtime,
                    )
                },
            )
        }
    }
}
/// Resolves the retained-slot layout, then runs `scan_rows` inside the shared
/// scalar-page read loop. That loop applies `scan_budget_hint` when it is not
/// redundant.
///
/// # Errors
///
/// Returns a query-executor invariant error carrying `missing_layout_message`
/// when `retained_slot_layout` is `None`; otherwise propagates `scan_rows`.
fn execute_retained_kernel_scan(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
    missing_layout_message: &'static str,
    mut scan_rows: impl FnMut(
        &mut OrderedKeyStreamBox,
        &RetainedSlotLayout,
    ) -> Result<(Vec<KernelRow>, usize), InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    let Some(layout) = retained_slot_layout else {
        return Err(InternalError::query_executor_invariant(
            missing_layout_message,
        ));
    };

    execute_scalar_page_read_loop(key_stream, scan_budget_hint, move |stream| {
        scan_rows(stream, layout)
    })
}
/// Validates the request's scan budget against its continuation context, then
/// runs an uncapped kernel-row scan with the same stream, policy, strategy and
/// row runtime.
///
/// # Errors
///
/// Propagates the budget-validation error, or any error from the kernel scan.
pub(super) fn execute_scalar_page_kernel_dyn(
    request: ScalarPageKernelRequest<'_, '_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    // Validation must happen before any key is pulled from the stream.
    request.continuation.validate_load_scan_budget_hint(
        request.scan_budget_hint,
        request.load_order_route_contract,
    )?;

    let kernel_request = KernelRowScanRequest {
        key_stream: request.key_stream,
        scan_budget_hint: request.scan_budget_hint,
        consistency: request.consistency,
        scan_strategy: request.scan_strategy,
        // Scalar-page scans keep every row the stream yields.
        row_keep_cap: None,
        row_runtime: request.row_runtime,
    };
    execute_kernel_row_scan(kernel_request)
}
/// Runs `scan_rows` exactly once over `key_stream`.
///
/// When `scan_budget_hint` is set and `key_stream_budget_is_redundant` says it
/// would have an effect, the stream is first replaced in place by a budgeted
/// wrapper around the original stream.
fn execute_scalar_page_read_loop(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    mut scan_rows: impl FnMut(
        &mut OrderedKeyStreamBox,
    ) -> Result<(Vec<KernelRow>, usize), InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    let effective_budget =
        scan_budget_hint.filter(|&budget| !key_stream_budget_is_redundant(key_stream, budget));

    if let Some(budget) = effective_budget {
        // Move the current stream out (leaving an empty placeholder) so it can
        // be wrapped by value.
        let unbounded = std::mem::replace(key_stream, OrderedKeyStreamBox::empty());
        *key_stream = OrderedKeyStreamBox::budgeted(unbounded, budget);
    }

    scan_rows(key_stream)
}
/// Runs `scan_rows` exactly once over `key_stream`, forwarding `row_keep_cap`.
///
/// A cap of `Some(0)` short-circuits to `(empty, 0)` without touching the
/// stream. Otherwise, when `scan_budget_hint` is set and not redundant for the
/// stream, the stream is first replaced in place by a budgeted wrapper.
fn execute_scalar_data_row_read_loop(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    row_keep_cap: Option<usize>,
    mut scan_rows: impl FnMut(
        &mut OrderedKeyStreamBox,
        Option<usize>,
    ) -> Result<(Vec<DataRow>, usize), InternalError>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
    // A zero cap can never keep a row, so skip both budgeting and scanning.
    if matches!(row_keep_cap, Some(0)) {
        return Ok((Vec::new(), 0));
    }

    let effective_budget =
        scan_budget_hint.filter(|&budget| !key_stream_budget_is_redundant(key_stream, budget));

    if let Some(budget) = effective_budget {
        let unbounded = std::mem::replace(key_stream, OrderedKeyStreamBox::empty());
        *key_stream = OrderedKeyStreamBox::budgeted(unbounded, budget);
    }

    scan_rows(key_stream, row_keep_cap)
}
/// Pulls keys from `key_stream`, turns each into an optional kernel row via
/// `read_row`, and collects the rows that are produced.
///
/// Returns `(rows, rows_scanned)`. `rows_scanned` counts every key pulled from
/// the stream, including keys for which `read_row` returned `None`.
///
/// When `row_keep_cap` is `Some(cap)`, scanning stops as soon as `cap` rows
/// have been collected. `Some(0)` returns `(empty, 0)` without pulling any key,
/// matching the zero-cap handling in `execute_scalar_data_row_read_loop`.
///
/// Each key pull is timed and reported via `record_key_stream_micros`, and each
/// yielded key is reported via `record_key_stream_yield`.
///
/// # Errors
///
/// Propagates the first error from the key stream or from `read_row`.
fn scan_kernel_rows_with(
    key_stream: &mut OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    mut read_row: impl FnMut(DataKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    // The capped loop below checks the cap only after pushing a row. Without
    // this guard, a zero cap would still return one row.
    if row_keep_cap == Some(0) {
        return Ok((Vec::new(), 0));
    }
    let mut rows_scanned = 0usize;
    let staged_capacity = exact_output_key_count_hint(key_stream, row_keep_cap)
        .unwrap_or_else(|| row_keep_cap.unwrap_or(0));
    let mut rows = Vec::with_capacity(staged_capacity);
    // Uncapped and capped scans use separate loops so the uncapped one carries
    // no per-row cap check.
    let Some(row_keep_cap) = row_keep_cap else {
        loop {
            let (next_key, key_stream_micros) =
                measure_execution_stats_phase(|| key_stream.next_key());
            record_key_stream_micros(key_stream_micros);
            let Some(key) = next_key? else {
                break;
            };
            record_key_stream_yield();
            rows_scanned = rows_scanned.saturating_add(1);
            let Some(row) = read_row(key)? else {
                continue;
            };
            rows.push(row);
        }
        return Ok((rows, rows_scanned));
    };
    loop {
        let (next_key, key_stream_micros) = measure_execution_stats_phase(|| key_stream.next_key());
        record_key_stream_micros(key_stream_micros);
        let Some(key) = next_key? else {
            break;
        };
        record_key_stream_yield();
        rows_scanned = rows_scanned.saturating_add(1);
        let Some(row) = read_row(key)? else {
            continue;
        };
        rows.push(row);
        if rows.len() >= row_keep_cap {
            break;
        }
    }
    Ok((rows, rows_scanned))
}
fn scan_data_rows_direct(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
scan_data_rows_direct_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_data_row(consistency, key)
})
}
/// Pulls keys from `key_stream`, reads each through `read_data_row`, and
/// collects the data rows that are produced.
///
/// Returns `(data_rows, rows_scanned)`. `rows_scanned` counts every key pulled
/// from the stream, including keys for which `read_data_row` returned `None`.
///
/// When `row_keep_cap` is `Some(cap)`, scanning stops once `cap` rows are
/// collected. `Some(0)` returns `(empty, 0)` without pulling any key, matching
/// `execute_scalar_data_row_read_loop`.
///
/// Key pulls are timed via `record_key_stream_micros`. With the `diagnostics`
/// feature, local-instruction counts for the key pull and the row read are
/// also recorded separately.
///
/// # Errors
///
/// Propagates the first error from the key stream or from `read_data_row`.
fn scan_data_rows_direct_with_reader(
    key_stream: &mut OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    mut read_data_row: impl FnMut(DataKey) -> Result<Option<DataRow>, InternalError>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
    // The capped loop below checks the cap only after pushing a row, so a zero
    // cap must be handled up front to keep nothing.
    if row_keep_cap == Some(0) {
        return Ok((Vec::new(), 0));
    }
    let mut rows_scanned = 0usize;
    let staged_capacity = exact_output_key_count_hint(key_stream, row_keep_cap)
        .unwrap_or_else(|| row_keep_cap.unwrap_or(0));
    let mut data_rows = Vec::with_capacity(staged_capacity);
    // Uncapped and capped scans use separate loops so the uncapped one carries
    // no per-row cap check.
    let Some(row_keep_cap) = row_keep_cap else {
        loop {
            #[cfg(feature = "diagnostics")]
            let ((key_stream_local_instructions, read_result), key_stream_micros) =
                measure_execution_stats_phase(|| {
                    measure_direct_data_row_phase(|| key_stream.next_key())
                });
            #[cfg(not(feature = "diagnostics"))]
            let (read_result, key_stream_micros) =
                measure_execution_stats_phase(|| key_stream.next_key());
            record_key_stream_micros(key_stream_micros);
            let Some(key) = read_result? else {
                break;
            };
            #[cfg(feature = "diagnostics")]
            record_direct_data_row_key_stream_local_instructions(key_stream_local_instructions);
            record_key_stream_yield();
            rows_scanned = rows_scanned.saturating_add(1);
            #[cfg(feature = "diagnostics")]
            let (row_read_local_instructions, row_read_result) =
                measure_direct_data_row_phase(|| read_data_row(key));
            #[cfg(not(feature = "diagnostics"))]
            let row_read_result = read_data_row(key);
            #[cfg(feature = "diagnostics")]
            record_direct_data_row_row_read_local_instructions(row_read_local_instructions);
            let Some(data_row) = row_read_result? else {
                continue;
            };
            data_rows.push(data_row);
        }
        return Ok((data_rows, rows_scanned));
    };
    loop {
        #[cfg(feature = "diagnostics")]
        let ((key_stream_local_instructions, read_result), key_stream_micros) =
            measure_execution_stats_phase(|| {
                measure_direct_data_row_phase(|| key_stream.next_key())
            });
        #[cfg(not(feature = "diagnostics"))]
        let (read_result, key_stream_micros) =
            measure_execution_stats_phase(|| key_stream.next_key());
        record_key_stream_micros(key_stream_micros);
        let Some(key) = read_result? else {
            break;
        };
        #[cfg(feature = "diagnostics")]
        record_direct_data_row_key_stream_local_instructions(key_stream_local_instructions);
        record_key_stream_yield();
        rows_scanned = rows_scanned.saturating_add(1);
        #[cfg(feature = "diagnostics")]
        let (row_read_local_instructions, row_read_result) =
            measure_direct_data_row_phase(|| read_data_row(key));
        #[cfg(not(feature = "diagnostics"))]
        let row_read_result = read_data_row(key);
        #[cfg(feature = "diagnostics")]
        record_direct_data_row_row_read_local_instructions(row_read_local_instructions);
        let Some(data_row) = row_read_result? else {
            continue;
        };
        data_rows.push(data_row);
        if data_rows.len() >= row_keep_cap {
            break;
        }
    }
    Ok((data_rows, rows_scanned))
}
fn scan_data_rows_direct_with_filter_program(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
filter_program: &EffectiveRuntimeFilterProgram,
retained_slot_layout: &RetainedSlotLayout,
) -> Result<(Vec<DataRow>, usize), InternalError> {
scan_data_rows_direct_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_data_row_with_filter_program(
consistency,
key,
filter_program,
retained_slot_layout,
)
})
}
/// Direct data-row scan for the materialized-order path.
///
/// No row cap is applied, so the scan runs until the (possibly budgeted) key
/// stream is exhausted. A `DeferredPostAccess` residual filter mode is rejected
/// as an invariant violation.
pub(super) fn scan_materialized_order_direct_data_rows(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    consistency: MissingRowPolicy,
    residual_filter_scan_mode: ResidualFilterScanMode,
    row_runtime: &ScalarRowRuntimeHandle<'_>,
    residual_filter_program: Option<&EffectiveRuntimeFilterProgram>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
) -> Result<(Vec<DataRow>, usize), InternalError> {
    const DEFERRED_FILTERING_MESSAGE: &str =
        "materialized-order direct data-row path cannot defer residual filtering";

    let uncapped: Option<usize> = None;
    scan_direct_data_rows_with_residual_policy(
        key_stream,
        scan_budget_hint,
        uncapped,
        consistency,
        residual_filter_scan_mode,
        row_runtime,
        residual_filter_program,
        retained_slot_layout,
        DEFERRED_FILTERING_MESSAGE,
    )
}
/// Direct data-row scan whose per-row reader is chosen by
/// `residual_filter_scan_mode`:
/// - `Absent` reads plain data rows.
/// - `AppliedDuringScan` reads rows through `residual_filter_program` and
///   `retained_slot_layout`, both of which must be present.
/// - `DeferredPostAccess` is rejected with `deferred_filtering_message`.
///
/// All three run inside `execute_scalar_data_row_read_loop`. A `Some(0)` cap
/// therefore returns early before any mode check, and budgeting is applied
/// before the mode is inspected.
///
/// # Errors
///
/// Returns a query-executor invariant error for a missing filter program or
/// layout in `AppliedDuringScan`, and for `DeferredPostAccess`. Otherwise it
/// propagates scan errors.
#[expect(clippy::too_many_arguments)]
pub(super) fn scan_direct_data_rows_with_residual_policy(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    row_keep_cap: Option<usize>,
    consistency: MissingRowPolicy,
    residual_filter_scan_mode: ResidualFilterScanMode,
    row_runtime: &ScalarRowRuntimeHandle<'_>,
    residual_filter_program: Option<&EffectiveRuntimeFilterProgram>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
    deferred_filtering_message: &'static str,
) -> Result<(Vec<DataRow>, usize), InternalError> {
    execute_scalar_data_row_read_loop(
        key_stream,
        scan_budget_hint,
        row_keep_cap,
        |stream, keep_cap| match residual_filter_scan_mode {
            ResidualFilterScanMode::DeferredPostAccess => Err(
                InternalError::query_executor_invariant(deferred_filtering_message),
            ),
            ResidualFilterScanMode::Absent => {
                scan_data_rows_direct(stream, consistency, keep_cap, row_runtime)
            }
            ResidualFilterScanMode::AppliedDuringScan => {
                let Some(program) = residual_filter_program else {
                    return Err(InternalError::query_executor_invariant(
                        "scan-time residual filtering requires one compiled residual filter program",
                    ));
                };
                let Some(layout) = retained_slot_layout else {
                    return Err(InternalError::query_executor_invariant(
                        "scan-time residual filtering requires one retained-slot layout",
                    ));
                };
                scan_data_rows_direct_with_filter_program(
                    stream,
                    consistency,
                    keep_cap,
                    row_runtime,
                    program,
                    layout,
                )
            }
        },
    )
}
fn scan_data_rows_only_into_kernel(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_kernel_rows_with(key_stream, row_keep_cap, |key| {
row_runtime.read_data_row_only(consistency, key)
})
}
fn scan_full_retained_rows_into_kernel(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_full_retained_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_full_row_retained(consistency, key, retained_slot_layout)
})
}
/// Retained full-row scan entry point: hands the supplied per-key reader to the
/// shared kernel-row scan loop.
fn scan_full_retained_rows_into_kernel_with_reader<R>(
    key_stream: &mut OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    read_row: R,
) -> Result<(Vec<KernelRow>, usize), InternalError>
where
    R: FnMut(DataKey) -> Result<Option<KernelRow>, InternalError>,
{
    scan_kernel_rows_with(key_stream, row_keep_cap, read_row)
}
fn scan_full_retained_rows_into_kernel_with_filter_program(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
filter_program: &EffectiveRuntimeFilterProgram,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_full_retained_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_full_row_retained_with_filter_program(
consistency,
key,
filter_program,
retained_slot_layout,
)
})
}
fn scan_slot_rows_into_kernel(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_slot_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_slot_only(consistency, &key, retained_slot_layout)
})
}
/// Slot-only scan entry point: hands the supplied per-key reader to the shared
/// kernel-row scan loop.
fn scan_slot_rows_into_kernel_with_reader<R>(
    key_stream: &mut OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    read_row: R,
) -> Result<(Vec<KernelRow>, usize), InternalError>
where
    R: FnMut(DataKey) -> Result<Option<KernelRow>, InternalError>,
{
    scan_kernel_rows_with(key_stream, row_keep_cap, read_row)
}
fn scan_slot_rows_into_kernel_with_filter_program(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
filter_program: &EffectiveRuntimeFilterProgram,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_slot_rows_into_kernel_with_reader(key_stream, row_keep_cap, |key| {
row_runtime.read_slot_only_with_filter_program(
consistency,
&key,
filter_program,
retained_slot_layout,
)
})
}