use crate::{
db::{
data::{DataKey, RawRow, StorageKey},
executor::{
ExecutionOptimization, ExecutionPreparation,
aggregate::field::{
AggregateFieldValueError, FieldSlot, extract_orderable_field_value_with_slot_reader,
},
pipeline::contracts::{GroupedCursorPage, ResolvedExecutionKeyStream},
projection::eval_effective_runtime_filter_program_with_value_cow_reader,
terminal::{RetainedSlotLayout, RetainedSlotRow, RowDecoder, RowLayout},
},
predicate::MissingRowPolicy,
query::plan::{
EffectiveRuntimeFilterProgram, FieldSlot as PlannedFieldSlot,
GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy,
expr::CompiledExprValueReader,
},
registry::StoreHandle,
},
error::InternalError,
model::field::FieldModel,
value::Value,
};
use std::borrow::Cow;
/// View over the slot values of one row as consumed by grouped execution.
///
/// The values live in a private `RowViewStorage`, so callers read slots only
/// through the accessor methods on this type.
pub(in crate::db::executor) struct RowView {
// Backing storage; the variant decides how slot indexes map to values.
storage: RowViewStorage,
}
/// Compiles the retained-slot layout that grouped execution needs for one row shape.
///
/// A slot is retained when it is referenced by any of:
/// - a grouping field in `group_fields`,
/// - the optional `effective_runtime_filter_program`,
/// - an aggregate's target slot, compiled input expression, or compiled filter expression,
/// - the global-distinct target slot of `grouped_distinct_execution_strategy`.
///
/// Directly marked slot indexes that fall outside `row_layout.field_count()` are
/// ignored rather than reported.
pub(in crate::db::executor) fn compile_grouped_row_slot_layout_from_parts(
    row_layout: RowLayout,
    group_fields: &[PlannedFieldSlot],
    grouped_aggregate_execution_specs: &[GroupedAggregateExecutionSpec],
    grouped_distinct_execution_strategy: &GroupedDistinctExecutionStrategy,
    effective_runtime_filter_program: Option<&EffectiveRuntimeFilterProgram>,
) -> RetainedSlotLayout {
    // Flags one slot as required; an index past the end of the flag list is a no-op.
    fn mark_slot(required: &mut [bool], index: usize) {
        if let Some(flag) = required.get_mut(index) {
            *flag = true;
        }
    }

    let field_count = row_layout.field_count();
    let mut required_slots = vec![false; field_count];

    // Grouping keys are always read from the row.
    for group_field in group_fields {
        mark_slot(&mut required_slots, group_field.index());
    }

    // The residual filter marks whichever slots it references.
    if let Some(filter_program) = effective_runtime_filter_program {
        filter_program.mark_referenced_slots(&mut required_slots);
    }

    // Each aggregate may contribute a direct target slot plus expression references.
    for spec in grouped_aggregate_execution_specs {
        if let Some(target_slot) = spec.target_slot() {
            mark_slot(&mut required_slots, target_slot.index());
        }
        if let Some(input_expr) = spec.compiled_input_expr() {
            input_expr.mark_referenced_slots(&mut required_slots);
        }
        if let Some(filter_expr) = spec.compiled_filter_expr() {
            filter_expr.mark_referenced_slots(&mut required_slots);
        }
    }

    if let Some(distinct_target) = grouped_distinct_execution_strategy.global_distinct_target_slot()
    {
        mark_slot(&mut required_slots, distinct_target.index());
    }

    // Turn the flag list into the ascending list of flagged slot indexes.
    RetainedSlotLayout::compile(
        field_count,
        required_slots
            .into_iter()
            .enumerate()
            .filter(|&(_, required)| required)
            .map(|(slot, _)| slot)
            .collect(),
    )
}
/// Backing storage for a `RowView`.
enum RowViewStorage {
/// Test-only storage: one optional value per slot, looked up by slot index.
#[cfg(test)]
Dense(Vec<Option<Value>>),
/// Exactly one value, readable only at `slot`; every other index reads as absent.
Single {
slot: usize,
value: Value,
},
/// Values held by a `RetainedSlotRow`; reads go through its `slot_ref`/`take_slot`.
Retained(RetainedSlotRow),
}
impl RowView {
    /// Builds the invariant error reported when a required slot holds no value.
    fn missing_required_slot_error(index: usize) -> InternalError {
        let message = format!("grouped row view missing required slot value: index={index}");
        InternalError::query_executor_invariant(message)
    }

