#[cfg(test)]
use crate::db::executor::planning::route::LoadTerminalFastPathContract;
use crate::{
db::{
access::AccessPlan,
cursor::{ContinuationSignature, CursorPlanError, GroupedPlannedCursor, PlannedCursor},
executor::{
EntityAuthority, ExecutionPreparation, ExecutorPlanError, GroupedPaginationWindow,
LoweredIndexPrefixSpec, LoweredIndexRangeSpec,
explain::assemble_load_execution_node_descriptor, lower_index_prefix_specs,
lower_index_range_specs, planning::preparation::slot_map_for_model_plan,
traversal::row_read_consistency_for_plan,
},
predicate::MissingRowPolicy,
query::explain::ExplainExecutionNodeDescriptor,
query::plan::{
AccessPlannedQuery, ExecutionOrdering, GroupSpec, OrderSpec,
PlannedContinuationContract, QueryMode, constant_covering_projection_value_from_access,
covering_index_projection_context,
},
},
error::InternalError,
traits::{EntityKind, EntityValue},
};
use std::marker::PhantomData;
#[cfg(test)]
use std::ops::Bound;
/// Coarse execution family of a prepared plan.
///
/// Produced by `PreparedExecutionPlanCore::execution_family`, which maps each
/// `ExecutionOrdering` variant of the continuation contract to one family.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionFamily {
/// The contract ordering is `ExecutionOrdering::PrimaryKey`.
PrimaryKey,
/// The contract ordering is `ExecutionOrdering::Explicit(_)`.
Ordered,
/// The contract ordering is `ExecutionOrdering::Grouped(_)`.
Grouped,
}
/// Classification returned by `classify_bytes_by_projection_mode` for one
/// projected target field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum BytesByProjectionMode {
/// Fallback: no covering shortcut was selected (a missing-row policy other
/// than `Ignore`, a residual predicate, or no covering context for the field).
Materialized,
/// `covering_index_projection_context` produced a context for the field.
CoveringIndex,
/// `constant_covering_projection_value_from_access` produced a value for
/// the field.
CoveringConstant,
}
/// Classify how a projection of `target_field` can be served.
///
/// The checks run in a fixed order and the first one that applies wins:
///
/// 1. any `consistency` other than `MissingRowPolicy::Ignore` gives `Materialized`;
/// 2. a constant covering value available from `access` gives `CoveringConstant`;
/// 3. `has_predicate` gives `Materialized`;
/// 4. a covering index projection context gives `CoveringIndex`;
/// 5. everything else gives `Materialized`.
#[must_use]
pub(in crate::db::executor) fn classify_bytes_by_projection_mode(
    access: &AccessPlan<crate::value::Value>,
    order_spec: Option<&OrderSpec>,
    consistency: MissingRowPolicy,
    has_predicate: bool,
    target_field: &str,
    primary_key_name: &'static str,
) -> BytesByProjectionMode {
    let ignores_missing_rows = matches!(consistency, MissingRowPolicy::Ignore);
    if !ignores_missing_rows {
        return BytesByProjectionMode::Materialized;
    }

    // The constant shortcut is checked before the predicate flag, so it can
    // still apply when `has_predicate` is set.
    let has_constant_value =
        constant_covering_projection_value_from_access(access, target_field).is_some();
    if has_constant_value {
        return BytesByProjectionMode::CoveringConstant;
    }

    // `&&` short-circuits: the covering context is only consulted when there
    // is no predicate.
    let index_covers_field = !has_predicate
        && covering_index_projection_context(access, order_spec, target_field, primary_key_name)
            .is_some();

    if index_covers_field {
        BytesByProjectionMode::CoveringIndex
    } else {
        BytesByProjectionMode::Materialized
    }
}
/// Untyped state shared by the prepared-plan wrappers in this file
/// (`PreparedExecutionPlan`, `PreparedLoadPlan`, `PreparedAggregatePlan`).
///
/// Built by `build_prepared_execution_plan_core`.
#[derive(Debug)]
struct PreparedExecutionPlanCore {
/// The access-planned query this core was built from.
plan: AccessPlannedQuery,
/// Continuation contract derived from the plan, if the plan produced one.
continuation: Option<PlannedContinuationContract>,
/// Lowered index-prefix specs; left empty when lowering failed.
index_prefix_specs: Vec<LoweredIndexPrefixSpec>,
/// Set when index-prefix lowering failed; the accessor then returns an error.
index_prefix_spec_invalid: bool,
/// Lowered index-range specs; left empty when lowering failed.
index_range_specs: Vec<LoweredIndexRangeSpec>,
/// Set when index-range lowering failed; the accessor then returns an error.
index_range_spec_invalid: bool,
}
impl PreparedExecutionPlanCore {
    /// Assemble the core from a plan, its optional continuation contract, and
    /// the lowered index specs together with their "lowering failed" flags.
    #[must_use]
    const fn new(
        plan: AccessPlannedQuery,
        continuation: Option<PlannedContinuationContract>,
        index_prefix_specs: Vec<LoweredIndexPrefixSpec>,
        index_prefix_spec_invalid: bool,
        index_range_specs: Vec<LoweredIndexRangeSpec>,
        index_range_spec_invalid: bool,
    ) -> Self {
        Self {
            plan,
            continuation,
            index_prefix_specs,
            index_prefix_spec_invalid,
            index_range_specs,
            index_range_spec_invalid,
        }
    }

