use std::sync::Arc;
use crate::builtins::registry::{
participates_in_demand, pipeline_materialization, pipeline_order_effect, pipeline_shape,
BuiltinId,
};
use crate::builtins::{
BuiltinCardinality, BuiltinMethod, BuiltinPipelineMaterialization, BuiltinPipelineOrderEffect,
BuiltinSelectionPosition, BuiltinSinkAccumulator, BuiltinSinkDemand, BuiltinSinkSpec,
BuiltinSinkValueNeed, BuiltinViewStage,
};
use crate::parse::ast::Expr;
use crate::parse::chain_ir::ChainOp;
use crate::plan::demand::{Demand as ChainDemand, DemandLanes, FieldDemand, PullDemand, ValueNeed};
use crate::vm::{CompiledObjEntry, Opcode, Program};
use super::{
BodyKernel, Pipeline, PipelineBody, PredicateSinkOp, ReducerOp, Sink, Stage,
ViewMembershipTarget, ViewSinkCapability, ViewStageCapability,
};
/// Positional marker carried in `SinkDemand::positional`.
///
/// Mirrors `BuiltinSelectionPosition` (see the `From` impl in this file).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Position {
/// Counterpart of `BuiltinSelectionPosition::First`; also reported by `Sink::Nth`.
First,
/// Counterpart of `BuiltinSelectionPosition::Last`.
Last,
}
/// Demand a sink (or a stage, after propagation) places on its upstream.
#[derive(Debug, Clone, Copy)]
pub struct SinkDemand {
/// Pull / value / order demand in the planner's chain-demand vocabulary.
pub chain: ChainDemand,
/// Set when the consumer selects a single positional element; `None` otherwise.
pub positional: Option<Position>,
}
impl SinkDemand {
    /// Default demand: `ChainDemand::RESULT` with no positional selection.
    ///
    /// Used as the fallback whenever nothing more specific is known.
    pub const RESULT: Self = Self {
        positional: None,
        chain: ChainDemand::RESULT,
    };
}
/// Field-level payload demand split into two lanes.
///
/// Built from `DemandLanes` via the `From` impl in this file.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PayloadDemand {
/// Copied from `DemandLanes::scan_need`.
pub scan_need: FieldDemand,
/// Copied from `DemandLanes::result_need`.
pub result_need: FieldDemand,
}
/// Result of `Pipeline::late_projection_for`: a run of trailing projection
/// stages folded into a single kernel.
#[derive(Debug, Clone)]
pub struct LateProjection {
/// Index of the first stage folded into `kernel`, i.e. the number of
/// leading stages left outside the projection.
#[allow(dead_code)]
pub prefix_len: usize,
/// Composition of the trailing stages' kernels, earliest stage first.
#[allow(dead_code)]
pub kernel: BodyKernel,
}
/// Where (if anywhere) a pipeline leaves the non-legacy execution paths.
///
/// Produced by `Pipeline::fallback_boundary_for`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FallbackBoundary {
/// No stage requires legacy fallback and the execution path is not `Legacy`.
None,
/// The stage at `index` is the first one whose
/// `requires_legacy_fallback()` returns true.
LegacyStage {
index: usize,
},
/// No individual stage forces fallback, but the chosen path is
/// `PhysicalExecPath::Legacy`.
MaterializedExecution,
}
impl From<DemandLanes> for PayloadDemand {
fn from(value: DemandLanes) -> Self {
Self {
scan_need: value.scan_need,
result_need: value.result_need,
}
}
}
impl Sink {
    /// Describes what this sink needs from the stream feeding it.
    ///
    /// `Nth`, `SelectMany`, `Predicate`, `Membership` and `ArgExtreme` are
    /// described directly here. Every other sink defers to its builtin sink
    /// spec and falls back to [`SinkDemand::RESULT`] when it has none.
    pub fn demand(&self) -> SinkDemand {
        let chain = match self {
            // `Nth` is the only hand-written sink that reports a position.
            Sink::Nth(idx) => {
                return SinkDemand {
                    chain: ChainDemand {
                        pull: PullDemand::NthInput(*idx),
                        value: ValueNeed::Whole,
                        order: false,
                    },
                    positional: Some(Position::First),
                };
            }
            Sink::SelectMany { n, from_end } => {
                let pull = if *from_end {
                    PullDemand::LastInput(*n)
                } else {
                    PullDemand::FirstInput(*n)
                };
                ChainDemand {
                    pull,
                    value: ValueNeed::Whole,
                    order: true,
                }
            }
            Sink::Predicate(spec) => {
                // Only `FindOne` hands the matched value back to the caller.
                let value = if spec.op == PredicateSinkOp::FindOne {
                    ValueNeed::Whole
                } else {
                    ValueNeed::Predicate
                };
                ChainDemand {
                    pull: PullDemand::All,
                    value,
                    order: false,
                }
            }
            Sink::Membership(_) => ChainDemand {
                pull: PullDemand::All,
                value: ValueNeed::Whole,
                order: false,
            },
            Sink::ArgExtreme(_) => ChainDemand {
                pull: PullDemand::All,
                value: ValueNeed::Whole,
                order: true,
            },
            Sink::Collect | Sink::Terminal(_) | Sink::Reducer(_) | Sink::ApproxCountDistinct => {
                return match self.builtin_sink_spec() {
                    Some(spec) => sink_demand_from_builtin(spec),
                    None => SinkDemand::RESULT,
                };
            }
        };
        SinkDemand {
            chain,
            positional: None,
        }
    }

    /// Reports whether the sink can run when only the receiver is available.
    ///
    /// Sinks that carry no programs always qualify. The others qualify when
    /// `program_ok` accepts every program they carry.
    pub(crate) fn can_run_with_receiver_only<F>(&self, mut program_ok: F) -> bool
    where
        F: FnMut(&crate::vm::Program) -> bool,
    {
        match self {
            Sink::Predicate(spec) => program_ok(&spec.predicate),
            Sink::ArgExtreme(spec) => program_ok(&spec.key),
            Sink::Membership(spec) => spec.sink_programs().all(|program| program_ok(program)),
            Sink::Reducer(spec) => spec.sink_programs().all(|program| program_ok(program)),
            Sink::Collect
            | Sink::Terminal(_)
            | Sink::SelectMany { .. }
            | Sink::Nth(_)
            | Sink::ApproxCountDistinct => true,
        }
    }

