#[cfg(test)]
use crate::db::executor::planning::route::LoadTerminalFastPathContract;
use crate::{
db::{
access::AccessPlan,
cursor::{ContinuationSignature, CursorPlanError, GroupedPlannedCursor, PlannedCursor},
executor::{
EntityAuthority, ExecutionPreparation, ExecutorPlanError, GroupedPaginationWindow,
LoweredIndexPrefixSpec, LoweredIndexRangeSpec,
explain::assemble_load_execution_node_descriptor,
lower_index_prefix_specs, lower_index_range_specs,
pipeline::contracts::{
CursorEmissionMode, ProjectionMaterializationMode,
compile_retained_slot_layout_for_mode,
grouped::compile_grouped_row_slot_layout_from_parts,
},
planning::preparation::slot_map_for_model_plan,
projection::{PreparedProjectionShape, prepare_projection_shape_from_plan},
terminal::RetainedSlotLayout,
traversal::row_read_consistency_for_plan,
},
predicate::MissingRowPolicy,
query::explain::ExplainExecutionNodeDescriptor,
query::plan::{
AccessPlannedQuery, ExecutionOrdering, GroupSpec, OrderSpec,
PlannedContinuationContract, QueryMode, constant_covering_projection_value_from_access,
covering_index_projection_context,
},
},
error::InternalError,
traits::{EntityKind, EntityValue},
};
use std::marker::PhantomData;
use std::sync::Arc;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionFamily {
PrimaryKey,
Ordered,
Grouped,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum BytesByProjectionMode {
Materialized,
CoveringIndex,
CoveringConstant,
}
#[must_use]
pub(in crate::db::executor) fn classify_bytes_by_projection_mode(
access: &AccessPlan<crate::value::Value>,
order_spec: Option<&OrderSpec>,
consistency: MissingRowPolicy,
has_predicate: bool,
target_field: &str,
primary_key_name: &'static str,
) -> BytesByProjectionMode {
if !matches!(consistency, MissingRowPolicy::Ignore) {
return BytesByProjectionMode::Materialized;
}
if constant_covering_projection_value_from_access(access, target_field).is_some() {
return BytesByProjectionMode::CoveringConstant;
}
if has_predicate {
return BytesByProjectionMode::Materialized;
}
if covering_index_projection_context(access, order_spec, target_field, primary_key_name)
.is_some()
{
return BytesByProjectionMode::CoveringIndex;
}
BytesByProjectionMode::Materialized
}
#[derive(Clone, Debug)]
struct PreparedExecutionPlanCoreShared {
plan: AccessPlannedQuery,
prepared_projection_shape: Option<Arc<PreparedProjectionShape>>,
prepared_grouped_runtime_residents: Option<Arc<PreparedGroupedRuntimeResidents>>,
shared_validation_emit_retained_slot_layout: Option<RetainedSlotLayout>,
retain_slot_rows_suppress_retained_slot_layout: Option<RetainedSlotLayout>,
none_suppress_retained_slot_layout: Option<RetainedSlotLayout>,
continuation: Option<PlannedContinuationContract>,
index_prefix_specs: Vec<LoweredIndexPrefixSpec>,
index_prefix_spec_invalid: bool,
index_range_specs: Vec<LoweredIndexRangeSpec>,
index_range_spec_invalid: bool,
}
#[derive(Clone, Debug)]
struct PreparedExecutionPlanCore {
shared: Arc<PreparedExecutionPlanCoreShared>,
}
#[derive(Clone)]
struct PreparedGroupedRuntimeResidents {
execution_preparation: ExecutionPreparation,
grouped_slot_layout: RetainedSlotLayout,
}
impl std::fmt::Debug for PreparedGroupedRuntimeResidents {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("PreparedGroupedRuntimeResidents(..)")
}
}
impl PreparedGroupedRuntimeResidents {
const fn new(
execution_preparation: ExecutionPreparation,
grouped_slot_layout: RetainedSlotLayout,
) -> Self {
Self {
execution_preparation,
grouped_slot_layout,
}
}
fn execution_preparation(&self) -> ExecutionPreparation {
self.execution_preparation.clone()
}
fn grouped_slot_layout(&self) -> RetainedSlotLayout {
self.grouped_slot_layout.clone()
}
}
#[derive(Clone, Debug)]
pub(in crate::db) struct SharedPreparedExecutionPlan {
authority: EntityAuthority,
core: PreparedExecutionPlanCore,
}
impl SharedPreparedExecutionPlan {
#[must_use]
pub(in crate::db) fn from_plan(
authority: EntityAuthority,
mut plan: AccessPlannedQuery,
) -> Self {
authority.finalize_planner_route_profile(&mut plan);
Self {
authority,
core: build_prepared_execution_plan_core(authority, plan),
}
}
#[must_use]
pub(in crate::db) fn typed_clone<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
assert!(
self.authority.entity_path() == E::PATH,
"shared prepared plan entity mismatch: cached for '{}', requested '{}'",
self.authority.entity_path(),
E::PATH,
);
PreparedExecutionPlan {
core: self.core.clone(),
marker: PhantomData,
}
}
#[must_use]
pub(in crate::db) const fn authority(&self) -> EntityAuthority {
self.authority
}
#[must_use]
pub(in crate::db) fn logical_plan(&self) -> &AccessPlannedQuery {
self.core.plan()
}
#[must_use]
pub(in crate::db) fn prepared_projection_shape(&self) -> Option<&PreparedProjectionShape> {
self.core.prepared_projection_shape()
}
}
impl PreparedExecutionPlanCore {
#[must_use]
#[expect(
clippy::too_many_arguments,
reason = "prepared plan core assembly freezes all shared lowered residents at one boundary"
)]
fn new(
plan: AccessPlannedQuery,
prepared_projection_shape: Option<Arc<PreparedProjectionShape>>,
prepared_grouped_runtime_residents: Option<Arc<PreparedGroupedRuntimeResidents>>,
shared_validation_emit_retained_slot_layout: Option<RetainedSlotLayout>,
retain_slot_rows_suppress_retained_slot_layout: Option<RetainedSlotLayout>,
none_suppress_retained_slot_layout: Option<RetainedSlotLayout>,
continuation: Option<PlannedContinuationContract>,
index_prefix_specs: Vec<LoweredIndexPrefixSpec>,
index_prefix_spec_invalid: bool,
index_range_specs: Vec<LoweredIndexRangeSpec>,
index_range_spec_invalid: bool,
) -> Self {
Self {
shared: Arc::new(PreparedExecutionPlanCoreShared {
plan,
prepared_projection_shape,
prepared_grouped_runtime_residents,
shared_validation_emit_retained_slot_layout,
retain_slot_rows_suppress_retained_slot_layout,
none_suppress_retained_slot_layout,
continuation,
index_prefix_specs,
index_prefix_spec_invalid,
index_range_specs,
index_range_spec_invalid,
}),
}
}
#[must_use]
fn plan(&self) -> &AccessPlannedQuery {
&self.shared.plan
}
#[must_use]
fn prepared_projection_shape(&self) -> Option<&PreparedProjectionShape> {
self.shared.prepared_projection_shape.as_deref()
}
fn grouped_runtime_residents(&self) -> Option<&PreparedGroupedRuntimeResidents> {
self.shared.prepared_grouped_runtime_residents.as_deref()
}
fn grouped_execution_preparation(&self) -> Option<ExecutionPreparation> {
self.grouped_runtime_residents()
.map(PreparedGroupedRuntimeResidents::execution_preparation)
}
fn grouped_slot_layout(&self) -> Option<RetainedSlotLayout> {
self.grouped_runtime_residents()
.map(PreparedGroupedRuntimeResidents::grouped_slot_layout)
}
#[must_use]
fn retained_slot_layout(
&self,
projection_materialization: ProjectionMaterializationMode,
cursor_emission: CursorEmissionMode,
) -> Option<RetainedSlotLayout> {
match (projection_materialization, cursor_emission) {
(ProjectionMaterializationMode::SharedValidation, CursorEmissionMode::Emit) => self
.shared
.shared_validation_emit_retained_slot_layout
.clone(),
(ProjectionMaterializationMode::RetainSlotRows, CursorEmissionMode::Suppress) => self
.shared
.retain_slot_rows_suppress_retained_slot_layout
.clone(),
(ProjectionMaterializationMode::None, CursorEmissionMode::Suppress) => {
self.shared.none_suppress_retained_slot_layout.clone()
}
_ => None,
}
}
#[must_use]
fn mode(&self) -> QueryMode {
self.shared.plan.scalar_plan().mode
}
#[must_use]
fn is_grouped(&self) -> bool {
match self.shared.continuation {
Some(ref contract) => contract.is_grouped(),
None => false,
}
}
fn execution_ordering(&self) -> Result<ExecutionOrdering, InternalError> {
let contract = self.continuation_contract()?;
Ok(contract.order_contract().ordering().clone())
}
fn execution_family(&self) -> Result<ExecutionFamily, InternalError> {
let ordering = self.execution_ordering()?;
Ok(match ordering {
ExecutionOrdering::PrimaryKey => ExecutionFamily::PrimaryKey,
ExecutionOrdering::Explicit(_) => ExecutionFamily::Ordered,
ExecutionOrdering::Grouped(_) => ExecutionFamily::Grouped,
})
}
#[must_use]
fn consistency(&self) -> MissingRowPolicy {
row_read_consistency_for_plan(&self.shared.plan)
}
#[must_use]
fn order_spec(&self) -> Option<&OrderSpec> {
self.shared.plan.scalar_plan().order.as_ref()
}
#[must_use]
fn has_predicate(&self) -> bool {
self.shared.plan.has_residual_predicate()
}
fn index_prefix_specs(&self) -> Result<&[LoweredIndexPrefixSpec], InternalError> {
if self.shared.index_prefix_spec_invalid {
return Err(
ExecutorPlanError::lowered_index_prefix_spec_invalid().into_internal_error()
);
}
Ok(self.shared.index_prefix_specs.as_slice())
}
fn index_range_specs(&self) -> Result<&[LoweredIndexRangeSpec], InternalError> {
if self.shared.index_range_spec_invalid {
return Err(ExecutorPlanError::lowered_index_range_spec_invalid().into_internal_error());
}
Ok(self.shared.index_range_specs.as_slice())
}
#[must_use]
fn into_inner(self) -> AccessPlannedQuery {
self.into_shared().plan
}
#[must_use]
fn into_shared(self) -> PreparedExecutionPlanCoreShared {
Arc::try_unwrap(self.shared).unwrap_or_else(|shared| shared.as_ref().clone())
}
fn prepare_cursor(
&self,
authority: EntityAuthority,
cursor: Option<&[u8]>,
) -> Result<PlannedCursor, ExecutorPlanError> {
let Some(contract) = self.shared.continuation.as_ref() else {
return Err(ExecutorPlanError::continuation_cursor_requires_load_plan());
};
authority
.prepare_scalar_cursor(contract, cursor)
.map_err(ExecutorPlanError::from)
}
fn revalidate_cursor(
&self,
authority: EntityAuthority,
cursor: PlannedCursor,
) -> Result<PlannedCursor, InternalError> {
let Some(contract) = self.shared.continuation.as_ref() else {
return Err(
ExecutorPlanError::continuation_cursor_requires_load_plan().into_internal_error()
);
};
authority
.revalidate_scalar_cursor(contract, cursor)
.map_err(CursorPlanError::into_internal_error)
}
fn revalidate_grouped_cursor(
&self,
cursor: GroupedPlannedCursor,
) -> Result<GroupedPlannedCursor, InternalError> {
let Some(contract) = self.shared.continuation.as_ref() else {
return Err(
ExecutorPlanError::grouped_cursor_revalidation_requires_grouped_plan()
.into_internal_error(),
);
};
contract
.revalidate_grouped_cursor(cursor)
.map_err(CursorPlanError::into_internal_error)
}
fn continuation_signature_for_runtime(&self) -> Result<ContinuationSignature, InternalError> {
let contract = self.continuation_contract()?;
Ok(contract.continuation_signature())
}
fn grouped_cursor_boundary_arity(&self) -> Result<usize, InternalError> {
let contract = self.continuation_contract()?;
if !contract.is_grouped() {
return Err(
ExecutorPlanError::grouped_cursor_boundary_arity_requires_grouped_plan()
.into_internal_error(),
);
}
Ok(contract.boundary_arity())
}
fn grouped_pagination_window(
&self,
cursor: &GroupedPlannedCursor,
) -> Result<GroupedPaginationWindow, InternalError> {
let contract = self.continuation_contract()?;
let window = contract
.project_grouped_paging_window(cursor)
.map_err(CursorPlanError::into_internal_error)?;
let (
limit,
initial_offset_for_page,
selection_bound,
resume_initial_offset,
resume_boundary,
) = window.into_parts();
Ok(GroupedPaginationWindow::new(
limit,
initial_offset_for_page,
selection_bound,
resume_initial_offset,
resume_boundary,
))
}
fn continuation_contract(&self) -> Result<&PlannedContinuationContract, InternalError> {
self.shared.continuation.as_ref().ok_or_else(|| {
ExecutorPlanError::continuation_contract_requires_load_plan().into_internal_error()
})
}
}
fn build_prepared_execution_plan_core(
authority: EntityAuthority,
mut plan: AccessPlannedQuery,
) -> PreparedExecutionPlanCore {
authority.finalize_static_planning_shape(&mut plan);
let prepared_projection_shape = plan
.scalar_projection_plan()
.map(|_| Arc::new(prepare_projection_shape_from_plan(authority.model(), &plan)));
let prepared_grouped_runtime_residents = plan.grouped_plan().and_then(|grouped_plan| {
let grouped_distinct_execution_strategy = plan.grouped_distinct_execution_strategy()?;
let execution_preparation =
ExecutionPreparation::from_runtime_plan(&plan, plan.slot_map().map(<[usize]>::to_vec));
let grouped_slot_layout = compile_grouped_row_slot_layout_from_parts(
authority.row_layout(),
grouped_plan.group.group_fields.as_slice(),
plan.grouped_aggregate_execution_specs().unwrap_or(&[]),
grouped_distinct_execution_strategy,
execution_preparation.compiled_predicate(),
);
Some(Arc::new(PreparedGroupedRuntimeResidents::new(
execution_preparation,
grouped_slot_layout,
)))
});
let shared_validation_emit_retained_slot_layout = compile_retained_slot_layout_for_mode(
authority.model(),
&plan,
ProjectionMaterializationMode::SharedValidation,
CursorEmissionMode::Emit,
);
let retain_slot_rows_suppress_retained_slot_layout = compile_retained_slot_layout_for_mode(
authority.model(),
&plan,
ProjectionMaterializationMode::RetainSlotRows,
CursorEmissionMode::Suppress,
);
let none_suppress_retained_slot_layout = compile_retained_slot_layout_for_mode(
authority.model(),
&plan,
ProjectionMaterializationMode::None,
CursorEmissionMode::Suppress,
);
let continuation = plan.planned_continuation_contract(authority.entity_path());
let (index_prefix_specs, index_prefix_spec_invalid) =
match lower_index_prefix_specs(authority.entity_tag(), &plan.access) {
Ok(specs) => (specs, false),
Err(_) => (Vec::new(), true),
};
let (index_range_specs, index_range_spec_invalid) =
match lower_index_range_specs(authority.entity_tag(), &plan.access) {
Ok(specs) => (specs, false),
Err(_) => (Vec::new(), true),
};
PreparedExecutionPlanCore::new(
plan,
prepared_projection_shape,
prepared_grouped_runtime_residents,
shared_validation_emit_retained_slot_layout,
retain_slot_rows_suppress_retained_slot_layout,
none_suppress_retained_slot_layout,
continuation,
index_prefix_specs,
index_prefix_spec_invalid,
index_range_specs,
index_range_spec_invalid,
)
}
#[derive(Debug)]
pub(in crate::db) struct PreparedExecutionPlan<E: EntityKind> {
core: PreparedExecutionPlanCore,
marker: PhantomData<fn() -> E>,
}
#[derive(Debug)]
pub(in crate::db::executor) struct PreparedLoadPlan {
authority: EntityAuthority,
core: PreparedExecutionPlanCore,
}
impl PreparedLoadPlan {
#[must_use]
pub(in crate::db::executor) fn from_plan(
authority: EntityAuthority,
plan: AccessPlannedQuery,
) -> Self {
Self {
authority,
core: build_prepared_execution_plan_core(authority, plan),
}
}
#[must_use]
pub(in crate::db::executor) const fn authority(&self) -> EntityAuthority {
self.authority
}
#[must_use]
pub(in crate::db::executor) fn mode(&self) -> QueryMode {
self.core.mode()
}
#[must_use]
pub(in crate::db::executor) fn logical_plan(&self) -> &AccessPlannedQuery {
self.core.plan()
}
pub(in crate::db::executor) fn execution_ordering(
&self,
) -> Result<ExecutionOrdering, InternalError> {
self.core.execution_ordering()
}
pub(in crate::db::executor) fn revalidate_cursor(
&self,
cursor: PlannedCursor,
) -> Result<PlannedCursor, InternalError> {
self.core.revalidate_cursor(self.authority, cursor)
}
pub(in crate::db::executor) fn revalidate_grouped_cursor(
&self,
cursor: GroupedPlannedCursor,
) -> Result<GroupedPlannedCursor, InternalError> {
self.core.revalidate_grouped_cursor(cursor)
}
pub(in crate::db::executor) fn continuation_signature_for_runtime(
&self,
) -> Result<ContinuationSignature, InternalError> {
self.core.continuation_signature_for_runtime()
}
pub(in crate::db::executor) fn grouped_cursor_boundary_arity(
&self,
) -> Result<usize, InternalError> {
self.core.grouped_cursor_boundary_arity()
}
pub(in crate::db::executor) fn grouped_pagination_window(
&self,
cursor: &GroupedPlannedCursor,
) -> Result<GroupedPaginationWindow, InternalError> {
self.core.grouped_pagination_window(cursor)
}
pub(in crate::db::executor) fn index_prefix_specs(
&self,
) -> Result<&[LoweredIndexPrefixSpec], InternalError> {
self.core.index_prefix_specs()
}
pub(in crate::db::executor) fn index_range_specs(
&self,
) -> Result<&[LoweredIndexRangeSpec], InternalError> {
self.core.index_range_specs()
}
#[must_use]
pub(in crate::db::executor) fn cloned_prepared_projection_shape(
&self,
) -> Option<Arc<PreparedProjectionShape>> {
self.core.shared.prepared_projection_shape.clone()
}
#[must_use]
pub(in crate::db::executor) fn cloned_retained_slot_layout(
&self,
projection_materialization: ProjectionMaterializationMode,
cursor_emission: CursorEmissionMode,
) -> Option<RetainedSlotLayout> {
self.core
.retained_slot_layout(projection_materialization, cursor_emission)
}
#[must_use]
pub(in crate::db::executor) fn cloned_grouped_execution_preparation(
&self,
) -> Option<ExecutionPreparation> {
self.core.grouped_execution_preparation()
}
#[must_use]
pub(in crate::db::executor) fn cloned_grouped_slot_layout(&self) -> Option<RetainedSlotLayout> {
self.core.grouped_slot_layout()
}
#[must_use]
pub(in crate::db::executor) fn into_plan(self) -> AccessPlannedQuery {
self.core.into_inner()
}
}
#[derive(Debug)]
pub(in crate::db::executor) struct PreparedAggregatePlan {
authority: EntityAuthority,
core: PreparedExecutionPlanCore,
}
impl PreparedAggregatePlan {
#[must_use]
pub(in crate::db::executor) fn execution_preparation(&self) -> ExecutionPreparation {
ExecutionPreparation::from_plan(self.core.plan(), slot_map_for_model_plan(self.core.plan()))
}
pub(in crate::db::executor) fn into_streaming_parts(
self,
) -> Result<
(
EntityAuthority,
AccessPlannedQuery,
Vec<LoweredIndexPrefixSpec>,
Vec<LoweredIndexRangeSpec>,
),
InternalError,
> {
let Self { authority, core } = self;
let shared = core.into_shared();
if shared.index_prefix_spec_invalid {
return Err(
ExecutorPlanError::lowered_index_prefix_spec_invalid().into_internal_error()
);
}
if shared.index_range_spec_invalid {
return Err(ExecutorPlanError::lowered_index_range_spec_invalid().into_internal_error());
}
Ok((
authority,
shared.plan,
shared.index_prefix_specs,
shared.index_range_specs,
))
}
#[must_use]
pub(in crate::db::executor) fn into_grouped_load_plan(
self,
group: GroupSpec,
) -> PreparedLoadPlan {
PreparedLoadPlan::from_plan(self.authority, self.core.into_inner().into_grouped(group))
}
}
impl<E: EntityKind> PreparedExecutionPlan<E> {
pub(in crate::db) fn new(plan: AccessPlannedQuery) -> Self {
Self::build(plan)
}
fn build(mut plan: AccessPlannedQuery) -> Self {
let authority = EntityAuthority::for_type::<E>();
authority.finalize_planner_route_profile(&mut plan);
Self {
core: build_prepared_execution_plan_core(authority, plan),
marker: PhantomData,
}
}
pub(in crate::db) fn explain_load_execution_node_descriptor(
&self,
) -> Result<ExplainExecutionNodeDescriptor, InternalError>
where
E: EntityValue,
{
if !self.mode().is_load() {
return Err(
ExecutorPlanError::load_execution_descriptor_requires_load_plan()
.into_internal_error(),
);
}
let authority = EntityAuthority::for_type::<E>();
assemble_load_execution_node_descriptor(
authority.fields(),
authority.primary_key_name(),
self.core.plan(),
)
}
pub(in crate::db) fn prepare_cursor(
&self,
cursor: Option<&[u8]>,
) -> Result<PlannedCursor, ExecutorPlanError> {
self.core
.prepare_cursor(EntityAuthority::for_type::<E>(), cursor)
}
#[must_use]
pub(in crate::db) fn mode(&self) -> QueryMode {
self.core.mode()
}
#[must_use]
pub(in crate::db) fn is_grouped(&self) -> bool {
self.core.is_grouped()
}
pub(in crate::db) fn execution_family(&self) -> Result<ExecutionFamily, InternalError> {
self.core.execution_family()
}
#[must_use]
pub(in crate::db) fn logical_plan(&self) -> &AccessPlannedQuery {
self.core.plan()
}
#[cfg(test)]
pub(in crate::db) fn execution_ordering(&self) -> Result<ExecutionOrdering, InternalError> {
self.core.execution_ordering()
}
pub(in crate::db) fn access(&self) -> &crate::db::access::AccessPlan<crate::value::Value> {
&self.core.plan().access
}
#[must_use]
pub(in crate::db) fn consistency(&self) -> MissingRowPolicy {
self.core.consistency()
}
#[must_use]
pub(in crate::db) fn bytes_by_projection_mode(
&self,
target_field: &str,
) -> BytesByProjectionMode {
let authority = EntityAuthority::for_type::<E>();
classify_bytes_by_projection_mode(
self.access(),
self.order_spec(),
self.consistency(),
self.has_predicate(),
target_field,
authority.primary_key_name(),
)
}
#[must_use]
pub(in crate::db) const fn bytes_by_projection_mode_label(
mode: BytesByProjectionMode,
) -> &'static str {
match mode {
BytesByProjectionMode::Materialized => "field_materialized",
BytesByProjectionMode::CoveringIndex => "field_covering_index",
BytesByProjectionMode::CoveringConstant => "field_covering_constant",
}
}
#[must_use]
pub(in crate::db::executor) fn order_spec(&self) -> Option<&OrderSpec> {
self.core.order_spec()
}
#[must_use]
pub(in crate::db::executor) fn has_predicate(&self) -> bool {
self.core.has_predicate()
}
pub(in crate::db) fn index_prefix_specs(
&self,
) -> Result<&[LoweredIndexPrefixSpec], InternalError> {
self.core.index_prefix_specs()
}
pub(in crate::db) fn index_range_specs(
&self,
) -> Result<&[LoweredIndexRangeSpec], InternalError> {
self.core.index_range_specs()
}
#[cfg(test)]
#[expect(clippy::too_many_lines)]
pub(in crate::db) fn render_snapshot_canonical(&self) -> Result<String, InternalError>
where
E: EntityValue,
{
let plan = self.core.plan();
let authority = EntityAuthority::for_type::<E>();
let projection_spec = plan.frozen_projection_spec();
let projection_selection = if plan.grouped_plan().is_some()
|| projection_spec.len() != authority.row_layout().field_count()
{
"Declared"
} else {
"All"
};
let projection_coverage_flag = plan.grouped_plan().is_some();
let continuation_signature = self.core.continuation_signature_for_runtime()?;
let ordering_direction = self
.core
.continuation_contract()?
.order_contract()
.direction();
let load_terminal_fast_path =
crate::db::executor::planning::route::derive_load_terminal_fast_path_contract_for_plan(
authority, plan,
);
let render_lowered_bound =
|bound: &std::ops::Bound<crate::db::access::LoweredKey>| match bound {
std::ops::Bound::Included(key) => {
let bytes = key.as_bytes();
let head_len = bytes.len().min(8);
let tail_len = bytes.len().min(8);
let head = crate::db::codec::cursor::encode_cursor(&bytes[..head_len]);
let tail =
crate::db::codec::cursor::encode_cursor(&bytes[bytes.len() - tail_len..]);
format!("included(len:{}:head:{head}:tail:{tail})", bytes.len())
}
std::ops::Bound::Excluded(key) => {
let bytes = key.as_bytes();
let head_len = bytes.len().min(8);
let tail_len = bytes.len().min(8);
let head = crate::db::codec::cursor::encode_cursor(&bytes[..head_len]);
let tail =
crate::db::codec::cursor::encode_cursor(&bytes[bytes.len() - tail_len..]);
format!("excluded(len:{}:head:{head}:tail:{tail})", bytes.len())
}
std::ops::Bound::Unbounded => "unbounded".to_string(),
};
let index_prefix_specs = format!(
"[{}]",
self.core
.index_prefix_specs()?
.iter()
.map(|spec| {
format!(
"{{index:{},bound_type:equality,lower:{},upper:{}}}",
spec.index().name(),
render_lowered_bound(spec.lower()),
render_lowered_bound(spec.upper()),
)
})
.collect::<Vec<_>>()
.join(",")
);
let index_range_specs = format!(
"[{}]",
self.core
.index_range_specs()?
.iter()
.map(|spec| {
format!(
"{{index:{},lower:{},upper:{}}}",
spec.index().name(),
render_lowered_bound(spec.lower()),
render_lowered_bound(spec.upper()),
)
})
.collect::<Vec<_>>()
.join(",")
);
let explain_plan = plan.explain();
Ok([
"snapshot_version=1".to_string(),
format!("plan_hash={}", plan.fingerprint()),
format!("mode={:?}", self.core.mode()),
format!("is_grouped={}", self.core.is_grouped()),
format!("execution_family={:?}", self.core.execution_family()?),
format!(
"load_terminal_fast_path={}",
match load_terminal_fast_path.as_ref() {
Some(LoadTerminalFastPathContract::CoveringRead(_)) => "CoveringRead",
None => "Materialized",
}
),
format!("ordering_direction={ordering_direction:?}"),
format!(
"distinct_execution_strategy={:?}",
plan.distinct_execution_strategy()
),
format!("projection_selection={projection_selection}"),
format!("projection_spec={projection_spec:?}"),
format!("order_spec={:?}", plan.scalar_plan().order),
format!("page_spec={:?}", plan.scalar_plan().page),
format!("projection_coverage_flag={projection_coverage_flag}"),
format!("continuation_signature={continuation_signature}"),
format!("index_prefix_specs={index_prefix_specs}"),
format!("index_range_specs={index_range_specs}"),
format!("explain_plan={explain_plan:?}"),
]
.join("\n"))
}
pub(in crate::db) fn into_plan(self) -> AccessPlannedQuery {
self.core.into_inner()
}
#[cfg(test)]
pub(in crate::db) fn prepare_grouped_cursor(
&self,
cursor: Option<&[u8]>,
) -> Result<GroupedPlannedCursor, ExecutorPlanError> {
let Some(contract) = self.core.shared.continuation.as_ref() else {
return Err(ExecutorPlanError::grouped_cursor_preparation_requires_grouped_plan());
};
contract
.prepare_grouped_cursor(EntityAuthority::for_type::<E>().entity_path(), cursor)
.map_err(ExecutorPlanError::from)
}
pub(in crate::db) fn prepare_grouped_cursor_token(
&self,
cursor: Option<crate::db::cursor::GroupedContinuationToken>,
) -> Result<GroupedPlannedCursor, ExecutorPlanError> {
let Some(contract) = self.core.shared.continuation.as_ref() else {
return Err(ExecutorPlanError::grouped_cursor_preparation_requires_grouped_plan());
};
contract
.prepare_grouped_cursor_token(EntityAuthority::for_type::<E>().entity_path(), cursor)
.map_err(ExecutorPlanError::from)
}
#[must_use]
pub(in crate::db::executor) fn into_prepared_load_plan(self) -> PreparedLoadPlan {
PreparedLoadPlan {
authority: EntityAuthority::for_type::<E>(),
core: self.core,
}
}
#[must_use]
pub(in crate::db::executor) fn into_prepared_aggregate_plan(self) -> PreparedAggregatePlan {
PreparedAggregatePlan {
authority: EntityAuthority::for_type::<E>(),
core: self.core,
}
}
}