    /// Test-only constructor over a dense list of optional slot values.
    #[cfg(test)]
    #[must_use]
    pub(in crate::db::executor) const fn new(slots: Vec<Option<Value>>) -> Self {
        Self {
            storage: RowViewStorage::Dense(slots),
        }
    }

    /// Wraps an already decoded retained-slot row.
    #[must_use]
    pub(in crate::db::executor) const fn from_retained_slots(row: RetainedSlotRow) -> Self {
        Self {
            storage: RowViewStorage::Retained(row),
        }
    }

    /// Wraps one decoded value that is readable only at `slot`.
    #[must_use]
    pub(in crate::db::executor) const fn from_single_value(slot: usize, value: Value) -> Self {
        Self {
            storage: RowViewStorage::Single { slot, value },
        }
    }

    /// Test-only borrowed read of one slot; `None` when the slot holds no value.
    #[cfg(test)]
    #[must_use]
    pub(in crate::db::executor) fn borrow_slot_for_test(&self, index: usize) -> Option<&Value> {
        self.slot_value_ref(index)
    }

    /// Reads one slot as a borrowed `Cow`; `None` when the slot holds no value.
    pub(in crate::db::executor) fn slot_value(&self, index: usize) -> Option<Cow<'_, Value>> {
        self.slot_value_ref(index).map(Cow::Borrowed)
    }

    /// Borrows the value stored at `index`, if any, whichever storage backs the view.
    pub(in crate::db::executor) fn slot_value_ref(&self, index: usize) -> Option<&Value> {
        match &self.storage {
            #[cfg(test)]
            RowViewStorage::Dense(slots) => slots.get(index)?.as_ref(),
            RowViewStorage::Single { slot, value } if *slot == index => Some(value),
            RowViewStorage::Single { .. } => None,
            RowViewStorage::Retained(row) => row.slot_ref(index),
        }
    }

    /// Reads one slot, failing with an executor-invariant error when it holds no value.
    pub(in crate::db::executor) fn require_slot_value(
        &self,
        index: usize,
    ) -> Result<Cow<'_, Value>, InternalError> {
        match self.slot_value(index) {
            Some(value) => Ok(value),
            None => Err(Self::missing_required_slot_error(index)),
        }
    }

    /// Consumes the view and moves the value at `index` out without cloning.
    ///
    /// Fails with an executor-invariant error when the slot holds no value.
    pub(in crate::db::executor) fn into_required_slot_value(
        self,
        index: usize,
    ) -> Result<Value, InternalError> {
        let taken = match self.storage {
            #[cfg(test)]
            RowViewStorage::Dense(mut slots) => slots.get_mut(index).and_then(Option::take),
            RowViewStorage::Single { slot, value } => (slot == index).then_some(value),
            RowViewStorage::Retained(mut row) => row.take_slot(index),
        };

        taken.ok_or_else(|| Self::missing_required_slot_error(index))
    }

    /// Returns an owned copy of the value at `index`, cloning when it is borrowed.
    pub(in crate::db::executor) fn require_slot_owned(
        &self,
        index: usize,
    ) -> Result<Value, InternalError> {
        self.require_slot_value(index).map(Cow::into_owned)
    }

    /// Runs `f` against the required value at `index` and returns its result.
    ///
    /// Fails before calling `f` when the slot holds no value.
    pub(in crate::db::executor) fn with_required_slot<R>(
        &self,
        index: usize,
        f: impl FnOnce(&Value) -> Result<R, InternalError>,
    ) -> Result<R, InternalError> {
        let value = self.require_slot_value(index)?;

        f(&value)
    }

    /// Evaluates the effective runtime filter program, reading slots from this view.
    pub(in crate::db::executor) fn eval_filter_program(
        &self,
        effective_runtime_filter_program: &EffectiveRuntimeFilterProgram,
    ) -> Result<bool, InternalError> {
        eval_effective_runtime_filter_program_with_value_cow_reader(
            effective_runtime_filter_program,
            &mut |slot_index| self.slot_value(slot_index),
            "grouped row filter expression could not read slot",
        )
    }

    /// Extracts the orderable value of `target_field` from the slot named by `field_slot`.
    ///
    /// The slot value is cloned once up front and handed to the extractor on its
    /// first read; any later read through the same reader yields `None`.
    pub(in crate::db::executor) fn extract_orderable_field_value(
        &self,
        target_field: &str,
        field_slot: FieldSlot,
    ) -> Result<Value, InternalError> {
        let mut pending = Some(self.require_slot_owned(field_slot.index)?);
        let extracted =
            extract_orderable_field_value_with_slot_reader(target_field, field_slot, &mut |_| {
                pending.take()
            });

        extracted.map_err(AggregateFieldValueError::into_internal_error)
    }

    /// Collects owned values for every grouping field, in `group_fields` order.
    ///
    /// Stops at the first grouping slot that holds no value and returns its error.
    pub(in crate::db::executor) fn group_values(
        &self,
        group_fields: &[PlannedFieldSlot],
    ) -> Result<Vec<Value>, InternalError> {
        group_fields
            .iter()
            .map(|field| self.require_slot_owned(field.index()))
            .collect()
    }
}
/// Lets compiled expressions read row slots straight from a `RowView`.
///
/// Only slot reads are served; group-key and aggregate reads always yield `None`.
impl CompiledExprValueReader for RowView {
    fn read_slot(&self, slot: usize) -> Option<Cow<'_, Value>> {
        let value = self.slot_value_ref(slot)?;