    /// Borrow the underlying access-planned query.
    #[must_use]
    const fn plan(&self) -> &AccessPlannedQuery {
        &self.plan
    }

    /// Query mode recorded on the plan's scalar plan.
    #[must_use]
    const fn mode(&self) -> QueryMode {
        self.plan.scalar_plan().mode
    }

    /// Whether the continuation contract reports a grouped plan.
    ///
    /// Returns `false` when there is no continuation contract.
    #[must_use]
    const fn is_grouped(&self) -> bool {
        match self.continuation {
            Some(ref contract) => contract.is_grouped(),
            None => false,
        }
    }

    /// Owned copy of the execution ordering from the continuation contract.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract.
    fn execution_ordering(&self) -> Result<ExecutionOrdering, InternalError> {
        let contract = self.continuation_contract()?;
        Ok(contract.order_contract().ordering().clone())
    }

    /// Execution family derived from the contract's ordering variant.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract.
    fn execution_family(&self) -> Result<ExecutionFamily, InternalError> {
        let contract = self.continuation_contract()?;
        // Only the variant matters here, so classify the contract's ordering
        // in place instead of cloning it via `execution_ordering`.
        let family = match contract.order_contract().ordering() {
            ExecutionOrdering::PrimaryKey => ExecutionFamily::PrimaryKey,
            ExecutionOrdering::Explicit(_) => ExecutionFamily::Ordered,
            ExecutionOrdering::Grouped(_) => ExecutionFamily::Grouped,
        };
        Ok(family)
    }

    /// Missing-row policy derived from the plan by
    /// `row_read_consistency_for_plan`.
    #[must_use]
    const fn consistency(&self) -> MissingRowPolicy {
        row_read_consistency_for_plan(&self.plan)
    }

    /// Order spec of the scalar plan, if one is set.
    #[must_use]
    const fn order_spec(&self) -> Option<&OrderSpec> {
        self.plan.scalar_plan().order.as_ref()
    }

    /// Whether the plan reports a residual predicate.
    #[must_use]
    fn has_predicate(&self) -> bool {
        self.plan.has_residual_predicate()
    }

    /// Lowered index-prefix specs.
    ///
    /// # Errors
    /// Fails when index-prefix lowering was flagged invalid at build time.
    fn index_prefix_specs(&self) -> Result<&[LoweredIndexPrefixSpec], InternalError> {
        if self.index_prefix_spec_invalid {
            return Err(
                ExecutorPlanError::lowered_index_prefix_spec_invalid().into_internal_error()
            );
        }
        Ok(self.index_prefix_specs.as_slice())
    }