    /// Maps the sink to its view-level capability, if it has one.
    ///
    /// Returns `None` when a kernel the sink depends on is missing from
    /// `sink_kernels` or is not view-native, when the sink has no builtin
    /// sink spec, or when a numeric accumulator has no numeric reducer op.
    pub(crate) fn view_capability(
        &self,
        sink_kernels: &[BodyKernel],
    ) -> Option<ViewSinkCapability> {
        match self {
            Sink::Collect => return Some(ViewSinkCapability::Collect),
            Sink::SelectMany { n, from_end } => {
                return Some(ViewSinkCapability::SelectMany {
                    n: *n,
                    from_end: *from_end,
                    source_reversed: false,
                });
            }
            Sink::Nth(index) => return Some(ViewSinkCapability::Nth { index: *index }),
            Sink::Predicate(spec) => {
                let predicate_kernel =
                    view_native_sink_kernel(sink_kernels, spec.predicate_kernel_index())?;
                return Some(ViewSinkCapability::Predicate {
                    op: spec.op,
                    predicate_kernel,
                });
            }
            Sink::Membership(spec) => {
                return Some(ViewSinkCapability::Membership {
                    op: spec.op,
                    target: ViewMembershipTarget::from(&spec.target),
                });
            }
            Sink::ArgExtreme(spec) => {
                let key_kernel = view_native_sink_kernel(sink_kernels, spec.key_kernel_index())?;
                return Some(ViewSinkCapability::ArgExtreme {
                    want_max: spec.want_max,
                    key_kernel,
                });
            }
            // These sinks are described by builtin metadata, handled below.
            Sink::Terminal(_) | Sink::Reducer(_) | Sink::ApproxCountDistinct => {}
        }

        let sink_spec = self.builtin_sink_spec()?;
        let reducer = self.reducer_spec();

        // A kernel index that is present must resolve to a view-native kernel.
        let predicate_kernel = if let Some(idx) = reducer
            .as_ref()
            .and_then(|spec| spec.predicate_kernel_index())
        {
            Some(view_native_sink_kernel(sink_kernels, idx)?)
        } else {
            None
        };
        let project_kernel = if let Some(idx) = reducer
            .as_ref()
            .and_then(|spec| spec.projection_kernel_index())
        {
            Some(view_native_sink_kernel(sink_kernels, idx)?)
        } else {
            None
        };

        // Numeric accumulators need a reducer that exposes a numeric op.
        if sink_spec.accumulator == BuiltinSinkAccumulator::Numeric {
            reducer.as_ref()?.numeric_op()?;
        }

        Some(ViewSinkCapability::from_sink_spec(
            sink_spec,
            predicate_kernel,
            project_kernel,
        ))
    }

    /// Looks up the builtin sink spec backing this sink, if any.
    pub(crate) fn builtin_sink_spec(&self) -> Option<BuiltinSinkSpec> {
        match self {
            Sink::Terminal(method) => method.spec().sink,
            Sink::Reducer(spec) => spec.method().and_then(|method| method.spec().sink),
            Sink::ApproxCountDistinct => BuiltinMethod::ApproxCountDistinct.spec().sink,
            Sink::Collect
            | Sink::Nth(_)
            | Sink::SelectMany { .. }
            | Sink::Predicate(_)
            | Sink::Membership(_)
            | Sink::ArgExtreme(_) => None,
        }
    }
}
/// Returns `idx` back when `sink_kernels[idx]` exists and is view-native.
///
/// Yields `None` for an out-of-range index or a non-view-native kernel.
fn view_native_sink_kernel(sink_kernels: &[BodyKernel], idx: usize) -> Option<usize> {
    let kernel = sink_kernels.get(idx)?;
    if kernel.is_view_native() {
        Some(idx)
    } else {
        None
    }
}
fn sink_demand_from_builtin(spec: BuiltinSinkSpec) -> SinkDemand {
match spec.demand {
BuiltinSinkDemand::First { value } => SinkDemand {
chain: ChainDemand::first(sink_value_need(value)),
positional: match spec.accumulator {
BuiltinSinkAccumulator::SelectOne(position) => Some(position.into()),
_ => None,
},
},
BuiltinSinkDemand::Last { value } => SinkDemand {
chain: ChainDemand {
pull: PullDemand::LastInput(1),
value: sink_value_need(value),
order: true,
},
positional: match spec.accumulator {
BuiltinSinkAccumulator::SelectOne(position) => Some(position.into()),
_ => None,
},
},
BuiltinSinkDemand::All { value, order } => SinkDemand {
chain: ChainDemand {
pull: PullDemand::All,
value: sink_value_need(value),
order,
},
positional: match spec.accumulator {
BuiltinSinkAccumulator::SelectOne(position) => Some(position.into()),
_ => None,
},
},
}
}
fn sink_value_need(value: BuiltinSinkValueNeed) -> ValueNeed {
match value {
BuiltinSinkValueNeed::None => ValueNeed::CountOnly,
BuiltinSinkValueNeed::Whole => ValueNeed::Whole,
BuiltinSinkValueNeed::Numeric => ValueNeed::Numeric,
}
}
impl From<BuiltinSelectionPosition> for Position {
fn from(value: BuiltinSelectionPosition) -> Self {
match value {
BuiltinSelectionPosition::First => Position::First,
BuiltinSelectionPosition::Last => Position::Last,
}
}
}
/// Per-stage execution strategy selector.
///
/// NOTE(review): no consumer of this enum is visible in this section; the
/// variant notes below are inferred from the names — confirm against the
/// planner that assigns them.
#[derive(Debug, Clone, Copy)]
pub enum StageStrategy {
/// No special strategy.
Default,
/// Presumably a sort that only needs the top `k` elements — TODO confirm.
SortTopK(usize),
/// Presumably a sort that only needs the bottom `k` elements — TODO confirm.
SortBottomK(usize),
/// Presumably a sort that may stop after the given number of outputs — TODO confirm.
SortUntilOutput(usize),
}
/// Planner-facing summary of a stage, filled from view-stage metadata,
/// pipeline-shape registry data, or the builtin spec (see `impl StageShape`).
#[derive(Debug, Clone, Copy)]
pub struct StageShape {
/// Cardinality class of the stage (e.g. `BuiltinCardinality::OneToOne`).
pub cardinality: BuiltinCardinality,
/// Indexed-execution flag taken from the stage metadata.
pub can_indexed: bool,
/// Cost figure taken from the stage metadata (units not visible here).
pub cost: f64,
/// Selectivity figure; the builtin fallback uses 0.5 for streaming
/// filters and 1.0 otherwise.
pub selectivity: f64,
}
impl StageShape {
    /// Builds a shape by querying each property of a view stage.
    pub(crate) fn from_view_stage(stage: BuiltinViewStage) -> Self {
        let cardinality = stage.cardinality();
        let can_indexed = stage.can_indexed();
        let cost = stage.cost();
        let selectivity = stage.selectivity();
        Self {
            cardinality,
            can_indexed,
            cost,
            selectivity,
        }
    }