        Some(Cow::Borrowed(value))
    }

    // A row view carries no group-key values.
    fn read_group_key(&self, _offset: usize) -> Option<Cow<'_, Value>> {
        None
    }

    // A row view carries no aggregate values.
    fn read_aggregate(&self, _index: usize) -> Option<Cow<'_, Value>> {
        None
    }
}
/// Pre-resolved decode inputs used when the grouped slot layout requires exactly one slot.
struct SingleGroupedSlotDecode {
// Index of the single required slot.
slot: usize,
// Field model found at `slot` in the row contract's field list.
field: &'static FieldModel,
// Field model found at the row contract's primary-key slot.
primary_key_field: &'static FieldModel,
}
/// Decode strategy selected for turning one raw row into a `RowView`.
enum GroupedRowDecodePath<'a> {
/// Decode only the one required slot using the pre-resolved field models.
Single(&'a SingleGroupedSlotDecode),
/// Decode every retained slot through the grouped slot layout.
Indexed,
}
/// Runtime that reads raw rows from a store and decodes them into grouped row views.
pub(in crate::db::executor) struct StructuralGroupedRowRuntime {
// Store handle used to look rows up by raw key.
store: StoreHandle,
// Row layout handed to the row decoder.
row_layout: RowLayout,
// Slots retained when decoding through the indexed path.
grouped_slot_layout: RetainedSlotLayout,
// `Some` only when `grouped_slot_layout` requires exactly one slot.
single_grouped_slot_decode: Option<SingleGroupedSlotDecode>,
}
impl StructuralGroupedRowRuntime {
    /// Builds the runtime and pre-resolves the single-slot decode inputs when the
    /// grouped slot layout requires exactly one slot.
    ///
    /// # Panics
    ///
    /// Panics when the single required slot, or the contract's primary-key slot,
    /// does not name a declared field of the row contract.
    #[must_use]
    pub(in crate::db::executor) fn new(
        store: StoreHandle,
        row_layout: RowLayout,
        grouped_slot_layout: RetainedSlotLayout,
    ) -> Self {
        let single_grouped_slot_decode =
            if let [required_slot] = grouped_slot_layout.required_slots() {
                let slot = *required_slot;
                let contract = row_layout.contract();
                let field = contract
                    .fields()
                    .get(slot)
                    .expect("grouped slot layout must reference one declared structural row field");
                let primary_key_field = contract
                    .fields()
                    .get(contract.primary_key_slot())
                    .expect("structural row contract must retain one declared primary-key field");

                Some(SingleGroupedSlotDecode {
                    slot,
                    field,
                    primary_key_field,
                })
            } else {
                None
            };

        Self {
            store,
            row_layout,
            grouped_slot_layout,
            single_grouped_slot_decode,
        }
    }

