use crate::{
db::{
data::{DataRow, DecodedDataStoreKey},
executor::{
OrderedKeyStreamBox, ScalarContinuationContext, exact_output_key_count_hint,
key_stream_budget_is_redundant, measure_execution_stats_phase,
record_key_stream_micros, record_key_stream_yield,
route::LoadOrderRouteMode,
terminal::page::{
KernelRow, KernelRowScanStrategy, ResidualFilterScanMode, RetainedSlotLayout,
ScalarRowRuntimeHandle,
},
},
predicate::MissingRowPolicy,
query::plan::EffectiveRuntimeFilterProgram,
},
error::InternalError,
};
#[cfg(feature = "diagnostics")]
use super::metrics::{
measure_direct_data_row_phase, record_direct_data_row_key_stream_local_instructions,
record_direct_data_row_row_read_local_instructions,
};
#[cfg(feature = "diagnostics")]
use super::metrics::{
measure_kernel_row_phase, record_kernel_row_key_stream_local_instructions,
record_kernel_row_row_read_local_instructions, record_kernel_row_scan_local_instructions,
};
#[cfg(any(test, feature = "diagnostics"))]
use super::metrics::{
record_kernel_data_row_path_hit, record_kernel_full_row_retained_path_hit,
record_kernel_retained_slot_layout, record_kernel_slots_only_path_hit,
};
/// Outcome of a direct `DataRow` scan.
///
/// As populated by the scanners in this module:
/// - `rows_scanned` is the number of keys pulled from the key stream;
/// - `rows_matched` is the number of those keys the reader turned into a row (rows dropped
///   by a skip count still count as matched);
/// - `rows` holds only the rows that were retained.
pub(super) struct DirectDataRowScanResult {
    pub(super) rows_scanned: usize,
    pub(super) rows_matched: usize,
    pub(super) rows: Vec<DataRow>,
}
/// Generic result of the shared scan loop (`scan_rows_with`).
///
/// `rows_scanned` counts keys pulled from the stream, `rows_matched` counts keys whose
/// reader produced a row (skipped rows included), and `rows` holds the retained rows.
struct RowScanResult<T> {
    rows_scanned: usize,
    rows_matched: usize,
    rows: Vec<T>,
}
/// Inputs for `execute_scalar_page_kernel_dyn`.
///
/// The scan budget is validated against `continuation` for `load_order_route_mode` before
/// the scan runs; the remaining fields are forwarded into a kernel row scan.
pub(super) struct ScalarPageKernelRequest<'a, 'r> {
    /// Ordered key source; it may be replaced by a budgeted wrapper during the scan.
    pub(super) key_stream: &'a mut OrderedKeyStreamBox,
    /// Optional upper bound on keys to pull from `key_stream`.
    pub(super) scan_budget_hint: Option<usize>,
    /// Continuation context that validates `scan_budget_hint`.
    pub(super) continuation: &'a ScalarContinuationContext,
    /// Route mode the budget hint is validated against.
    pub(super) load_order_route_mode: LoadOrderRouteMode,
    /// Policy passed to the row reader for each key.
    pub(super) consistency: MissingRowPolicy,
    /// Selects how each key is turned into a `KernelRow`.
    pub(super) scan_strategy: KernelRowScanStrategy<'a>,
    /// Optional cap on matched rows; scanning stops once it is reached.
    pub(super) row_keep_cap: Option<usize>,
    /// Row runtime used to read rows for each key.
    pub(super) row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
}
/// Inputs for `execute_kernel_row_scan`.
pub(in crate::db::executor) struct KernelRowScanRequest<'a, 'r> {
    /// Ordered key source; it may be replaced by a budgeted wrapper during the scan.
    pub(in crate::db::executor) key_stream: &'a mut OrderedKeyStreamBox,
    /// Optional upper bound on keys to pull from `key_stream`.
    pub(in crate::db::executor) scan_budget_hint: Option<usize>,
    /// Selects how each key is turned into a `KernelRow`.
    pub(in crate::db::executor) scan_strategy: KernelRowScanStrategy<'a>,
    /// Policy passed to the row reader for each key.
    pub(in crate::db::executor) consistency: MissingRowPolicy,
    /// Optional cap on matched rows (skipped rows included); scanning stops once reached.
    pub(in crate::db::executor) row_keep_cap: Option<usize>,
    /// Number of leading matched rows to drop instead of retaining.
    pub(in crate::db::executor) row_skip_count: usize,
    /// Row runtime used to read rows for each key.
    pub(in crate::db::executor) row_runtime: &'r mut ScalarRowRuntimeHandle<'a>,
}
/// Runs a kernel row scan for `request`.
///
/// Returns the retained rows together with the number of keys pulled from the key
/// stream. With the `diagnostics` feature, the whole scan runs inside
/// `measure_kernel_row_phase` and the measured local instruction count is recorded.
pub(in crate::db::executor) fn execute_kernel_row_scan(
    request: KernelRowScanRequest<'_, '_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    #[cfg(not(feature = "diagnostics"))]
    {
        execute_kernel_row_scan_inner(request)
    }
    #[cfg(feature = "diagnostics")]
    {
        let (instructions, scan_outcome) =
            measure_kernel_row_phase(|| execute_kernel_row_scan_inner(request));
        record_kernel_row_scan_local_instructions(instructions);
        scan_outcome
    }
}
/// Dispatches a kernel row scan on the request's `scan_strategy`.
///
/// Every strategy runs inside `execute_scalar_read_loop`, so `scan_budget_hint` is applied
/// (when not redundant) before the first key is pulled. The retained-slot strategies go
/// through `execute_retained_kernel_scan`, which hands the retained slot layout to the
/// per-row reader. Under `test` or the `diagnostics` feature, the chosen path (and layout,
/// where present) is recorded before scanning.
///
/// Returns the retained rows together with the number of keys scanned.
#[expect(clippy::too_many_lines)]
fn execute_kernel_row_scan_inner(
    request: KernelRowScanRequest<'_, '_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    let KernelRowScanRequest {
        key_stream,
        scan_budget_hint,
        consistency,
        scan_strategy,
        row_keep_cap,
        row_skip_count,
        row_runtime,
    } = request;
    match scan_strategy {
        // Data rows only: no retained slot layout is involved.
        KernelRowScanStrategy::DataRows => {
            #[cfg(any(test, feature = "diagnostics"))]
            record_kernel_data_row_path_hit();
            execute_scalar_read_loop(key_stream, scan_budget_hint, |key_stream| {
                scan_data_rows_only_into_kernel(
                    key_stream,
                    consistency,
                    row_keep_cap,
                    row_skip_count,
                    row_runtime,
                )
            })
        }
        // Full rows plus retained slots, without a filter program.
        KernelRowScanStrategy::RetainedFullRows {
            retained_slot_layout,
        } => {
            #[cfg(any(test, feature = "diagnostics"))]
            {
                record_kernel_full_row_retained_path_hit();
                record_kernel_retained_slot_layout(retained_slot_layout);
            }
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                |key_stream, retained_slot_layout| {
                    scan_full_retained_rows_into_kernel(
                        key_stream,
                        consistency,
                        retained_slot_layout,
                        row_keep_cap,
                        row_skip_count,
                        row_runtime,
                    )
                },
            )
        }
        // Full rows plus retained slots, with the filter program passed to the reader.
        KernelRowScanStrategy::RetainedFullRowsFiltered {
            filter_program,
            retained_slot_layout,
        } => {
            #[cfg(any(test, feature = "diagnostics"))]
            {
                record_kernel_full_row_retained_path_hit();
                record_kernel_retained_slot_layout(retained_slot_layout);
            }
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                |key_stream, retained_slot_layout| {
                    scan_full_retained_rows_into_kernel_with_filter_program(
                        key_stream,
                        consistency,
                        filter_program,
                        retained_slot_layout,
                        row_keep_cap,
                        row_skip_count,
                        row_runtime,
                    )
                },
            )
        }
        // Retained slots only, without a filter program.
        KernelRowScanStrategy::SlotOnlyRows {
            retained_slot_layout,
        } => {
            #[cfg(any(test, feature = "diagnostics"))]
            {
                record_kernel_slots_only_path_hit();
                record_kernel_retained_slot_layout(retained_slot_layout);
            }
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                |key_stream, retained_slot_layout| {
                    scan_slot_rows_into_kernel(
                        key_stream,
                        consistency,
                        retained_slot_layout,
                        row_keep_cap,
                        row_skip_count,
                        row_runtime,
                    )
                },
            )
        }
        // Retained slots only, with the filter program passed to the reader.
        KernelRowScanStrategy::SlotOnlyRowsFiltered {
            filter_program,
            retained_slot_layout,
        } => {
            #[cfg(any(test, feature = "diagnostics"))]
            {
                record_kernel_slots_only_path_hit();
                record_kernel_retained_slot_layout(retained_slot_layout);
            }
            execute_retained_kernel_scan(
                key_stream,
                scan_budget_hint,
                Some(retained_slot_layout),
                |key_stream, retained_slot_layout| {
                    scan_slot_rows_into_kernel_with_filter_program(
                        key_stream,
                        consistency,
                        filter_program,
                        retained_slot_layout,
                        row_keep_cap,
                        row_skip_count,
                        row_runtime,
                    )
                },
            )
        }
    }
}
/// Resolves the retained slot layout, then runs `scan_rows` against it inside the scalar
/// read loop, which applies `scan_budget_hint` when it is not redundant.
///
/// # Errors
/// Returns the executor invariant error when `retained_slot_layout` is `None`; the key
/// stream is left untouched in that case. Otherwise forwards the result of `scan_rows`.
fn execute_retained_kernel_scan(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
    mut scan_rows: impl FnMut(
        &mut OrderedKeyStreamBox,
        &RetainedSlotLayout,
    ) -> Result<(Vec<KernelRow>, usize), InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    let Some(layout) = retained_slot_layout else {
        return Err(InternalError::query_executor_invariant());
    };
    execute_scalar_read_loop(key_stream, scan_budget_hint, |stream| {
        scan_rows(stream, layout)
    })
}
/// Entry point for the scalar page kernel.
///
/// First validates `scan_budget_hint` against the continuation for the requested
/// `load_order_route_mode`. It then runs a kernel row scan with no row skip.
///
/// # Errors
/// Propagates the continuation's validation error, or any error raised by the scan.
pub(super) fn execute_scalar_page_kernel_dyn(
    request: ScalarPageKernelRequest<'_, '_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    request
        .continuation
        .validate_load_scan_budget_hint(request.scan_budget_hint, request.load_order_route_mode)?;

    let scan_request = KernelRowScanRequest {
        key_stream: request.key_stream,
        scan_budget_hint: request.scan_budget_hint,
        consistency: request.consistency,
        scan_strategy: request.scan_strategy,
        row_keep_cap: request.row_keep_cap,
        // Nothing is skipped on this path.
        row_skip_count: 0,
        row_runtime: request.row_runtime,
    };
    execute_kernel_row_scan(scan_request)
}
/// Runs `scan_rows` over `key_stream`, wrapping the stream in a budgeted stream first when
/// a budget hint is given and `key_stream_budget_is_redundant` says it is not redundant.
///
/// The wrapper replaces the caller's stream in place, so it stays budgeted after return.
fn execute_scalar_read_loop<T>(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    mut scan_rows: impl FnMut(&mut OrderedKeyStreamBox) -> Result<T, InternalError>,
) -> Result<T, InternalError> {
    let effective_budget =
        scan_budget_hint.filter(|&budget| !key_stream_budget_is_redundant(key_stream, budget));
    if let Some(budget) = effective_budget {
        // Move the current stream out (leaving an empty placeholder) so it can be owned by
        // the budgeted wrapper, then put the wrapper back in its place.
        let unbudgeted = std::mem::replace(key_stream, OrderedKeyStreamBox::empty());
        *key_stream = OrderedKeyStreamBox::budgeted(unbudgeted, budget);
    }
    scan_rows(key_stream)
}
fn scan_kernel_rows_with(
key_stream: &mut OrderedKeyStreamBox,
row_keep_cap: Option<usize>,
row_skip_count: usize,
mut read_row: impl FnMut(DecodedDataStoreKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
let result = scan_rows_with(
key_stream,
row_keep_cap,
row_skip_count,
next_kernel_scan_key,
|key| read_kernel_scan_row(key, &mut read_row),
)?;
Ok((result.rows, result.rows_scanned))
}
/// Core scan loop shared by the kernel-row and direct data-row paths.
///
/// Pulls keys with `next_key` and turns each one into an optional row with `read_row`:
/// - `rows_scanned` counts every key pulled from the stream;
/// - `rows_matched` counts keys for which `read_row` produced a row;
/// - the first `row_skip_count` matched rows are dropped and the rest are retained.
///
/// When `row_keep_cap` is `Some(cap)`, scanning stops once `cap` rows have matched
/// (skipped rows included). The cap is checked *before* each key is pulled. As a result,
/// `Some(0)` scans nothing and returns an empty result, instead of reading and retaining
/// one row as a check-after-read loop would.
///
/// # Errors
/// Propagates the first error returned by `next_key` or `read_row`.
fn scan_rows_with<T>(
    key_stream: &mut OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    row_skip_count: usize,
    mut next_key: impl FnMut(
        &mut OrderedKeyStreamBox,
    ) -> Result<Option<DecodedDataStoreKey>, InternalError>,
    mut read_row: impl FnMut(DecodedDataStoreKey) -> Result<Option<T>, InternalError>,
) -> Result<RowScanResult<T>, InternalError> {
    let staged_capacity = staged_row_capacity(key_stream, row_keep_cap, row_skip_count);
    let mut rows = Vec::with_capacity(staged_capacity);
    let mut rows_scanned = 0usize;
    let mut rows_matched = 0usize;

    // An unset cap never stops the loop; a set cap stops it before pulling another key.
    while row_keep_cap.is_none_or(|cap| rows_matched < cap) {
        let Some(key) = next_key(key_stream)? else {
            break;
        };
        record_key_stream_yield();
        rows_scanned = rows_scanned.saturating_add(1);
        let Some(row) = read_row(key)? else {
            continue;
        };
        retain_scanned_row(row, row_skip_count, rows_matched, &mut rows);
        rows_matched = rows_matched.saturating_add(1);
    }

    Ok(RowScanResult {
        rows,
        rows_scanned,
        rows_matched,
    })
}
/// Pulls the next key for a kernel-row scan.
///
/// The pull's timing from `measure_execution_stats_phase` is recorded via
/// `record_key_stream_micros`. With `diagnostics`, the kernel-row-phase local instruction
/// count is recorded as well.
fn next_kernel_scan_key(
    key_stream: &mut OrderedKeyStreamBox,
) -> Result<Option<DecodedDataStoreKey>, InternalError> {
    #[cfg(not(feature = "diagnostics"))]
    let (pulled_key, stream_micros) = measure_execution_stats_phase(|| key_stream.next_key());
    #[cfg(feature = "diagnostics")]
    let ((stream_instructions, pulled_key), stream_micros) =
        measure_execution_stats_phase(|| measure_kernel_row_phase(|| key_stream.next_key()));
    record_key_stream_micros(stream_micros);
    #[cfg(feature = "diagnostics")]
    record_kernel_row_key_stream_local_instructions(stream_instructions);
    pulled_key
}
/// Reads one kernel row for `key` through `read_row`.
///
/// With `diagnostics`, the read is wrapped in `measure_kernel_row_phase` and its local
/// instruction count is recorded.
fn read_kernel_scan_row(
    key: DecodedDataStoreKey,
    read_row: &mut impl FnMut(DecodedDataStoreKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<Option<KernelRow>, InternalError> {
    #[cfg(not(feature = "diagnostics"))]
    let read_outcome = read_row(key);
    #[cfg(feature = "diagnostics")]
    let (read_instructions, read_outcome) = measure_kernel_row_phase(|| read_row(key));
    #[cfg(feature = "diagnostics")]
    record_kernel_row_row_read_local_instructions(read_instructions);
    read_outcome
}
/// Initial capacity for the retained-row buffer.
///
/// Uses the keep cap when one is set; otherwise it uses the stream's exact output-key
/// count hint, if any. The skip count is subtracted (saturating) because skipped rows are
/// never stored. Falls back to `0` when neither is available.
fn staged_row_capacity(
    key_stream: &OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    row_skip_count: usize,
) -> usize {
    let expected_rows = match row_keep_cap {
        Some(keep_cap) => Some(keep_cap),
        None => exact_output_key_count_hint(key_stream, row_keep_cap),
    };
    expected_rows.map_or(0, |count| count.saturating_sub(row_skip_count))
}
/// Appends `row` to `rows` unless it is one of the first `row_skip_count` matched rows.
///
/// `rows_matched` is the number of rows matched *before* this one, i.e. this row's
/// zero-based match index.
fn retain_scanned_row<T>(row: T, row_skip_count: usize, rows_matched: usize, rows: &mut Vec<T>) {
    if rows_matched < row_skip_count {
        // Still inside the skip window: drop the row.
        return;
    }
    rows.push(row);
}
fn scan_data_rows_direct(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_skip_count: usize,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<DirectDataRowScanResult, InternalError> {
scan_data_rows_direct_with_reader(key_stream, row_keep_cap, row_skip_count, |key| {
row_runtime.read_data_row(consistency, key)
})
}
fn scan_data_rows_direct_with_reader(
key_stream: &mut OrderedKeyStreamBox,
row_keep_cap: Option<usize>,
row_skip_count: usize,
mut read_data_row: impl FnMut(DecodedDataStoreKey) -> Result<Option<DataRow>, InternalError>,
) -> Result<DirectDataRowScanResult, InternalError> {
let result = scan_rows_with(
key_stream,
row_keep_cap,
row_skip_count,
next_direct_data_row_scan_key,
|key| read_direct_data_row_scan_row(key, &mut read_data_row),
)?;
Ok(DirectDataRowScanResult {
rows: result.rows,
rows_scanned: result.rows_scanned,
rows_matched: result.rows_matched,
})
}
/// Pulls the next key for a direct data-row scan.
///
/// The pull's timing from `measure_execution_stats_phase` is recorded via
/// `record_key_stream_micros`. With `diagnostics`, the direct-data-row-phase local
/// instruction count is recorded as well.
fn next_direct_data_row_scan_key(
    key_stream: &mut OrderedKeyStreamBox,
) -> Result<Option<DecodedDataStoreKey>, InternalError> {
    #[cfg(not(feature = "diagnostics"))]
    let (pulled_key, stream_micros) = measure_execution_stats_phase(|| key_stream.next_key());
    #[cfg(feature = "diagnostics")]
    let ((stream_instructions, pulled_key), stream_micros) = measure_execution_stats_phase(|| {
        measure_direct_data_row_phase(|| key_stream.next_key())
    });
    record_key_stream_micros(stream_micros);
    #[cfg(feature = "diagnostics")]
    record_direct_data_row_key_stream_local_instructions(stream_instructions);
    pulled_key
}
/// Reads one data row for `key` through `read_data_row`.
///
/// With `diagnostics`, the read is wrapped in `measure_direct_data_row_phase` and its local
/// instruction count is recorded.
fn read_direct_data_row_scan_row(
    key: DecodedDataStoreKey,
    read_data_row: &mut impl FnMut(DecodedDataStoreKey) -> Result<Option<DataRow>, InternalError>,
) -> Result<Option<DataRow>, InternalError> {
    #[cfg(not(feature = "diagnostics"))]
    let read_outcome = read_data_row(key);
    #[cfg(feature = "diagnostics")]
    let (read_instructions, read_outcome) = measure_direct_data_row_phase(|| read_data_row(key));
    #[cfg(feature = "diagnostics")]
    record_direct_data_row_row_read_local_instructions(read_instructions);
    read_outcome
}
fn scan_data_rows_direct_with_filter_program(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_skip_count: usize,
row_runtime: &ScalarRowRuntimeHandle<'_>,
filter_program: &EffectiveRuntimeFilterProgram,
retained_slot_layout: &RetainedSlotLayout,
) -> Result<DirectDataRowScanResult, InternalError> {
scan_data_rows_direct_with_reader(key_stream, row_keep_cap, row_skip_count, |key| {
row_runtime.read_data_row_with_filter_program(
consistency,
key,
filter_program,
retained_slot_layout,
)
})
}
/// Direct data-row scan used for materialized ordering.
///
/// Delegates to `scan_direct_data_rows_with_residual_policy` with no keep cap and no skip.
/// Every row the (possibly budgeted) stream yields is therefore a candidate.
pub(super) fn scan_materialized_order_direct_data_rows(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    consistency: MissingRowPolicy,
    residual_filter_scan_mode: ResidualFilterScanMode,
    row_runtime: &ScalarRowRuntimeHandle<'_>,
    residual_filter_program: Option<&EffectiveRuntimeFilterProgram>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
) -> Result<DirectDataRowScanResult, InternalError> {
    let (no_keep_cap, no_skip) = (None, 0);
    scan_direct_data_rows_with_residual_policy(
        key_stream,
        scan_budget_hint,
        no_keep_cap,
        no_skip,
        consistency,
        residual_filter_scan_mode,
        row_runtime,
        residual_filter_program,
        retained_slot_layout,
    )
}
/// Direct data-row scan that honours the residual filter scan mode.
///
/// - A keep cap of `Some(0)` returns an empty result without touching the stream.
/// - `Absent` reads rows without a filter program.
/// - `AppliedDuringScan` requires both a filter program and a retained slot layout.
/// - `DeferredPostAccess` is not supported on this path.
///
/// The mode is resolved inside the scalar read loop, so any budget wrapping of the stream
/// happens before an invariant error is reported.
///
/// # Errors
/// Returns the executor invariant error for `DeferredPostAccess`, or for
/// `AppliedDuringScan` when the program or layout is missing. It also propagates scan
/// errors.
#[expect(clippy::too_many_arguments)]
pub(super) fn scan_direct_data_rows_with_residual_policy(
    key_stream: &mut OrderedKeyStreamBox,
    scan_budget_hint: Option<usize>,
    row_keep_cap: Option<usize>,
    row_skip_count: usize,
    consistency: MissingRowPolicy,
    residual_filter_scan_mode: ResidualFilterScanMode,
    row_runtime: &ScalarRowRuntimeHandle<'_>,
    residual_filter_program: Option<&EffectiveRuntimeFilterProgram>,
    retained_slot_layout: Option<&RetainedSlotLayout>,
) -> Result<DirectDataRowScanResult, InternalError> {
    if matches!(row_keep_cap, Some(0)) {
        return Ok(DirectDataRowScanResult {
            rows: Vec::new(),
            rows_scanned: 0,
            rows_matched: 0,
        });
    }
    execute_scalar_read_loop(key_stream, scan_budget_hint, |stream| {
        match residual_filter_scan_mode {
            ResidualFilterScanMode::Absent => scan_data_rows_direct(
                stream,
                consistency,
                row_keep_cap,
                row_skip_count,
                row_runtime,
            ),
            ResidualFilterScanMode::AppliedDuringScan => {
                let (Some(filter_program), Some(layout)) =
                    (residual_filter_program, retained_slot_layout)
                else {
                    return Err(InternalError::query_executor_invariant());
                };
                scan_data_rows_direct_with_filter_program(
                    stream,
                    consistency,
                    row_keep_cap,
                    row_skip_count,
                    row_runtime,
                    filter_program,
                    layout,
                )
            }
            ResidualFilterScanMode::DeferredPostAccess => {
                Err(InternalError::query_executor_invariant())
            }
        }
    })
}
fn scan_data_rows_only_into_kernel(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
row_keep_cap: Option<usize>,
row_skip_count: usize,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_kernel_rows_with(key_stream, row_keep_cap, row_skip_count, |key| {
row_runtime.read_data_row_only(consistency, key)
})
}
fn scan_full_retained_rows_into_kernel(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_skip_count: usize,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_full_retained_rows_into_kernel_with_reader(
key_stream,
row_keep_cap,
row_skip_count,
|key| row_runtime.read_full_row_retained(consistency, key, retained_slot_layout),
)
}
/// Shared driver for the full-row retained kernel scans: forwards `read_row` to the
/// kernel scan loop.
fn scan_full_retained_rows_into_kernel_with_reader(
    key_stream: &mut OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    row_skip_count: usize,
    mut read_row: impl FnMut(DecodedDataStoreKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    // `&mut F` is itself `FnMut`, so the reader is lent to the loop rather than moved.
    scan_kernel_rows_with(key_stream, row_keep_cap, row_skip_count, &mut read_row)
}
fn scan_full_retained_rows_into_kernel_with_filter_program(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
filter_program: &EffectiveRuntimeFilterProgram,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_skip_count: usize,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_full_retained_rows_into_kernel_with_reader(
key_stream,
row_keep_cap,
row_skip_count,
|key| {
row_runtime.read_full_row_retained_with_filter_program(
consistency,
key,
filter_program,
retained_slot_layout,
)
},
)
}
fn scan_slot_rows_into_kernel(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_skip_count: usize,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_slot_rows_into_kernel_with_reader(key_stream, row_keep_cap, row_skip_count, |key| {
row_runtime.read_slot_only(consistency, &key, retained_slot_layout)
})
}
/// Shared driver for the slot-only kernel scans: forwards `read_row` to the kernel scan
/// loop.
fn scan_slot_rows_into_kernel_with_reader(
    key_stream: &mut OrderedKeyStreamBox,
    row_keep_cap: Option<usize>,
    row_skip_count: usize,
    mut read_row: impl FnMut(DecodedDataStoreKey) -> Result<Option<KernelRow>, InternalError>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
    // `&mut F` is itself `FnMut`, so the reader is lent to the loop rather than moved.
    scan_kernel_rows_with(key_stream, row_keep_cap, row_skip_count, &mut read_row)
}
fn scan_slot_rows_into_kernel_with_filter_program(
key_stream: &mut OrderedKeyStreamBox,
consistency: MissingRowPolicy,
filter_program: &EffectiveRuntimeFilterProgram,
retained_slot_layout: &RetainedSlotLayout,
row_keep_cap: Option<usize>,
row_skip_count: usize,
row_runtime: &ScalarRowRuntimeHandle<'_>,
) -> Result<(Vec<KernelRow>, usize), InternalError> {
scan_slot_rows_into_kernel_with_reader(key_stream, row_keep_cap, row_skip_count, |key| {
row_runtime.read_slot_only_with_filter_program(
consistency,
&key,
filter_program,
retained_slot_layout,
)
})
}