    /// Builds a shape for a builtin method.
    ///
    /// Pipeline-shape data from the registry wins when present. Otherwise the
    /// method's own spec is used, with a selectivity of 0.5 for streaming
    /// filters and 1.0 for everything else.
    pub(crate) fn from_builtin(method: BuiltinMethod) -> Self {
        use crate::builtins::BuiltinCategory;

        match pipeline_shape(BuiltinId::from_method(method)) {
            Some(shape) => Self {
                cardinality: shape.cardinality,
                can_indexed: shape.can_indexed,
                cost: shape.cost,
                selectivity: shape.selectivity,
            },
            None => {
                let spec = method.spec();
                let selectivity = match spec.category {
                    BuiltinCategory::StreamingFilter => 0.5,
                    _ => 1.0,
                };
                Self {
                    cardinality: spec.cardinality,
                    can_indexed: spec.can_indexed,
                    cost: spec.cost,
                    selectivity,
                }
            }
        }
    }
}
/// Uniform, borrow-only description of a `Stage` used to look up builtin
/// metadata without matching on every stage variant.
#[derive(Debug, Clone, Copy)]
pub(crate) struct StageDescriptor<'a> {
/// Builtin method backing the stage; `None` for descriptors built with `special()`.
pub method: Option<BuiltinMethod>,
/// Body program attached to the stage, if it has one.
pub body: Option<&'a Program>,
/// Numeric argument for `usize`-parameterised stages.
pub usize_arg: Option<usize>,
// Takes precedence over the method spec's `view_stage` when set.
view_stage_override: Option<BuiltinViewStage>,
// Lets `pipeline_order_effect` treat a one-to-one method as order-preserving
// when the registry has no explicit order effect for it.
allow_one_to_one_order_fallback: bool,
// Answer given by `can_run_with_receiver_only` when there is no body.
receiver_safe_without_body: bool,
}
impl<'a> StageDescriptor<'a> {
#[inline]
pub(crate) fn new(method: BuiltinMethod) -> Self {
Self {
method: Some(method),
body: None,
usize_arg: None,
view_stage_override: None,
allow_one_to_one_order_fallback: false,
receiver_safe_without_body: true,
}
}
#[inline]
pub(crate) fn special() -> Self {
Self {
method: None,
body: None,
usize_arg: None,
view_stage_override: None,
allow_one_to_one_order_fallback: false,
receiver_safe_without_body: true,
}
}
#[inline]
pub(crate) fn body(mut self, body: &'a Program) -> Self {
self.body = Some(body);
self
}
#[inline]
pub(crate) fn usize_arg(mut self, usize_arg: usize) -> Self {
self.usize_arg = Some(usize_arg);
self
}
#[inline]
pub(crate) fn with_view_stage(mut self, stage: BuiltinViewStage) -> Self {
self.view_stage_override = Some(stage);
self
}
#[inline]
pub(crate) fn allow_one_to_one_order_fallback(mut self) -> Self {
self.allow_one_to_one_order_fallback = true;
self
}
#[inline]
pub(crate) fn receiver_unsafe_without_body(mut self) -> Self {
self.receiver_safe_without_body = false;
self
}
#[inline]
pub(crate) fn view_stage(self) -> Option<BuiltinViewStage> {
self.view_stage_override
.or_else(|| self.method.and_then(|method| method.spec().view_stage))
}
#[inline]
pub(crate) fn columnar_stage(self) -> Option<crate::builtins::BuiltinColumnarStage> {
self.method.and_then(|method| method.spec().columnar_stage)
}
#[inline]
pub(crate) fn pipeline_materialization(self) -> BuiltinPipelineMaterialization {
self.method
.map(|method| pipeline_materialization(BuiltinId::from_method(method)))
.unwrap_or(BuiltinPipelineMaterialization::Streaming)
}
#[inline]
pub(crate) fn pipeline_order_effect(self) -> BuiltinPipelineOrderEffect {
let Some(method) = self.method else {
return BuiltinPipelineOrderEffect::Blocks;
};
let spec = method.spec();
if let Some(effect) = pipeline_order_effect(BuiltinId::from_method(method)) {
return effect;
}
if self.allow_one_to_one_order_fallback
&& spec.cardinality == crate::builtins::BuiltinCardinality::OneToOne
{
return BuiltinPipelineOrderEffect::Preserves;
}
BuiltinPipelineOrderEffect::Blocks
}
#[inline]
pub(crate) fn can_run_with_receiver_only<F>(self, program_ok: F) -> bool
where
F: FnMut(&crate::vm::Program) -> bool,
{
self.body
.map(program_ok)
.unwrap_or(self.receiver_safe_without_body)
}
}
// Expands to a `match` over `$stage` that, for each listed
// `Stage::<variant>(prog, view_stage)`, yields
// `Some(StageDescriptor)` for `BuiltinMethod::<method>` with `prog` as the body
// and `*view_stage` as the view-stage override. Any other stage yields `None`.
macro_rules! view_body_stage_descriptor {
($stage:expr, { $($variant:ident => $method:ident),+ $(,)? }) => {
match $stage {
$(
Stage::$variant(prog, view_stage) => {
Some(StageDescriptor::new(BuiltinMethod::$method).body(prog).with_view_stage(*view_stage))
},
)+
_ => None,
}
};
}
// Expands to a `match` over `$stage` that maps each listed pattern to a bare
// `StageDescriptor::new(BuiltinMethod::<method>)` (no body, no argument).
// Any stage not matched by a listed pattern yields `None`.
macro_rules! method_stage_descriptor {
($stage:expr, { $($pattern:pat => $method:ident),+ $(,)? }) => {
match $stage {
$(
$pattern => Some(StageDescriptor::new(BuiltinMethod::$method)),
)+
_ => None,
}
};
}
impl Stage {
/// True when the stage's materialization mode is `ComposedBarrier`.
pub(crate) fn is_composed_barrier(&self) -> bool {
self.pipeline_materialization() == BuiltinPipelineMaterialization::ComposedBarrier
}
/// True for every materialization mode other than `Streaming`.
pub(crate) fn requires_legacy_materialization(&self) -> bool {
!matches!(
self.pipeline_materialization(),
BuiltinPipelineMaterialization::Streaming
)
}
/// True only when the materialization mode is `LegacyMaterialized`.
pub(crate) fn requires_legacy_fallback(&self) -> bool {
matches!(
self.pipeline_materialization(),
BuiltinPipelineMaterialization::LegacyMaterialized
)
}
/// Maps the stage to its view-level capability, if any.
///
/// `idx` is the value stored as the kernel index in the returned
/// capability, and `kernel` is the body kernel for this stage when one is
/// available. Returns `None` when the stage has no descriptor or no view
/// stage, or when a required kernel is not view-native.
pub(crate) fn view_capability(
&self,
idx: usize,
kernel: Option<&BodyKernel>,
) -> Option<ViewStageCapability> {
let desc = self.descriptor()?;
let stage = desc.view_stage()?;
// Distinct: a keyed distinct needs a view-native kernel; an unkeyed one needs none.
if stage == BuiltinViewStage::Distinct {
return match desc.body {
Some(_) if kernel.is_some_and(BodyKernel::is_view_native) => {
Some(ViewStageCapability::Distinct { kernel: Some(idx) })
}
Some(_) => None,
None => Some(ViewStageCapability::Distinct { kernel: None }),
};
}
// KeyedReduce: needs a method, a body, a view-native kernel and keyed-reducer metadata.
if stage == BuiltinViewStage::KeyedReduce {
return match (desc.method, desc.body) {
(Some(method), Some(_)) if kernel.is_some_and(BodyKernel::is_view_native) => {
let kind = method.spec().keyed_reducer?;
Some(ViewStageCapability::KeyedReduce { kind, kernel: idx })
}
_ => None,
};
}
ViewStageCapability::from_stage_metadata(
stage,
desc.usize_arg,
idx,
kernel.is_some_and(BodyKernel::is_view_native),
)
}
/// Builds the `StageDescriptor` for this stage.
///
/// Returns `None` for any stage variant not handled below.
pub(crate) fn descriptor(&self) -> Option<StageDescriptor<'_>> {
if let Some(desc) = view_body_stage_descriptor!(self, {
Filter => Filter,
Map => Map,
FlatMap => FlatMap,
}) {
return Some(desc);
}
if let Some(desc) = method_stage_descriptor!(self, {
Stage::Reverse(_) => Reverse,
Stage::UniqueBy(None) => Unique,
}) {
return Some(desc);
}
match self {
Stage::UniqueBy(Some(prog)) => {
Some(StageDescriptor::new(BuiltinMethod::UniqueBy).body(prog))
}
Stage::Sort(super::SortSpec { key, .. }) => {
let desc = StageDescriptor::new(BuiltinMethod::Sort);
Some(if let Some(prog) = key {
desc.body(prog)
} else {
desc
})
}
Stage::UsizeBuiltin { method, value } => {
Some(StageDescriptor::new(*method).usize_arg(*value))
}
Stage::StringBuiltin { method, .. } | Stage::StringPairBuiltin { method, .. } => {
Some(StageDescriptor::new(*method))
}
Stage::IntRangeBuiltin { method, .. } => Some(StageDescriptor::new(*method)),
Stage::ExprBuiltin { method, body } => Some(StageDescriptor::new(*method).body(body)),
// Generic builtins are the only stages that opt into the one-to-one order fallback.
Stage::Builtin(call) => {
Some(StageDescriptor::new(call.method).allow_one_to_one_order_fallback())
}
Stage::SortedDedup(prog) => {
let desc = StageDescriptor::special();
Some(if let Some(prog) = prog {
desc.body(prog)
} else {
desc
})
}
// No body is attached here, so this stage never counts as receiver-only.
Stage::CompiledMap(_) => {
Some(StageDescriptor::special().receiver_unsafe_without_body())
}
_ => None,
}
}
/// Materialization mode of the stage: `CompiledMap` is `Streaming`,
/// `SortedDedup` is `LegacyMaterialized`, everything else follows its
/// descriptor (or `Streaming` when there is no descriptor).
fn pipeline_materialization(&self) -> BuiltinPipelineMaterialization {
match self {
Stage::CompiledMap(_) => BuiltinPipelineMaterialization::Streaming,
Stage::SortedDedup(_) => BuiltinPipelineMaterialization::LegacyMaterialized,
_ => self
.descriptor()
.map(StageDescriptor::pipeline_materialization)
.unwrap_or(BuiltinPipelineMaterialization::Streaming),
}
}
/// Delegates to the descriptor's `can_run_with_receiver_only`; a stage
/// without a descriptor returns false.
pub(crate) fn can_run_with_receiver_only<F>(&self, mut program_ok: F) -> bool
where
F: FnMut(&crate::vm::Program) -> bool,
{
self.descriptor()
.is_some_and(|desc| desc.can_run_with_receiver_only(&mut program_ok))
}
/// Body program recorded in the stage's descriptor, if any.
pub(crate) fn body_program(&self) -> Option<&crate::vm::Program> {
self.descriptor().and_then(|desc| desc.body)
}
/// True for `Map` and `CompiledMap` stages.
pub(crate) fn can_use_terminal_map_collector(&self) -> bool {
match self {
Stage::Map(_, _) => true,
Stage::CompiledMap(_) => true,
_ => false,
}
}
/// True for `CompiledMap` and `Map` stages.
pub(crate) fn is_symbolic_map_stage(&self) -> bool {
matches!(self, Stage::CompiledMap(_) | Stage::Map(_, _))
}
/// True for `Filter` stages.
pub(crate) fn is_symbolic_filter_stage(&self) -> bool {
matches!(self, Stage::Filter(_, _))
}
/// True for `UsizeBuiltin` stages whose method is `Take` or `Skip`.
pub(crate) fn is_positional_stage(&self) -> bool {
matches!(
self,
Stage::UsizeBuiltin {
method: BuiltinMethod::Take | BuiltinMethod::Skip,
..
}
)
}
/// True for `Sort` and `Reverse` stages.
pub(crate) fn is_order_only_stage(&self) -> bool {
matches!(self, Stage::Sort(_) | Stage::Reverse(_))
}
/// True when the stage evaluates a program or key against each input
/// value; a `Sort` counts only when it has a key.
pub(crate) fn consumes_input_value(&self) -> bool {
match self {
Stage::Filter(_, _)
| Stage::Map(_, _)
| Stage::FlatMap(_, _)
| Stage::CompiledMap(_)
| Stage::ExprBuiltin { .. }
| Stage::UniqueBy(Some(_))
| Stage::SortedDedup(Some(_)) => true,
Stage::Sort(spec) => spec.key.is_some(),
_ => false,
}
}
/// Whether the stage may be dropped when nothing downstream uses the value.
///
/// Requires a descriptor, one-to-one cardinality and an order-preserving
/// effect. Beyond that, only pure generic/int-range/string-pair builtins
/// and the four key/value transform-or-filter expression builtins qualify.
pub(crate) fn can_drop_when_value_unused(&self) -> bool {
let Some(desc) = self.descriptor() else {
return false;
};
if !matches!(self.shape().cardinality, BuiltinCardinality::OneToOne) {
return false;
}
if desc.pipeline_order_effect() != BuiltinPipelineOrderEffect::Preserves {
return false;
}
match self {
Stage::Builtin(_) | Stage::IntRangeBuiltin { .. } | Stage::StringPairBuiltin { .. } => {
desc.method.is_some_and(|m| m.spec().pure)
}
Stage::ExprBuiltin {
method:
BuiltinMethod::TransformKeys
| BuiltinMethod::TransformValues
| BuiltinMethod::FilterKeys
| BuiltinMethod::FilterValues,
..
} => true,
_ => false,
}
}
/// Chain-IR operation used to propagate demand through this stage.
///
/// `CompiledMap` is reported as the `Map` builtin, `SortedDedup` has no
/// chain op, and match-only `Filter`/`Map` programs are reported as match
/// roles. Everything else goes through `chain_demand_op`.
pub fn chain_op(&self) -> Option<ChainOp> {
match self {
Stage::CompiledMap(_) => Some(ChainOp::builtin(BuiltinMethod::Map)),
Stage::SortedDedup(_) => None,
Stage::Filter(prog, _) if program_is_match_only(prog) => Some(ChainOp::match_role(
crate::parse::chain_ir::MatchRole::Predicate,
)),
Stage::Map(prog, _) if program_is_match_only(prog) => Some(ChainOp::match_role(
crate::parse::chain_ir::MatchRole::Transform,
)),
_ => self.chain_demand_op(),
}
}
/// Descriptor-driven chain op: a `usize` argument yields `builtin_usize`;
/// otherwise generic builtins and methods that participate in demand
/// yield `builtin`. Returns `None` without a descriptor or method.
fn chain_demand_op(&self) -> Option<ChainOp> {
let desc = self.descriptor()?;
let method = desc.method?;
match self {
_ if desc.usize_arg.is_some() => Some(ChainOp::builtin_usize(method, desc.usize_arg?)),
Stage::Builtin(_) => Some(ChainOp::builtin(method)),
_ if participates_in_demand(BuiltinId::from_method(method)) => {
Some(ChainOp::builtin(method))
}
_ => None,
}
}
/// Translates the demand seen after this stage into the demand on its input.
///
/// Without a chain op the chain demand resets to `ChainDemand::RESULT`.
/// The positional marker survives only across one-to-one stages.
pub fn upstream_demand(&self, demand: SinkDemand) -> SinkDemand {
let chain = match self.chain_op() {
Some(op) => op.propagate_demand(demand.chain),
None => ChainDemand::RESULT,
};
let positional = if matches!(self.shape().cardinality, BuiltinCardinality::OneToOne) {
demand.positional
} else {
None
};
SinkDemand { chain, positional }
}
/// Whether this stage keeps the ordering established by `sort`.
///
/// `Preserves` is always true, `Blocks` always false, and
/// `PredicatePrefix` defers to `plan::predicate_is_order_prefix`.
pub(crate) fn ordered_prefix_effect(
&self,
sort: &super::SortSpec,
sort_kernel: &BodyKernel,
kernel: &BodyKernel,
) -> bool {
match self.pipeline_order_effect() {
BuiltinPipelineOrderEffect::Preserves => true,
BuiltinPipelineOrderEffect::PredicatePrefix => {
super::plan::predicate_is_order_prefix(sort, sort_kernel, kernel)
}
BuiltinPipelineOrderEffect::Blocks => false,
}
}
/// Order effect of the stage: `CompiledMap` preserves, `SortedDedup`
/// blocks, everything else follows its descriptor (or `Blocks` when there
/// is no descriptor).
fn pipeline_order_effect(&self) -> BuiltinPipelineOrderEffect {
match self {
Stage::CompiledMap(_) => BuiltinPipelineOrderEffect::Preserves,
Stage::SortedDedup(_) => BuiltinPipelineOrderEffect::Blocks,
_ => self
.descriptor()
.map(StageDescriptor::pipeline_order_effect)
.unwrap_or(BuiltinPipelineOrderEffect::Blocks),
}
}
/// Planner shape of the stage.
///
/// `CompiledMap` and `SortedDedup` use fixed shapes. Other stages prefer
/// view-stage metadata, then builtin metadata, and finally a neutral
/// one-to-one, non-indexed, cost-1, selectivity-1 default.
// NOTE(review): `SortedDedup` is reported as `OneToOne` although the name
// suggests it can drop elements — confirm this is intended.
pub fn shape(&self) -> StageShape {
match self {
Stage::CompiledMap(_) => StageShape {
cardinality: BuiltinCardinality::OneToOne,
can_indexed: true,
cost: 10.0,
selectivity: 1.0,
},
Stage::SortedDedup(_) => StageShape {
cardinality: BuiltinCardinality::OneToOne,
can_indexed: true,
cost: 1.0,
selectivity: 1.0,
},
_ => self.descriptor().map_or(
StageShape {
cardinality: BuiltinCardinality::OneToOne,
can_indexed: false,
cost: 1.0,
selectivity: 1.0,
},
|desc| {
desc.view_stage()
.map(StageShape::from_view_stage)
.or_else(|| desc.method.map(StageShape::from_builtin))
.unwrap_or(StageShape {
cardinality: BuiltinCardinality::OneToOne,
can_indexed: false,
cost: 1.0,
selectivity: 1.0,
})
},
),
}
}
/// Tries to fuse `self` followed by `other` into a single stage.
///
/// Mergeable `usize` stages are tried first; the remaining rules are the
/// match arms below. Returns `None` when no rule applies.
// NOTE(review): `Sort + Sort` and `UniqueBy + UniqueBy` keep only the second
// stage, and the `Sort`/`UniqueBy` fusions ignore every `SortSpec` field
// other than `key` — confirm these rewrites preserve results for stable
// sorts, differing dedup keys and non-default sort options.
pub fn merge_with(&self, other: &Self) -> Option<Self> {
if let Some(merged) = self.merge_with_usize_stage(other) {
return Some(merged);
}
match (self, other) {
(Stage::Sort(_), Stage::Sort(b)) => Some(Stage::Sort(b.clone())),
(Stage::UniqueBy(_), Stage::UniqueBy(b)) => Some(Stage::UniqueBy(b.clone())),
(Stage::UniqueBy(None), Stage::Sort(super::SortSpec { key: None, .. }))
| (Stage::Sort(super::SortSpec { key: None, .. }), Stage::UniqueBy(None)) => {
Some(Stage::SortedDedup(None))
}
// Keyed fusion requires both stages to share the very same program allocation.
(Stage::UniqueBy(Some(a)), Stage::Sort(super::SortSpec { key: Some(b), .. }))
| (Stage::Sort(super::SortSpec { key: Some(a), .. }), Stage::UniqueBy(Some(b)))
if Arc::ptr_eq(a, b) =>
{
Some(Stage::SortedDedup(Some(a.clone())))
}
(Stage::Builtin(a), Stage::Builtin(b)) if a.method == b.method && a.is_idempotent() => {
Some(Stage::Builtin(a.clone()))
}
_ => None,
}
}
/// Fuses two `UsizeBuiltin` stages that share the same view stage and
/// merge rule by combining their values with that rule.
fn merge_with_usize_stage(&self, other: &Self) -> Option<Self> {
let lhs = self.usize_stage_merge_parts()?;
let rhs = other.usize_stage_merge_parts()?;
if lhs.stage != rhs.stage || lhs.merge != rhs.merge {
return None;
}
self.with_usize_stage_value(lhs.merge.combine_usize(lhs.value, rhs.value))
}
/// Extracts value, view stage and merge rule from a `UsizeBuiltin` stage;
/// `None` for other stages or when the method spec lacks either piece.
fn usize_stage_merge_parts(&self) -> Option<UsizeStageMergeParts> {
match self {
Stage::UsizeBuiltin { method, value } => Some(UsizeStageMergeParts {
value: *value,
stage: method.spec().view_stage?,
merge: method.spec().stage_merge?,
}),
_ => None,
}
}
/// Copy of a `UsizeBuiltin` stage with its value replaced; `None` for
/// any other stage.
fn with_usize_stage_value(&self, value: usize) -> Option<Self> {
match self {
Stage::UsizeBuiltin { method, .. } => Some(Stage::UsizeBuiltin {
method: *method,
value,
}),
_ => None,
}
}
/// True when both stages carry cancellation metadata and the first
/// reports that it cancels with the second.
pub fn cancels_with(&self, other: &Self) -> bool {
match (self.cancellation(), other.cancellation()) {
(Some(a), Some(b)) => a.cancels_with(b),
_ => false,
}
}
/// Cancellation metadata: stored inline for `Reverse`, read from the call
/// spec for generic builtins, absent otherwise.
fn cancellation(&self) -> Option<crate::builtins::BuiltinCancellation> {
match self {
Stage::Reverse(cancel) => Some(*cancel),
Stage::Builtin(call) => call.spec().cancellation,
_ => None,
}
}
}
/// The pieces of a `Stage::UsizeBuiltin` needed to decide whether two such
/// stages can be merged (see `Stage::merge_with_usize_stage`).
#[derive(Debug, Clone, Copy)]
struct UsizeStageMergeParts {
// The stage's numeric argument.
value: usize,
// View stage from the method spec; both stages must agree on it.
stage: BuiltinViewStage,
// Merge rule from the method spec; both stages must agree on it.
merge: crate::builtins::BuiltinStageMerge,
}
/// High-level execution strategy for a pipeline.
///
/// NOTE(review): nothing in this section selects or consumes these variants;
/// their exact meaning should be confirmed against the planner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Strategy {
IndexedDispatch,
BarrierMaterialise,
EarlyExit,
PullLoop,
}
/// Physical execution path chosen for a pipeline.
///
/// `Pipeline::fallback_boundary_for` treats `Legacy` as materialized
/// execution and the other three paths as having no fallback boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhysicalExecPath {
Indexed,
Columnar,
Composed,
Legacy,
}
/// A planned pipeline: an ordered list of stages feeding a sink.
#[derive(Debug, Clone)]
pub struct Plan {
/// Stages in execution order.
pub stages: Vec<Stage>,
/// Optional source expressions — presumably one slot per entry in
/// `stages`; TODO confirm against the code that builds a `Plan`.
pub stage_exprs: Vec<Option<Arc<Expr>>>,
/// Terminal consumer of the stage output.
pub sink: Sink,
}
/// True when every stage can run with only the receiver available, using
/// `program_is_current_only` as the per-program test. An empty slice is true.
pub(super) fn stages_can_run_with_materialized_receiver(stages: &[Stage]) -> bool {
    for stage in stages {
        if !stage.can_run_with_receiver_only(program_is_current_only) {
            return false;
        }
    }
    true
}
/// Returns the compiled match when `program` is nothing but a match.
///
/// Leading `SetCurrent` / `PushCurrent` opcodes are skipped. The next opcode
/// must be `Match`, and it must be the last opcode of the program. Any other
/// shape, including an empty program, yields `None`.
pub(super) fn program_match_only(program: &Program) -> Option<Arc<crate::vm::CompiledMatch>> {
    let mut rest = program
        .ops
        .iter()
        .skip_while(|op| matches!(op, Opcode::SetCurrent | Opcode::PushCurrent));
    let head = rest.next();
    let trailing = rest.next();
    match (head, trailing) {
        (Some(Opcode::Match(compiled)), None) => Some(Arc::clone(compiled)),
        _ => None,
    }
}
/// Boolean form of [`program_match_only`]: true when it finds a match.
fn program_is_match_only(program: &Program) -> bool {
    let compiled = program_match_only(program);
    compiled.is_some()
}
pub(super) fn program_is_current_only(program: &Program) -> bool {
program.ops.iter().all(opcode_is_current_only)
}
pub(super) fn opcode_is_current_only(opcode: &Opcode) -> bool {
match opcode {
Opcode::PushRoot | Opcode::RootChain(_) => false,
Opcode::PipelineRun { .. }
| Opcode::LetExpr { .. }
| Opcode::ListComp(_)
| Opcode::DictComp(_)
| Opcode::SetComp(_)
| Opcode::PatchEval(_)
| Opcode::UpdateBatchEval(_)
| Opcode::Match(_)
| Opcode::DeepMatchAll(_)
| Opcode::DeepMatchFirst(_) => false,
Opcode::DynIndex(prog)
| Opcode::InlineFilter(prog)
| Opcode::AndOp(prog)
| Opcode::OrOp(prog)
| Opcode::CoalesceOp(prog) => program_is_current_only(prog),
Opcode::BindLamCurrent { body, .. } => program_is_current_only(body),
Opcode::CallMethod(call) | Opcode::CallOptMethod(call) => call
.sub_progs
.iter()
.all(|prog| program_is_current_only(prog)),
Opcode::IfElse { then_, else_ } => {
program_is_current_only(then_) && program_is_current_only(else_)
}
Opcode::TryExpr { body, default } => {
program_is_current_only(body) && program_is_current_only(default)
}
Opcode::MakeArr(items) => items
.iter()
.all(|(prog, _spread)| program_is_current_only(prog)),
Opcode::FString(parts) => parts.iter().all(|part| match part {
crate::vm::CompiledFSPart::Lit(_) => true,
crate::vm::CompiledFSPart::Interp { prog, .. } => program_is_current_only(prog),
}),
Opcode::MakeObj(entries) => entries.iter().all(obj_entry_is_current_only),
Opcode::PushNull
| Opcode::PushBool(_)
| Opcode::PushInt(_)
| Opcode::PushFloat(_)
| Opcode::PushStr(_)
| Opcode::PushCurrent
| Opcode::LoadIdent(_)
| Opcode::GetField(_)
| Opcode::GetIndex(_)
| Opcode::GetSlice(_, _, _)
| Opcode::OptField(_)
| Opcode::Descendant(_)
| Opcode::DescendAll
| Opcode::Quantifier(_)
| Opcode::FieldChain(_)
| Opcode::Add
| Opcode::Sub
| Opcode::Mul
| Opcode::Div
| Opcode::Mod
| Opcode::Eq
| Opcode::Neq
| Opcode::Lt
| Opcode::Lte
| Opcode::Gt
| Opcode::Gte
| Opcode::Fuzzy
| Opcode::Not
| Opcode::Neg
| Opcode::CastOp(_)
| Opcode::KindCheck { .. }
| Opcode::SetCurrent
| Opcode::DeleteMarkErr => true,
}
}
/// Object-literal counterpart of [`opcode_is_current_only`].
///
/// `Short` and `KvPath` entries always qualify. Other entries qualify when
/// every program they embed (value, optional condition, dynamic key, spread
/// source) is current-only.
pub(super) fn obj_entry_is_current_only(entry: &CompiledObjEntry) -> bool {
    match entry {
        CompiledObjEntry::Spread(source) | CompiledObjEntry::SpreadDeep(source) => {
            program_is_current_only(source)
        }
        CompiledObjEntry::Dynamic { key, val } => {
            program_is_current_only(key) && program_is_current_only(val)
        }
        CompiledObjEntry::Kv { prog, cond, .. } => {
            if !program_is_current_only(prog) {
                return false;
            }
            // A missing condition places no further requirement.
            match cond {
                Some(guard) => program_is_current_only(guard),
                None => true,
            }
        }
        CompiledObjEntry::Short { .. } | CompiledObjEntry::KvPath { .. } => true,
    }
}
impl PipelineBody {
    /// True when all stages and the sink can run with only the receiver
    /// available, using `program_is_current_only` as the program test.
    pub(crate) fn can_run_with_materialized_receiver(&self) -> bool {
        if !stages_can_run_with_materialized_receiver(&self.stages) {
            return false;
        }
        self.sink
            .can_run_with_receiver_only(program_is_current_only)
    }