    /// Decodes one raw row into a `RowView` using the decode path chosen at construction.
    fn row_view_from_data_row(&self, key: &DataKey, row: RawRow) -> Result<RowView, InternalError> {
        match self.row_decode_path() {
            GroupedRowDecodePath::Single(decode) => {
                self.single_slot_row_view_from_data_row(key.storage_key(), row, decode)
            }
            GroupedRowDecodePath::Indexed => {
                let retained = RowDecoder::decode_retained_slots(
                    &self.row_layout,
                    key.storage_key(),
                    &row,
                    &self.grouped_slot_layout,
                )?;

                Ok(RowView::from_retained_slots(retained))
            }
        }
    }

    /// Decodes only the single required slot and wraps it in a one-value `RowView`.
    ///
    /// A decode that yields no value is reported as an executor-invariant error.
    fn single_slot_row_view_from_data_row(
        &self,
        expected_key: StorageKey,
        row: RawRow,
        single_grouped_slot_decode: &SingleGroupedSlotDecode,
    ) -> Result<RowView, InternalError> {
        let decoded = self.decode_single_grouped_slot_value_from_raw_row(
            expected_key,
            &row,
            single_grouped_slot_decode,
        )?;

        match decoded {
            Some(value) => Ok(RowView::from_single_value(
                single_grouped_slot_decode.slot,
                value,
            )),
            None => Err(InternalError::query_executor_invariant(format!(
                "single-slot grouped row decode returned no value: slot={}",
                single_grouped_slot_decode.slot,
            ))),
        }
    }

    /// Decodes the single required slot of `row`, forwarding the decoder's result unchanged.
    fn decode_single_grouped_slot_value_from_raw_row(
        &self,
        expected_key: StorageKey,
        row: &RawRow,
        single_grouped_slot_decode: &SingleGroupedSlotDecode,
    ) -> Result<Option<Value>, InternalError> {
        // All three fields are `Copy`, so they can be bound by value through the reference.
        let &SingleGroupedSlotDecode {
            slot,
            field,
            primary_key_field,
        } = single_grouped_slot_decode;

        RowDecoder::decode_required_slot_value_with_fields(
            &self.row_layout,
            expected_key,
            row,
            slot,
            field,
            primary_key_field,
        )
    }

    /// Picks the single-slot path when its inputs were pre-resolved, else the indexed path.
    fn row_decode_path(&self) -> GroupedRowDecodePath<'_> {
        match &self.single_grouped_slot_decode {
            Some(decode) => GroupedRowDecodePath::Single(decode),
            None => GroupedRowDecodePath::Indexed,
        }
    }

    /// Returns the pre-resolved single-slot decode only when it targets `required_slot`.
    fn matching_single_grouped_slot_decode(
        &self,
        required_slot: usize,
    ) -> Option<&SingleGroupedSlotDecode> {
        match &self.single_grouped_slot_decode {
            Some(decode) if decode.slot == required_slot => Some(decode),
            _ => None,
        }
    }

    /// Fetches the raw row stored under `key`, applying `consistency` when it is absent.
    ///
    /// An absent row yields `Ok(None)` under `MissingRowPolicy::Ignore` and a
    /// missing-row executor error under `MissingRowPolicy::Error`.
    fn read_data_row(
        &self,
        consistency: MissingRowPolicy,
        key: &DataKey,
    ) -> Result<Option<RawRow>, InternalError> {
        let raw_key = key.to_raw()?;

        if let Some(row) = self.store.with_data(|store| store.get(&raw_key)) {
            return Ok(Some(row));
        }

        match consistency {
            MissingRowPolicy::Ignore => Ok(None),
            MissingRowPolicy::Error => {
                Err(crate::db::executor::ExecutorError::missing_row(key).into())
            }
        }
    }

    /// Reads the row under `key` and returns the value stored at `required_slot`.
    ///
    /// Returns `Ok(None)` when the row is absent and `consistency` ignores missing
    /// rows. When `required_slot` is the pre-resolved single slot, the decoder's
    /// `Option` result is returned as-is; otherwise the row is decoded into a
    /// `RowView` and a slot with no value is an executor-invariant error.
    pub(in crate::db::executor) fn read_single_group_value(
        &self,
        consistency: MissingRowPolicy,
        key: &DataKey,
        required_slot: usize,
    ) -> Result<Option<Value>, InternalError> {
        let row = match self.read_data_row(consistency, key)? {
            Some(row) => row,
            None => return Ok(None),
        };

        match self.matching_single_grouped_slot_decode(required_slot) {
            Some(decode) => {
                self.decode_single_grouped_slot_value_from_raw_row(key.storage_key(), &row, decode)
            }
            None => self
                .row_view_from_data_row(key, row)?
                .into_required_slot_value(required_slot)
                .map(Some),
        }
    }

    /// Reads the row under `key` and decodes it into a `RowView`.
    ///
    /// Returns `Ok(None)` when the row is absent and `consistency` ignores missing rows.
    pub(in crate::db::executor) fn read_row_view(
        &self,
        consistency: MissingRowPolicy,
        key: &DataKey,
    ) -> Result<Option<RowView>, InternalError> {
        match self.read_data_row(consistency, key)? {
            Some(row) => self.row_view_from_data_row(key, row).map(Some),
            None => Ok(None),
        }
    }
}
/// Bundle of the row runtime, execution preparation, and resolved key stream
/// that make up one grouped stream stage.
pub(in crate::db::executor) struct GroupedStreamStage {
// Reads and decodes rows for grouped execution.
row_runtime: StructuralGroupedRowRuntime,
// Execution preparation carried alongside the stream.
execution_preparation: ExecutionPreparation,
// Resolved key stream; the only part handed out mutably by `parts_mut`.
resolved: ResolvedExecutionKeyStream,
}
impl GroupedStreamStage {
    /// Assembles a stream stage from its three parts.
    pub(in crate::db::executor) const fn new(
        row_runtime: StructuralGroupedRowRuntime,
        execution_preparation: ExecutionPreparation,
        resolved: ResolvedExecutionKeyStream,
    ) -> Self {
        Self {
            row_runtime,
            execution_preparation,
            resolved,
        }
    }