    /// Lowered index-range specs.
    ///
    /// # Errors
    /// Fails when index-range lowering was flagged invalid at build time.
    fn index_range_specs(&self) -> Result<&[LoweredIndexRangeSpec], InternalError> {
        if self.index_range_spec_invalid {
            return Err(ExecutorPlanError::lowered_index_range_spec_invalid().into_internal_error());
        }
        Ok(self.index_range_specs.as_slice())
    }

    /// Consume the core and return the owned plan; the contract and lowered
    /// specs are dropped.
    #[must_use]
    fn into_inner(self) -> AccessPlannedQuery {
        self.plan
    }

    /// Prepare a scalar cursor from optional raw cursor bytes.
    ///
    /// Delegates to `authority.prepare_scalar_cursor` with the continuation
    /// contract.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract, or when the
    /// authority rejects the cursor.
    fn prepare_cursor(
        &self,
        authority: EntityAuthority,
        cursor: Option<&[u8]>,
    ) -> Result<PlannedCursor, ExecutorPlanError> {
        let Some(contract) = self.continuation.as_ref() else {
            return Err(ExecutorPlanError::continuation_cursor_requires_load_plan());
        };
        authority
            .prepare_scalar_cursor(contract, cursor)
            .map_err(ExecutorPlanError::from)
    }

    /// Revalidate an already-planned scalar cursor against the contract.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract, or when the
    /// authority rejects the cursor.
    fn revalidate_cursor(
        &self,
        authority: EntityAuthority,
        cursor: PlannedCursor,
    ) -> Result<PlannedCursor, InternalError> {
        let Some(contract) = self.continuation.as_ref() else {
            return Err(
                ExecutorPlanError::continuation_cursor_requires_load_plan().into_internal_error()
            );
        };
        authority
            .revalidate_scalar_cursor(contract, cursor)
            .map_err(CursorPlanError::into_internal_error)
    }

    /// Revalidate an already-planned grouped cursor against the contract.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract, or when the contract
    /// rejects the cursor.
    fn revalidate_grouped_cursor(
        &self,
        cursor: GroupedPlannedCursor,
    ) -> Result<GroupedPlannedCursor, InternalError> {
        let Some(contract) = self.continuation.as_ref() else {
            return Err(
                ExecutorPlanError::grouped_cursor_revalidation_requires_grouped_plan()
                    .into_internal_error(),
            );
        };
        contract
            .revalidate_grouped_cursor(cursor)
            .map_err(CursorPlanError::into_internal_error)
    }

    /// Continuation signature reported by the contract.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract.
    fn continuation_signature_for_runtime(&self) -> Result<ContinuationSignature, InternalError> {
        let contract = self.continuation_contract()?;
        Ok(contract.continuation_signature())
    }

    /// Boundary arity reported by a grouped continuation contract.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract, or when the contract
    /// is not grouped.
    fn grouped_cursor_boundary_arity(&self) -> Result<usize, InternalError> {
        let contract = self.continuation_contract()?;
        if !contract.is_grouped() {
            return Err(
                ExecutorPlanError::grouped_cursor_boundary_arity_requires_grouped_plan()
                    .into_internal_error(),
            );
        }
        Ok(contract.boundary_arity())
    }

    /// Project the grouped paging window for `cursor` and repackage its five
    /// parts as a `GroupedPaginationWindow`.
    ///
    /// # Errors
    /// Fails when the plan has no continuation contract, or when the contract
    /// cannot project a window for the cursor.
    fn grouped_pagination_window(
        &self,
        cursor: &GroupedPlannedCursor,
    ) -> Result<GroupedPaginationWindow, InternalError> {
        let contract = self.continuation_contract()?;
        let window = contract
            .project_grouped_paging_window(cursor)
            .map_err(CursorPlanError::into_internal_error)?;
        let (
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        ) = window.into_parts();
        Ok(GroupedPaginationWindow::new(
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        ))
    }

