use crate::{
db::{
access::AccessKey,
executor::{
ExecutableAccessNode, ExecutableAccessPath, ExecutableAccessPlan,
LoweredIndexPrefixSpec, LoweredIndexRangeSpec,
stream::{
access::{
bindings::{
AccessScanContinuationInput, AccessSpecCursor, ExecutableAccess,
IndexStreamConstraints, StreamExecutionHints,
},
physical,
},
key::{
KeyOrderComparator, OrderedKeyStreamBox,
ordered_key_stream_from_materialized_keys,
},
},
traversal::IndexRangeTraversalContract,
},
},
error::InternalError,
};
#[derive(Clone, Copy)]
struct TraversalInputs<'a> {
index_prefix_specs: &'a [LoweredIndexPrefixSpec],
index_range_specs: &'a [LoweredIndexRangeSpec],
continuation: AccessScanContinuationInput<'a>,
physical_fetch_hint: Option<usize>,
index_predicate_execution: Option<crate::db::index::predicate::IndexPredicateExecution<'a>>,
preserve_leaf_index_order: bool,
}
impl<'a> TraversalInputs<'a> {
const fn with_physical_fetch_hint(self, physical_fetch_hint: Option<usize>) -> Self {
Self {
physical_fetch_hint,
..self
}
}
const fn without_leaf_index_order_preservation(self) -> Self {
Self {
preserve_leaf_index_order: false,
..self
}
}
const fn spec_cursor(&self) -> AccessSpecCursor<'a> {
AccessSpecCursor::new(self.index_prefix_specs, self.index_range_specs)
}
}
fn validate_index_range_spec_alignment(
path: &ExecutableAccessPath<'_, AccessKey>,
index_range_spec: Option<&LoweredIndexRangeSpec>,
) -> Result<(), InternalError> {
IndexRangeTraversalContract::validate_spec_alignment(path, index_range_spec)
}
#[derive(Clone, Copy)]
pub(in crate::db::executor) struct TraversalRuntime {
pub(in crate::db::executor) store: crate::db::registry::StoreHandle,
pub(in crate::db::executor) entity_tag: crate::types::EntityTag,
}
impl TraversalRuntime {
#[must_use]
pub(in crate::db::executor) const fn new(
store: crate::db::registry::StoreHandle,
entity_tag: crate::types::EntityTag,
) -> Self {
Self { store, entity_tag }
}
pub(in crate::db::executor) fn ordered_key_stream_from_runtime_access(
&self,
request: ExecutableAccess<'_, AccessKey>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let inputs = TraversalInputs {
index_prefix_specs: request.bindings.index_prefix_specs,
index_range_specs: request.bindings.index_range_specs,
continuation: request.bindings.continuation,
physical_fetch_hint: request.physical_fetch_hint,
index_predicate_execution: request.index_predicate_execution,
preserve_leaf_index_order: request.preserve_leaf_index_order,
};
let mut spec_cursor = inputs.spec_cursor();
let key_stream = AccessPlanStreamResolver::produce_key_stream(
self,
&request.plan,
inputs,
&mut spec_cursor,
)?;
spec_cursor.validate_consumed()?;
Ok(key_stream)
}
fn lower_path_access(
&self,
path: &ExecutableAccessPath<'_, AccessKey>,
inputs: TraversalInputs<'_>,
index_prefix_specs: &[LoweredIndexPrefixSpec],
index_range_spec: Option<&LoweredIndexRangeSpec>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let constraints = IndexStreamConstraints {
prefixes: index_prefix_specs,
range: index_range_spec,
};
let hints = StreamExecutionHints {
physical_fetch_hint: inputs.physical_fetch_hint,
predicate_execution: inputs.index_predicate_execution,
};
path.resolve_structural_physical_key_stream(physical::StructuralPhysicalStreamRequest {
store: self.store,
entity_tag: self.entity_tag,
index_prefix_specs: constraints.prefixes,
index_range_spec: constraints.range,
continuation: inputs.continuation,
physical_fetch_hint: hints.physical_fetch_hint,
index_predicate_execution: hints.predicate_execution,
preserve_leaf_index_order: inputs.preserve_leaf_index_order,
})
}
}
struct AccessPlanStreamResolver;
impl AccessPlanStreamResolver {
fn validate_index_prefix_spec_alignment(
path: &ExecutableAccessPath<'_, AccessKey>,
index_prefix_specs: &[LoweredIndexPrefixSpec],
) -> Result<(), InternalError> {
let path_capabilities = path.capabilities();
if let Some(index) = path_capabilities.index_prefix_model() {
for spec in index_prefix_specs {
if spec.index() != &index {
return Err(InternalError::query_executor_invariant(
"index-prefix spec does not match access path index",
));
}
}
}
Ok(())
}
fn collect_child_key_streams<'a>(
runtime: &TraversalRuntime,
children: &[ExecutableAccessPlan<'a, AccessKey>],
inputs: TraversalInputs<'a>,
spec_cursor: &mut AccessSpecCursor<'a>,
) -> Result<Vec<OrderedKeyStreamBox>, InternalError> {
let mut streams = Vec::with_capacity(children.len());
for child in children {
let child_inputs = inputs
.with_physical_fetch_hint(None)
.without_leaf_index_order_preservation();
streams.push(Self::produce_key_stream(
runtime,
child,
child_inputs,
spec_cursor,
)?);
}
Ok(streams)
}
fn reduce_key_streams<F>(
mut streams: Vec<OrderedKeyStreamBox>,
combiner: F,
) -> OrderedKeyStreamBox
where
F: Fn(OrderedKeyStreamBox, OrderedKeyStreamBox) -> OrderedKeyStreamBox,
{
if streams.is_empty() {
return ordered_key_stream_from_materialized_keys(Vec::new());
}
if streams.len() == 1 {
return streams
.pop()
.unwrap_or_else(|| ordered_key_stream_from_materialized_keys(Vec::new()));
}
while streams.len() > 1 {
let mut next_round = Vec::with_capacity((streams.len().saturating_add(1)) / 2);
let mut iter = streams.into_iter();
while let Some(left) = iter.next() {
if let Some(right) = iter.next() {
next_round.push(combiner(left, right));
} else {
next_round.push(left);
}
}
streams = next_round;
}
streams
.pop()
.unwrap_or_else(|| ordered_key_stream_from_materialized_keys(Vec::new()))
}
fn produce_key_stream<'a>(
runtime: &TraversalRuntime,
access: &ExecutableAccessPlan<'a, AccessKey>,
inputs: TraversalInputs<'a>,
spec_cursor: &mut AccessSpecCursor<'a>,
) -> Result<OrderedKeyStreamBox, InternalError> {
match access.node() {
ExecutableAccessNode::Path(path) => {
let path_capabilities = path.capabilities();
let index_prefix_specs = if path_capabilities.index_prefix_spec_count() > 0 {
spec_cursor.require_next_index_prefix_specs(
path_capabilities.index_prefix_spec_count(),
)?
} else {
&[]
};
let index_range_spec = if path_capabilities.consumes_index_range_spec() {
Some(spec_cursor.require_next_index_range_spec()?)
} else {
None
};
Self::validate_index_prefix_spec_alignment(path, index_prefix_specs)?;
validate_index_range_spec_alignment(path, index_range_spec)?;
runtime.lower_path_access(path, inputs, index_prefix_specs, index_range_spec)
}
ExecutableAccessNode::Union(children) => {
Self::produce_union_key_stream(runtime, children, inputs, spec_cursor)
}
ExecutableAccessNode::Intersection(children) => {
Self::produce_intersection_key_stream(runtime, children, inputs, spec_cursor)
}
}
}
fn produce_union_key_stream<'a>(
runtime: &TraversalRuntime,
children: &[ExecutableAccessPlan<'a, AccessKey>],
inputs: TraversalInputs<'a>,
spec_cursor: &mut AccessSpecCursor<'a>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let streams = Self::collect_child_key_streams(runtime, children, inputs, spec_cursor)?;
let key_comparator = KeyOrderComparator::from_direction(inputs.continuation.direction());
Ok(Self::reduce_key_streams(streams, |left, right| {
OrderedKeyStreamBox::merge(left, right, key_comparator)
}))
}
fn produce_intersection_key_stream<'a>(
runtime: &TraversalRuntime,
children: &[ExecutableAccessPlan<'a, AccessKey>],
inputs: TraversalInputs<'a>,
spec_cursor: &mut AccessSpecCursor<'a>,
) -> Result<OrderedKeyStreamBox, InternalError> {
let streams = Self::collect_child_key_streams(runtime, children, inputs, spec_cursor)?;
let key_comparator = KeyOrderComparator::from_direction(inputs.continuation.direction());
Ok(Self::reduce_key_streams(streams, |left, right| {
OrderedKeyStreamBox::intersect(left, right, key_comparator)
}))
}
}