    /// Splits the stage into disjoint borrows: the row runtime and execution
    /// preparation are shared, the resolved key stream is mutable.
    pub(in crate::db::executor) const fn parts_mut(
        &mut self,
    ) -> (
        &StructuralGroupedRowRuntime,
        &ExecutionPreparation,
        &mut ResolvedExecutionKeyStream,
    ) {
        let Self {
            row_runtime,
            execution_preparation,
            resolved,
        } = self;

        // Downgrade the first two borrows to shared references.
        (&*row_runtime, &*execution_preparation, resolved)
    }

    /// Forwards the resolved key stream's cheap candidate-count hint, if it has one.
    #[must_use]
    pub(in crate::db::executor) fn cheap_access_candidate_count_hint(&self) -> Option<usize> {
        let resolved = &self.resolved;

        resolved.cheap_access_candidate_count_hint()
    }
}
/// Result of a grouped fold: the grouped page plus the counters and flags
/// captured when the stage was built from a grouped stream.
pub(in crate::db::executor) struct GroupedFoldStage {
// Grouped page produced by the fold.
page: GroupedCursorPage,
// Filtered-row count supplied by the caller.
filtered_rows: usize,
// Caller-supplied flag reported by `should_check_filtered_rows_upper_bound`.
check_filtered_rows_upper_bound: bool,
// The stream's scanned-rows override when present, else the caller's fallback.
rows_scanned: usize,
// Optimization reported by the resolved key stream, if any.
optimization: Option<ExecutionOptimization>,
// Copied from the resolved key stream's `index_predicate_applied()`.
index_predicate_applied: bool,
// Copied from the resolved key stream's `index_predicate_keys_rejected()`.
index_predicate_keys_rejected: u64,
// Copied from the resolved key stream's `distinct_keys_deduped()`.
distinct_keys_deduped: u64,
}
impl GroupedFoldStage {
    /// Builds the fold stage from a finished page and the stream it was folded from.
    ///
    /// `scanned_rows_fallback` is used only when the resolved key stream reports
    /// no scanned-rows override.
    pub(in crate::db::executor) fn from_grouped_stream(
        page: GroupedCursorPage,
        filtered_rows: usize,
        check_filtered_rows_upper_bound: bool,
        stream: &GroupedStreamStage,
        scanned_rows_fallback: usize,
    ) -> Self {
        let resolved = &stream.resolved;
        let rows_scanned = match resolved.rows_scanned_override() {
            Some(overridden) => overridden,
            None => scanned_rows_fallback,
        };

        Self {
            page,
            filtered_rows,
            check_filtered_rows_upper_bound,
            rows_scanned,
            optimization: resolved.optimization(),
            index_predicate_applied: resolved.index_predicate_applied(),
            index_predicate_keys_rejected: resolved.index_predicate_keys_rejected(),
            distinct_keys_deduped: resolved.distinct_keys_deduped(),
        }
    }