    /// Same check, restricted to the stages after the first `consumed_stages`.
    ///
    /// Returns false when `consumed_stages` exceeds the number of stages.
    pub(crate) fn suffix_can_run_with_materialized_receiver(&self, consumed_stages: usize) -> bool {
        let Some(remaining) = self.stages.get(consumed_stages..) else {
            return false;
        };
        if !stages_can_run_with_materialized_receiver(remaining) {
            return false;
        }
        self.sink
            .can_run_with_receiver_only(program_is_current_only)
    }
}
impl Pipeline {
pub fn segment_source_demand(stages: &[Stage], sink: &Sink) -> SinkDemand {
stages
.iter()
.rev()
.fold(sink.demand(), |demand, stage| stage.upstream_demand(demand))
}
pub fn source_demand(&self) -> SinkDemand {
self.source_demand
}
#[allow(dead_code)]
pub fn segment_payload_demand(
stages: &[Stage],
stage_kernels: &[BodyKernel],
sink: &Sink,
sink_kernels: &[BodyKernel],
) -> PayloadDemand {
let mut lanes = sink_payload_lanes(sink, sink_kernels);
for (idx, stage) in stages.iter().enumerate().rev() {
let kernel = stage_kernels.get(idx).unwrap_or(&BodyKernel::Generic);
lanes = stage_payload_lanes(stage, kernel, lanes);
}
lanes.into()
}
#[allow(dead_code)]
pub fn payload_demand(&self) -> PayloadDemand {
self.payload_demand.clone()
}
pub fn late_projection_for(
stages: &[Stage],
stage_kernels: &[BodyKernel],
) -> Option<LateProjection> {
let mut idx = stages.len();
let mut kernel = BodyKernel::Current;
let mut found = false;
while idx > 0 {
let stage_idx = idx - 1;
let Some(stage_kernel) =
trailing_projection_kernel(&stages[stage_idx], stage_kernels.get(stage_idx))
else {
break;
};
kernel = compose_projection_kernel(stage_kernel, kernel);
found = true;
idx -= 1;
}
found.then_some(LateProjection {
prefix_len: idx,
kernel,
})
}
pub fn fallback_boundary_for(
stages: &[Stage],
exec_path: PhysicalExecPath,
) -> FallbackBoundary {
if let Some(index) = stages.iter().position(Stage::requires_legacy_fallback) {
return FallbackBoundary::LegacyStage { index };
}
match exec_path {
PhysicalExecPath::Legacy => FallbackBoundary::MaterializedExecution,
PhysicalExecPath::Indexed | PhysicalExecPath::Columnar | PhysicalExecPath::Composed => {
FallbackBoundary::None
}
}
}
}
fn trailing_projection_kernel(stage: &Stage, kernel: Option<&BodyKernel>) -> Option<BodyKernel> {
match stage {
Stage::Map(_, _) => {
let kernel = kernel?;
kernel.is_view_native().then(|| kernel.clone())
}
Stage::Builtin(call)
if call.spec().pure
&& call.spec().view_scalar
&& call.spec().cardinality == crate::builtins::BuiltinCardinality::OneToOne =>
{
Some(BodyKernel::BuiltinCall {
receiver: Box::new(BodyKernel::Current),
call: call.clone(),
})
}
_ => None,
}
}
/// Composes two kernels so that `first` runs before `then`.
///
/// When `then` is `BodyKernel::Current`, `first` is returned unwrapped.
fn compose_projection_kernel(first: BodyKernel, then: BodyKernel) -> BodyKernel {
    match then {
        BodyKernel::Current => first,
        next => BodyKernel::Compose {
            first: Box::new(first),
            then: Box::new(next),
        },
    }
}
/// Payload lanes demanded by the sink itself.
///
/// Sinks that hand back values use `DemandLanes::RESULT`. Reducers,
/// predicates and arg-extreme sinks add whatever their kernels read to the
/// scan lane. A kernel missing from `sink_kernels` is treated as needing
/// the whole value.
#[allow(dead_code)]
fn sink_payload_lanes(sink: &Sink, sink_kernels: &[BodyKernel]) -> DemandLanes {
    match sink {
        Sink::Collect | Sink::Terminal(_) | Sink::SelectMany { .. } | Sink::Nth(_) => {
            DemandLanes::RESULT
        }
        Sink::Membership(_) | Sink::ApproxCountDistinct => DemandLanes {
            scan_need: FieldDemand::Whole,
            result_need: FieldDemand::None,
        },
        Sink::Reducer(spec) => {
            let mut lanes = DemandLanes::NONE;
            if let Some(idx) = spec.predicate_kernel_index() {
                lanes.merge_scan(kernel_payload_need(sink_kernels, idx));
            }
            // `Count` reads nothing beyond its optional predicate; every
            // other reducer reads its projection, or the whole value.
            if spec.op != ReducerOp::Count {
                let projected = match spec.projection_kernel_index() {
                    Some(idx) => kernel_payload_need(sink_kernels, idx),
                    None => FieldDemand::Whole,
                };
                lanes.merge_scan(projected);
            }
            lanes
        }
        Sink::Predicate(spec) => {
            let mut lanes = DemandLanes::NONE;
            let predicate_need = kernel_payload_need(sink_kernels, spec.predicate_kernel_index());
            lanes.merge_scan(predicate_need);
            if spec.op == PredicateSinkOp::FindOne {
                lanes.merge_result(FieldDemand::Whole);
            }
            lanes
        }
        Sink::ArgExtreme(spec) => {
            let mut lanes = DemandLanes::RESULT;
            let key_need = kernel_payload_need(sink_kernels, spec.key_kernel_index());
            lanes.merge_scan(key_need);
            lanes
        }
    }
}
#[allow(dead_code)]
fn stage_payload_lanes(stage: &Stage, kernel: &BodyKernel, downstream: DemandLanes) -> DemandLanes {
match stage {
Stage::Filter(_, _) => {
let mut lanes = downstream;
lanes.merge_scan(kernel.field_demand());
lanes
}
Stage::Map(_, _) | Stage::CompiledMap(_) => DemandLanes {
scan_need: map_lane_payload(&downstream.scan_need, kernel),
result_need: map_lane_payload(&downstream.result_need, kernel),
},
Stage::FlatMap(_, _) => DemandLanes {
scan_need: FieldDemand::Whole,
result_need: FieldDemand::Whole,
},
Stage::Sort(spec) => {
let mut lanes = downstream;
lanes.merge_scan(match spec.key {
Some(_) => kernel.field_demand(),
None => FieldDemand::Whole,
});
lanes
}
Stage::UniqueBy(Some(_)) | Stage::SortedDedup(Some(_)) => {
let mut lanes = downstream;
lanes.merge_scan(kernel.field_demand());
lanes
}
Stage::UniqueBy(None) | Stage::SortedDedup(None) => {
let mut lanes = downstream;
lanes.merge_scan(FieldDemand::Whole);
lanes
}
Stage::ExprBuiltin { method, .. }
if matches!(
method,
BuiltinMethod::TakeWhile | BuiltinMethod::DropWhile | BuiltinMethod::FilterKeys
) =>
{
let mut lanes = downstream;
lanes.merge_scan(kernel.field_demand());
lanes
}
Stage::ExprBuiltin {
method:
BuiltinMethod::Map
| BuiltinMethod::TransformKeys
| BuiltinMethod::TransformValues
| BuiltinMethod::FilterValues,
..
} => DemandLanes {
scan_need: map_lane_payload(&downstream.scan_need, kernel),
result_need: map_lane_payload(&downstream.result_need, kernel),
},
Stage::Builtin(call)
if call.method.spec().cardinality == crate::builtins::BuiltinCardinality::OneToOne =>
{
if downstream.scan_need.is_none() && downstream.result_need.is_none() {
downstream
} else {
DemandLanes {
scan_need: if downstream.scan_need.is_none() {
FieldDemand::None
} else {
FieldDemand::Whole
},
result_need: if downstream.result_need.is_none() {
FieldDemand::None
} else {
FieldDemand::Whole
},
}
}
}
_ if stage.consumes_input_value() => DemandLanes {
scan_need: FieldDemand::Whole,
result_need: downstream.result_need,
},
_ => downstream,
}
}
/// Lane demand upstream of a kernel-driven stage: nothing when the
/// downstream lane needs nothing, otherwise whatever the kernel reads.
#[allow(dead_code)]
fn map_lane_payload(demand: &FieldDemand, kernel: &BodyKernel) -> FieldDemand {
    match demand.is_none() {
        true => FieldDemand::None,
        false => kernel.field_demand(),
    }
}
/// Field demand of `kernels[idx]`, or `FieldDemand::Whole` when the index
/// is out of range.
#[allow(dead_code)]
fn kernel_payload_need(kernels: &[BodyKernel], idx: usize) -> FieldDemand {
    match kernels.get(idx) {
        Some(kernel) => kernel.field_demand(),
        None => FieldDemand::Whole,
    }
}