use crate::{
db::{
executor::{
ExecutableAccessNode, ExecutableAccessPlan, ExecutionPathPayload,
LoweredIndexPrefixSpec, LoweredIndexRangeSpec,
pipeline::contracts::{AccessScanContinuationInput, AccessStreamBindings},
route::IndexPrefixChildExpansionHint,
stream::{
access::{
bindings::{
AccessSpecCursor, AccessStreamExecutionPolicy, ExecutableAccess,
IndexLeafOrderPolicy, IndexStreamConstraints,
},
physical,
},
key::{KeyOrderComparator, OrderedKeyStreamBox},
},
traversal::IndexRangeTraversalContract,
},
index::predicate::IndexPredicateExecution,
},
error::InternalError,
value::Value,
};
#[derive(Clone, Copy)]
struct TraversalInputs<'a> {
index_prefix_specs: &'a [LoweredIndexPrefixSpec],
index_range_specs: &'a [LoweredIndexRangeSpec],
continuation: AccessScanContinuationInput<'a>,
execution_policy: AccessStreamExecutionPolicy,
index_predicate_execution: Option<crate::db::index::predicate::IndexPredicateExecution<'a>>,
index_prefix_child_expansion: Option<IndexPrefixChildExpansionHint>,
}
impl<'a> TraversalInputs<'a> {
const fn with_physical_fetch_hint(self, physical_fetch_hint: Option<usize>) -> Self {
Self {
execution_policy: self
.execution_policy
.with_physical_fetch_hint(physical_fetch_hint),
..self
}
}
const fn without_leaf_index_order_preservation(self) -> Self {
Self {
execution_policy: self
.execution_policy
.with_index_leaf_order_policy(IndexLeafOrderPolicy::CanonicalKeyOrder),
..self
}
}
const fn spec_cursor(&self) -> AccessSpecCursor<'a> {
AccessSpecCursor::new(self.index_prefix_specs, self.index_range_specs)
}
}
fn validate_index_range_spec_alignment(
path: &ExecutionPathPayload<'_, Value>,
index_range_spec: Option<&LoweredIndexRangeSpec>,
) -> Result<(), InternalError> {
IndexRangeTraversalContract::validate_spec_alignment(path, index_range_spec)
}
#[derive(Clone, Copy)]
pub(in crate::db::executor) struct TraversalRuntime {
pub(in crate::db::executor) store: crate::db::registry::StoreHandle,
pub(in crate::db::executor) entity_tag: crate::types::EntityTag,
}
impl TraversalRuntime {
#[must_use]
pub(in crate::db::executor) const fn new(
store: crate::db::registry::StoreHandle,
entity_tag: crate::types::EntityTag,
) -> Self {
Self { store, entity_tag }
}
pub(in crate::db::executor) fn ordered_key_stream_from_runtime_access(
&self,
request: ExecutableAccess<'_, Value>,
) -> Result<OrderedKeyStreamBox, InternalError> {
self.ordered_key_stream_from_executable_plan(
&request.plan,
request.bindings,
request.execution_policy,
request.index_predicate_execution,
)
}
pub(in crate::db::executor) fn ordered_key_stream_from_executable_plan<'input>(
&self,
plan: &ExecutableAccessPlan<'_, Value>,
bindings: AccessStreamBindings<'input>,
execution_policy: AccessStreamExecutionPolicy,
index_predicate_execution: Option<IndexPredicateExecution<'input>>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let inputs = TraversalInputs {
index_prefix_specs: bindings.index_prefix_specs,
index_range_specs: bindings.index_range_specs,
continuation: bindings.continuation,
execution_policy,
index_predicate_execution,
index_prefix_child_expansion: bindings.index_prefix_child_expansion,
};
let mut spec_cursor = inputs.spec_cursor();
let key_stream =
AccessPlanStreamResolver::produce_key_stream(self, plan, inputs, &mut spec_cursor)?;
spec_cursor.validate_consumed()?;
Ok(key_stream)
}
fn lower_path_access(
&self,
path: &ExecutionPathPayload<'_, Value>,
inputs: TraversalInputs<'_>,
index_prefix_specs: &[LoweredIndexPrefixSpec],
index_range_spec: Option<&LoweredIndexRangeSpec>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let constraints = IndexStreamConstraints {
prefixes: index_prefix_specs,
range: index_range_spec,
};
path.resolve_structural_physical_key_stream(physical::StructuralPhysicalStreamRequest {
store: self.store,
entity_tag: self.entity_tag,
index_prefix_specs: constraints.prefixes,
index_range_spec: constraints.range,
continuation: inputs.continuation,
execution_policy: inputs.execution_policy,
index_predicate_execution: inputs.index_predicate_execution,
index_prefix_child_expansion: inputs.index_prefix_child_expansion,
})
}
}
struct AccessPlanStreamResolver;
impl AccessPlanStreamResolver {
fn validate_index_prefix_spec_alignment(
path: &ExecutionPathPayload<'_, Value>,
index_prefix_specs: &[LoweredIndexPrefixSpec],
) -> Result<(), InternalError> {
let path_facts = path.shape_facts();
if let Some(details) = path_facts.index_prefix_details() {
for spec in index_prefix_specs {
if spec.scan_contract().name() != details.name() {
return Err(InternalError::query_executor_invariant());
}
}
}
Ok(())
}
fn collect_child_key_streams(
runtime: &TraversalRuntime,
children: &[ExecutableAccessPlan<'_, Value>],
inputs: TraversalInputs<'_>,
spec_cursor: &mut AccessSpecCursor<'_>,
) -> Result<Vec<OrderedKeyStreamBox>, InternalError> {
let mut streams = Vec::with_capacity(children.len());
for child in children {
let child_inputs = inputs
.with_physical_fetch_hint(None)
.without_leaf_index_order_preservation();
streams.push(Self::produce_key_stream(
runtime,
child,
child_inputs,
spec_cursor,
)?);
}
Ok(streams)
}
fn produce_key_stream(
runtime: &TraversalRuntime,
access: &ExecutableAccessPlan<'_, Value>,
inputs: TraversalInputs<'_>,
spec_cursor: &mut AccessSpecCursor<'_>,
) -> Result<OrderedKeyStreamBox, InternalError> {
match access.node() {
ExecutableAccessNode::Path(path) => {
let path_facts = path.shape_facts();
let index_prefix_specs = if path_facts.index_prefix_spec_count() > 0 {
spec_cursor
.require_next_index_prefix_specs(path_facts.index_prefix_spec_count())?
} else {
&[]
};
let index_range_spec = if path_facts.consumes_index_range_spec() {
Some(spec_cursor.require_next_index_range_spec()?)
} else {
None
};
Self::validate_index_prefix_spec_alignment(path, index_prefix_specs)?;
validate_index_range_spec_alignment(path, index_range_spec)?;
runtime.lower_path_access(path, inputs, index_prefix_specs, index_range_spec)
}
ExecutableAccessNode::Union(children) => {
Self::produce_union_key_stream(runtime, children, inputs, spec_cursor)
}
ExecutableAccessNode::Intersection(children) => {
Self::produce_intersection_key_stream(runtime, children, inputs, spec_cursor)
}
}
}
fn produce_union_key_stream(
runtime: &TraversalRuntime,
children: &[ExecutableAccessPlan<'_, Value>],
inputs: TraversalInputs<'_>,
spec_cursor: &mut AccessSpecCursor<'_>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let streams = Self::collect_child_key_streams(runtime, children, inputs, spec_cursor)?;
let key_comparator = KeyOrderComparator::from_direction(inputs.continuation.direction());
Ok(OrderedKeyStreamBox::merge_all(streams, key_comparator))
}
fn produce_intersection_key_stream(
runtime: &TraversalRuntime,
children: &[ExecutableAccessPlan<'_, Value>],
inputs: TraversalInputs<'_>,
spec_cursor: &mut AccessSpecCursor<'_>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let streams = Self::collect_child_key_streams(runtime, children, inputs, spec_cursor)?;
let key_comparator = KeyOrderComparator::from_direction(inputs.continuation.direction());
Ok(OrderedKeyStreamBox::intersect_all(streams, key_comparator))
}
}