    /// Number of rows in the grouped page.
    pub(in crate::db::executor) const fn rows_returned(&self) -> usize {
        let rows = &self.page.rows;

        rows.len()
    }

    /// Optimization captured from the resolved key stream, if any.
    pub(in crate::db::executor) const fn optimization(&self) -> Option<ExecutionOptimization> {
        self.optimization
    }

    /// Scanned-row count recorded when the stage was built.
    pub(in crate::db::executor) const fn rows_scanned(&self) -> usize {
        self.rows_scanned
    }

    /// Whether the resolved key stream reported an applied index predicate.
    pub(in crate::db::executor) const fn index_predicate_applied(&self) -> bool {
        self.index_predicate_applied
    }

    /// Index-predicate rejected-key count captured from the resolved key stream.
    pub(in crate::db::executor) const fn index_predicate_keys_rejected(&self) -> u64 {
        self.index_predicate_keys_rejected
    }

    /// Distinct-key dedup count captured from the resolved key stream.
    pub(in crate::db::executor) const fn distinct_keys_deduped(&self) -> u64 {
        self.distinct_keys_deduped
    }

    /// Caller-supplied flag saying whether the filtered-rows upper bound should be checked.
    pub(in crate::db::executor) const fn should_check_filtered_rows_upper_bound(&self) -> bool {
        self.check_filtered_rows_upper_bound
    }

    /// Filtered-row count supplied when the stage was built.
    pub(in crate::db::executor) const fn filtered_rows(&self) -> usize {
        self.filtered_rows
    }

    /// Consumes the stage and returns the grouped page.
    pub(in crate::db::executor) fn into_page(self) -> GroupedCursorPage {
        self.page
    }
}
#[cfg(test)]
mod tests {
    use crate::{
        db::executor::{
            pipeline::runtime::RowView,
            terminal::{RetainedSlotLayout, RetainedSlotRow},
        },
        value::Value,
    };

    // Dense test storage must resolve populated slots and report empty ones as absent.
    #[test]
    fn dense_test_row_view_resolves_sparse_slots() {
        let expected_uint = Value::Uint(7);
        let expected_text = Value::Text("group".to_string());
        let dense = RowView::new(vec![
            None,
            Some(Value::Uint(7)),
            None,
            None,
            Some(Value::Text("group".to_string())),
            None,
        ]);

        assert_eq!(dense.borrow_slot_for_test(1), Some(&expected_uint));
        assert_eq!(dense.borrow_slot_for_test(4), Some(&expected_text));
        assert_eq!(dense.borrow_slot_for_test(0), None);
    }

    // Single-value storage must answer only for the slot it was built with.
    #[test]
    fn single_slot_row_view_resolves_only_its_declared_slot() {
        let expected_text = Value::Text("group".to_string());
        let single = RowView::from_single_value(4, Value::Text("group".to_string()));

        assert_eq!(single.borrow_slot_for_test(4), Some(&expected_text));
        assert_eq!(single.borrow_slot_for_test(1), None);
    }

    // Borrowed reads from retained storage must be repeatable and independent per slot.
    #[test]
    fn retained_row_view_slot_reads_are_repeatable_borrows() {
        let expected_uint = Value::Uint(7);
        let expected_text = Value::Text("group".to_string());
        let layout = RetainedSlotLayout::compile(5, vec![1, 4]);
        let decoded_values = vec![Some(Value::Uint(7)), Some(Value::Text("group".to_string()))];
        let retained_view =
            RowView::from_retained_slots(RetainedSlotRow::from_indexed_values(&layout, decoded_values));

        assert_eq!(
            retained_view.slot_value(1).as_deref(),
            Some(&expected_uint),
            "first retained slot read should borrow the decoded value",
        );
        assert_eq!(
            retained_view.slot_value(1).as_deref(),
            Some(&expected_uint),
            "second retained slot read must see the same decoded value",
        );
        assert_eq!(
            retained_view.slot_value(4).as_deref(),
            Some(&expected_text),
            "reading another retained slot must not invalidate earlier slots",
        );
    }
}