    /// Borrow the continuation contract.
    ///
    /// # Errors
    /// Fails with `continuation_contract_requires_load_plan` when the plan
    /// has no continuation contract.
    fn continuation_contract(&self) -> Result<&PlannedContinuationContract, InternalError> {
        self.continuation.as_ref().ok_or_else(|| {
            ExecutorPlanError::continuation_contract_requires_load_plan().into_internal_error()
        })
    }
}
/// Build the untyped plan core for `plan` under `authority`.
///
/// Finalizes the static planning shape, derives the continuation contract,
/// then lowers the index prefix and range specs. A lowering failure is not
/// returned here: it is recorded as an "invalid" flag next to an empty spec
/// list, and the core's spec accessors turn that flag into an error.
fn build_prepared_execution_plan_core(
    authority: EntityAuthority,
    mut plan: AccessPlannedQuery,
) -> PreparedExecutionPlanCore {
    // Collapse a lowering outcome into `(specs, invalid)`.
    fn specs_or_invalid_flag<S, F>(lowered: Result<Vec<S>, F>) -> (Vec<S>, bool) {
        match lowered {
            Ok(specs) => (specs, false),
            Err(_) => (Vec::new(), true),
        }
    }

    authority.finalize_static_planning_shape(&mut plan);
    let continuation = plan.planned_continuation_contract(authority.entity_path());

    let prefix_lowering = lower_index_prefix_specs(authority.entity_tag(), &plan.access);
    let (index_prefix_specs, index_prefix_spec_invalid) = specs_or_invalid_flag(prefix_lowering);

    let range_lowering = lower_index_range_specs(authority.entity_tag(), &plan.access);
    let (index_range_specs, index_range_spec_invalid) = specs_or_invalid_flag(range_lowering);

    PreparedExecutionPlanCore::new(
        plan,
        continuation,
        index_prefix_specs,
        index_prefix_spec_invalid,
        index_range_specs,
        index_range_spec_invalid,
    )
}
/// Prepared execution plan typed by entity `E`.
///
/// Wraps the untyped `PreparedExecutionPlanCore`; methods obtain the entity's
/// `EntityAuthority` through `EntityAuthority::for_type::<E>()` when needed.
#[derive(Debug)]
pub(in crate::db) struct PreparedExecutionPlan<E: EntityKind> {
/// Untyped plan state.
core: PreparedExecutionPlanCore,
/// Carries `E` at the type level only; no value of `E` is stored.
marker: PhantomData<fn() -> E>,
}
/// Untyped load plan: the plan core plus the `EntityAuthority` value it is
/// executed under.
#[derive(Debug)]
pub(in crate::db::executor) struct PreparedLoadPlan {
/// Authority passed to the core when building it and when revalidating cursors.
authority: EntityAuthority,
/// Untyped plan state.
core: PreparedExecutionPlanCore,
}
impl PreparedLoadPlan {
/// Build a load plan by preparing a fresh core for `plan` under `authority`.
#[must_use]
pub(in crate::db::executor) fn from_plan(
authority: EntityAuthority,
plan: AccessPlannedQuery,
) -> Self {
Self {
authority,
core: build_prepared_execution_plan_core(authority, plan),
}
}
/// The authority this plan was prepared with.
#[must_use]
pub(in crate::db::executor) const fn authority(&self) -> EntityAuthority {
self.authority
}
/// Query mode of the underlying plan.
#[must_use]
pub(in crate::db::executor) const fn mode(&self) -> QueryMode {
self.core.mode()
}
/// Borrow the underlying access-planned query.
#[must_use]
pub(in crate::db::executor) const fn logical_plan(&self) -> &AccessPlannedQuery {
self.core.plan()
}
/// Execution ordering from the continuation contract; errors when the plan
/// has no contract.
pub(in crate::db::executor) fn execution_ordering(
&self,
) -> Result<ExecutionOrdering, InternalError> {
self.core.execution_ordering()
}
/// Revalidate a scalar cursor using this plan's stored authority.
pub(in crate::db::executor) fn revalidate_cursor(
&self,
cursor: PlannedCursor,
) -> Result<PlannedCursor, InternalError> {
self.core.revalidate_cursor(self.authority, cursor)
}
/// Revalidate a grouped cursor against the continuation contract.
pub(in crate::db::executor) fn revalidate_grouped_cursor(
&self,
cursor: GroupedPlannedCursor,
) -> Result<GroupedPlannedCursor, InternalError> {
self.core.revalidate_grouped_cursor(cursor)
}
/// Continuation signature from the contract; errors when the plan has no
/// contract.
pub(in crate::db::executor) fn continuation_signature_for_runtime(
&self,
) -> Result<ContinuationSignature, InternalError> {
self.core.continuation_signature_for_runtime()
}
/// Boundary arity of a grouped contract; errors when the plan has no
/// contract or the contract is not grouped.
pub(in crate::db::executor) fn grouped_cursor_boundary_arity(
&self,
) -> Result<usize, InternalError> {
self.core.grouped_cursor_boundary_arity()
}
/// Grouped pagination window projected for `cursor`.
pub(in crate::db::executor) fn grouped_pagination_window(
&self,
cursor: &GroupedPlannedCursor,
) -> Result<GroupedPaginationWindow, InternalError> {
self.core.grouped_pagination_window(cursor)
}
/// Lowered index-prefix specs; errors when prefix lowering was flagged invalid.
pub(in crate::db::executor) fn index_prefix_specs(
&self,
) -> Result<&[LoweredIndexPrefixSpec], InternalError> {
self.core.index_prefix_specs()
}
/// Lowered index-range specs; errors when range lowering was flagged invalid.
pub(in crate::db::executor) fn index_range_specs(
&self,
) -> Result<&[LoweredIndexRangeSpec], InternalError> {
self.core.index_range_specs()
}
/// Consume the load plan and return the owned access-planned query.
#[must_use]
pub(in crate::db::executor) fn into_plan(self) -> AccessPlannedQuery {
self.core.into_inner()
}
}
/// Untyped aggregate plan: the plan core plus the `EntityAuthority` value it
/// is executed under.
#[derive(Debug)]
pub(in crate::db::executor) struct PreparedAggregatePlan {
/// Authority handed out with the streaming parts and reused when converting
/// to a grouped load plan.
authority: EntityAuthority,
/// Untyped plan state.
core: PreparedExecutionPlanCore,
}
impl PreparedAggregatePlan {
#[must_use]
pub(in crate::db::executor) fn execution_preparation(&self) -> ExecutionPreparation {
ExecutionPreparation::from_plan(self.core.plan(), slot_map_for_model_plan(self.core.plan()))
}
pub(in crate::db::executor) fn into_streaming_parts(
self,
) -> Result<
(
EntityAuthority,
AccessPlannedQuery,
Vec<LoweredIndexPrefixSpec>,
Vec<LoweredIndexRangeSpec>,
),
InternalError,
> {
let Self { authority, core } = self;
if core.index_prefix_spec_invalid {
return Err(
ExecutorPlanError::lowered_index_prefix_spec_invalid().into_internal_error()
);
}
if core.index_range_spec_invalid {
return Err(ExecutorPlanError::lowered_index_range_spec_invalid().into_internal_error());
}
Ok((
authority,
core.plan,
core.index_prefix_specs,
core.index_range_specs,
))
}
#[must_use]
pub(in crate::db::executor) fn into_grouped_load_plan(
self,
group: GroupSpec,
) -> PreparedLoadPlan {
PreparedLoadPlan::from_plan(self.authority, self.core.into_inner().into_grouped(group))
}
}
impl<E: EntityKind> PreparedExecutionPlan<E> {
/// Prepare `plan` for execution against entity `E`.
pub(in crate::db) fn new(plan: AccessPlannedQuery) -> Self {
Self::build(plan)
}
/// Finalize `plan` under `E`'s authority and build the plan core from it.
fn build(mut plan: AccessPlannedQuery) -> Self {
let authority = EntityAuthority::for_type::<E>();
// NOTE(review): `build_prepared_execution_plan_core` calls
// `finalize_static_planning_shape` again on the same plan. Presumably the
// call is idempotent and this first one is needed before the route profile
// is finalized; confirm.
authority.finalize_static_planning_shape(&mut plan);
authority.finalize_planner_route_profile(&mut plan);
Self {
core: build_prepared_execution_plan_core(authority, plan),
marker: PhantomData,
}
}
/// Assemble the explain descriptor for this plan's load execution node.
///
/// Errors when the plan's mode is not a load mode.
pub(in crate::db) fn explain_load_execution_node_descriptor(
&self,
) -> Result<ExplainExecutionNodeDescriptor, InternalError>
where
E: EntityValue,
{
if !self.mode().is_load() {
return Err(
ExecutorPlanError::load_execution_descriptor_requires_load_plan()
.into_internal_error(),
);
}
let authority = EntityAuthority::for_type::<E>();
assemble_load_execution_node_descriptor(
authority.fields(),
authority.primary_key_name(),
self.core.plan(),
)
}
/// Prepare a scalar cursor from optional raw cursor bytes under `E`'s
/// authority.
pub(in crate::db) fn prepare_cursor(
&self,
cursor: Option<&[u8]>,
) -> Result<PlannedCursor, ExecutorPlanError> {
self.core
.prepare_cursor(EntityAuthority::for_type::<E>(), cursor)
}
/// Query mode of the underlying plan.
#[must_use]
pub(in crate::db) const fn mode(&self) -> QueryMode {
self.core.mode()
}
/// Whether the continuation contract reports a grouped plan (`false` when
/// there is no contract).
#[must_use]
pub(in crate::db) const fn is_grouped(&self) -> bool {
self.core.is_grouped()
}
/// Execution family derived from the contract ordering; errors when the
/// plan has no continuation contract.
pub(in crate::db) fn execution_family(&self) -> Result<ExecutionFamily, InternalError> {
self.core.execution_family()
}
/// Test-only: borrow the underlying access-planned query.
#[must_use]
#[cfg(test)]
pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
self.core.plan()
}
/// Test-only: execution ordering from the continuation contract.
#[cfg(test)]
pub(in crate::db) fn execution_ordering(&self) -> Result<ExecutionOrdering, InternalError> {
self.core.execution_ordering()
}
/// Borrow the plan's access plan.
pub(in crate::db) const fn access(
&self,
) -> &crate::db::access::AccessPlan<crate::value::Value> {
&self.core.plan().access
}
/// Missing-row policy derived from the plan.
#[must_use]
pub(in crate::db) const fn consistency(&self) -> MissingRowPolicy {
self.core.consistency()
}
/// Classify the projection of `target_field` using this plan's access,
/// order spec, consistency and predicate flag, plus `E`'s primary key name.
#[must_use]
pub(in crate::db) fn bytes_by_projection_mode(
&self,
target_field: &str,
) -> BytesByProjectionMode {
let authority = EntityAuthority::for_type::<E>();
classify_bytes_by_projection_mode(
self.access(),
self.order_spec(),
self.consistency(),
self.has_predicate(),
target_field,
authority.primary_key_name(),
)
}
/// Static label string for a `BytesByProjectionMode` value.
#[must_use]
pub(in crate::db) const fn bytes_by_projection_mode_label(
mode: BytesByProjectionMode,
) -> &'static str {
match mode {
BytesByProjectionMode::Materialized => "field_materialized",
BytesByProjectionMode::CoveringIndex => "field_covering_index",
BytesByProjectionMode::CoveringConstant => "field_covering_constant",
}
}
/// Order spec of the scalar plan, if one is set.
#[must_use]
pub(in crate::db::executor) const fn order_spec(&self) -> Option<&OrderSpec> {
self.core.order_spec()
}
/// Whether the plan reports a residual predicate.
#[must_use]
pub(in crate::db::executor) fn has_predicate(&self) -> bool {
self.core.has_predicate()
}
/// Lowered index-prefix specs; errors when prefix lowering was flagged invalid.
pub(in crate::db) fn index_prefix_specs(
&self,
) -> Result<&[LoweredIndexPrefixSpec], InternalError> {
self.core.index_prefix_specs()
}
/// Lowered index-range specs; errors when range lowering was flagged invalid.
pub(in crate::db) fn index_range_specs(
&self,
) -> Result<&[LoweredIndexRangeSpec], InternalError> {
self.core.index_range_specs()
}
/// Test-only: render the plan as newline-joined `key=value` lines.
///
/// The order and spelling of the emitted lines are fixed by the array below.
/// Errors when the plan has no continuation contract or when either lowered
/// spec list was flagged invalid.
#[cfg(test)]
pub(in crate::db) fn render_snapshot_canonical(&self) -> Result<String, InternalError>
where
E: EntityValue,
{
let plan = self.core.plan();
let authority = EntityAuthority::for_type::<E>();
let projection_spec = plan.frozen_projection_spec();
// "Declared" when the plan is grouped or the projection spec length
// differs from the row layout's field count; otherwise "All".
let projection_selection = if plan.grouped_plan().is_some()
|| projection_spec.len() != authority.row_layout().field_count()
{
"Declared"
} else {
"All"
};
let projection_coverage_flag = plan.grouped_plan().is_some();
let continuation_signature = self.core.continuation_signature_for_runtime()?;
let ordering_direction = self
.core
.continuation_contract()?
.order_contract()
.direction();
let load_terminal_fast_path =
crate::db::executor::planning::route::derive_load_terminal_fast_path_contract_for_plan(
authority, plan,
);
let index_prefix_specs = render_index_prefix_specs(self.core.index_prefix_specs()?);
let index_range_specs = render_index_range_specs(self.core.index_range_specs()?);
let explain_plan = plan.explain_with_model(E::MODEL);
Ok([
"snapshot_version=1".to_string(),
format!("plan_hash={}", plan.fingerprint()),
format!("mode={:?}", self.core.mode()),
format!("is_grouped={}", self.core.is_grouped()),
format!("execution_family={:?}", self.core.execution_family()?),
format!(
"load_terminal_fast_path={}",
render_load_terminal_fast_path_label(load_terminal_fast_path.as_ref())
),
format!("ordering_direction={ordering_direction:?}"),
format!(
"distinct_execution_strategy={:?}",
plan.distinct_execution_strategy()
),
format!("projection_selection={projection_selection}"),
format!("projection_spec={projection_spec:?}"),
format!("order_spec={:?}", plan.scalar_plan().order),
format!("page_spec={:?}", plan.scalar_plan().page),
format!("projection_coverage_flag={projection_coverage_flag}"),
format!("continuation_signature={continuation_signature}"),
format!("index_prefix_specs={index_prefix_specs}"),
format!("index_range_specs={index_range_specs}"),
format!("explain_plan={explain_plan:?}"),
]
.join("\n"))
}
/// Consume the prepared plan and return the owned access-planned query.
pub(in crate::db) fn into_plan(self) -> AccessPlannedQuery {
self.core.into_inner()
}
/// Test-only: prepare a grouped cursor from optional raw cursor bytes.
///
/// Errors when the plan has no continuation contract or the contract
/// rejects the cursor.
#[cfg(test)]
pub(in crate::db) fn prepare_grouped_cursor(
&self,
cursor: Option<&[u8]>,
) -> Result<GroupedPlannedCursor, ExecutorPlanError> {
let Some(contract) = self.core.continuation.as_ref() else {
return Err(ExecutorPlanError::grouped_cursor_preparation_requires_grouped_plan());
};
contract
.prepare_grouped_cursor(EntityAuthority::for_type::<E>().entity_path(), cursor)
.map_err(ExecutorPlanError::from)
}
/// Prepare a grouped cursor from an optional grouped continuation token.
///
/// Errors when the plan has no continuation contract or the contract
/// rejects the token.
pub(in crate::db) fn prepare_grouped_cursor_token(
&self,
cursor: Option<crate::db::cursor::GroupedContinuationToken>,
) -> Result<GroupedPlannedCursor, ExecutorPlanError> {
let Some(contract) = self.core.continuation.as_ref() else {
return Err(ExecutorPlanError::grouped_cursor_preparation_requires_grouped_plan());
};
contract
.prepare_grouped_cursor_token(EntityAuthority::for_type::<E>().entity_path(), cursor)
.map_err(ExecutorPlanError::from)
}
/// Convert into an untyped load plan, reusing the already-built core and
/// attaching `E`'s authority.
#[must_use]
pub(in crate::db::executor) fn into_prepared_load_plan(self) -> PreparedLoadPlan {
PreparedLoadPlan {
authority: EntityAuthority::for_type::<E>(),
core: self.core,
}
}
/// Convert into an untyped aggregate plan, reusing the already-built core
/// and attaching `E`'s authority.
#[must_use]
pub(in crate::db::executor) fn into_prepared_aggregate_plan(self) -> PreparedAggregatePlan {
PreparedAggregatePlan {
authority: EntityAuthority::for_type::<E>(),
core: self.core,
}
}
}
/// Test-only: snapshot label for an optional load-terminal fast-path contract.
///
/// `None` renders as `"Materialized"`; a covering-read contract renders as
/// `"CoveringRead"`.
#[cfg(test)]
const fn render_load_terminal_fast_path_label(
    contract: Option<&LoadTerminalFastPathContract>,
) -> &'static str {
    match contract {
        None => "Materialized",
        // The inner match stays exhaustive so a new contract variant forces
        // this label table to be updated.
        Some(selected) => match selected {
            LoadTerminalFastPathContract::CoveringRead(_) => "CoveringRead",
        },
    }
}
/// Test-only: render lowered index-prefix specs as a bracketed,
/// comma-separated list of `{index:…,bound_type:equality,lower:…,upper:…}`
/// entries.
#[cfg(test)]
fn render_index_prefix_specs(specs: &[LoweredIndexPrefixSpec]) -> String {
    let mut entries: Vec<String> = Vec::with_capacity(specs.len());
    for spec in specs {
        entries.push(format!(
            "{{index:{},bound_type:equality,lower:{},upper:{}}}",
            spec.index().name(),
            render_lowered_bound(spec.lower()),
            render_lowered_bound(spec.upper()),
        ));
    }
    let body = entries.join(",");
    format!("[{body}]")
}
/// Test-only: render lowered index-range specs as a bracketed,
/// comma-separated list of `{index:…,lower:…,upper:…}` entries.
#[cfg(test)]
fn render_index_range_specs(specs: &[LoweredIndexRangeSpec]) -> String {
    let mut entries: Vec<String> = Vec::with_capacity(specs.len());
    for spec in specs {
        entries.push(format!(
            "{{index:{},lower:{},upper:{}}}",
            spec.index().name(),
            render_lowered_bound(spec.lower()),
            render_lowered_bound(spec.upper()),
        ));
    }
    let body = entries.join(",");
    format!("[{body}]")
}
/// Test-only: render one bound as `included(…)`, `excluded(…)` or
/// `unbounded`, where `…` is the key summary.
#[cfg(test)]
fn render_lowered_bound(bound: &Bound<crate::db::access::LoweredKey>) -> String {
    // Pick the label and key first; an open bound carries no key to summarize.
    let (label, key) = match bound {
        Bound::Unbounded => return "unbounded".to_string(),
        Bound::Included(key) => ("included", key),
        Bound::Excluded(key) => ("excluded", key),
    };
    format!("{label}({})", render_lowered_key_summary(key))
}
/// Test-only: summarize a lowered key as `len:N:head:H:tail:T`.
///
/// `H` and `T` are the cursor-encoded first and last `min(N, 8)` bytes of the
/// key; for keys shorter than 16 bytes the two windows overlap.
#[cfg(test)]
fn render_lowered_key_summary(key: &crate::db::access::LoweredKey) -> String {
    let bytes = key.as_bytes();
    let total = bytes.len();
    // Head and tail use the same window length, capped at the key length.
    let window = total.min(8);
    let head = crate::db::codec::cursor::encode_cursor(&bytes[..window]);
    let tail = crate::db::codec::cursor::encode_cursor(&bytes[total - window..]);
    format!("len:{total}:head:{head}:tail:{tail}")
}