use std::sync::Arc;
use crate::ast::Expr;
use crate::builtin_registry::{
participates_in_demand, pipeline_materialization, pipeline_order_effect,
pipeline_shape, BuiltinId,
};
use crate::builtins::{
BuiltinMethod, BuiltinPipelineMaterialization,
BuiltinPipelineOrderEffect, BuiltinSelectionPosition, BuiltinSinkAccumulator,
BuiltinSinkDemand, BuiltinSinkSpec, BuiltinSinkValueNeed, BuiltinViewStage,
};
use crate::chain_ir::{ChainOp, Demand as ChainDemand, PullDemand, ValueNeed};
use crate::vm::{CompiledObjEntry, Opcode, Program};
use super::{BodyKernel, Pipeline, PipelineBody, Sink, Stage, ViewSinkCapability, ViewStageCapability};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Position {
First,
Last,
}
#[derive(Debug, Clone, Copy)]
pub struct SinkDemand {
pub chain: ChainDemand,
pub positional: Option<Position>,
}
impl SinkDemand {
pub const RESULT: SinkDemand = SinkDemand {
chain: ChainDemand::RESULT,
positional: None,
};
}
impl Sink {
pub fn demand(&self) -> SinkDemand {
if let Some(spec) = self.builtin_sink_spec() {
return sink_demand_from_builtin(spec);
}
SinkDemand::RESULT
}
pub(crate) fn can_run_with_receiver_only<F>(&self, mut program_ok: F) -> bool
where
F: FnMut(&crate::vm::Program) -> bool,
{
match self {
Sink::Collect | Sink::Terminal(_) | Sink::ApproxCountDistinct => true,
Sink::Reducer(spec) => spec.sink_programs().all(|prog| program_ok(prog)),
}
}
pub(crate) fn view_capability(
&self,
sink_kernels: &[BodyKernel],
) -> Option<ViewSinkCapability> {
if matches!(self, Sink::Collect) {
return Some(ViewSinkCapability::Collect);
}
let sink_spec = self.builtin_sink_spec()?;
let reducer = self.reducer_spec();
let predicate_kernel = match reducer
.as_ref()
.and_then(|spec| spec.predicate_kernel_index())
{
Some(idx) => Some(view_native_sink_kernel(sink_kernels, idx)?),
None => None,
};
let project_kernel = match reducer
.as_ref()
.and_then(|spec| spec.projection_kernel_index())
{
Some(idx) => Some(view_native_sink_kernel(sink_kernels, idx)?),
None => None,
};
if sink_spec.accumulator == BuiltinSinkAccumulator::Numeric {
reducer.as_ref()?.numeric_op()?;
}
Some(ViewSinkCapability::from_sink_spec(
sink_spec,
predicate_kernel,
project_kernel,
))
}
pub(crate) fn builtin_sink_spec(&self) -> Option<BuiltinSinkSpec> {
match self {
Sink::Terminal(method) => method.spec().sink,
Sink::Reducer(spec) => spec.method()?.spec().sink,
Sink::ApproxCountDistinct => BuiltinMethod::ApproxCountDistinct.spec().sink,
Sink::Collect => None,
}
}
}
fn view_native_sink_kernel(sink_kernels: &[BodyKernel], idx: usize) -> Option<usize> {
sink_kernels.get(idx)?.is_view_native().then_some(idx)
}
fn sink_demand_from_builtin(spec: BuiltinSinkSpec) -> SinkDemand {
match spec.demand {
BuiltinSinkDemand::First { value } => SinkDemand {
chain: ChainDemand::first(sink_value_need(value)),
positional: match spec.accumulator {
BuiltinSinkAccumulator::SelectOne(position) => Some(position.into()),
_ => None,
},
},
BuiltinSinkDemand::All { value, order } => SinkDemand {
chain: ChainDemand {
pull: PullDemand::All,
value: sink_value_need(value),
order,
},
positional: match spec.accumulator {
BuiltinSinkAccumulator::SelectOne(position) => Some(position.into()),
_ => None,
},
},
}
}
fn sink_value_need(value: BuiltinSinkValueNeed) -> ValueNeed {
match value {
BuiltinSinkValueNeed::None => ValueNeed::None,
BuiltinSinkValueNeed::Whole => ValueNeed::Whole,
BuiltinSinkValueNeed::Numeric => ValueNeed::Numeric,
}
}
impl From<BuiltinSelectionPosition> for Position {
fn from(value: BuiltinSelectionPosition) -> Self {
match value {
BuiltinSelectionPosition::First => Position::First,
BuiltinSelectionPosition::Last => Position::Last,
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum StageStrategy {
Default,
SortTopK(usize),
SortBottomK(usize),
SortUntilOutput(usize),
}
#[derive(Debug, Clone, Copy)]
pub struct StageShape {
pub cardinality: crate::chain_ir::Cardinality,
pub can_indexed: bool,
pub cost: f64,
pub selectivity: f64,
}
impl StageShape {
pub(crate) fn from_view_stage(stage: BuiltinViewStage) -> Self {
Self {
cardinality: stage.cardinality().into(),
can_indexed: stage.can_indexed(),
cost: stage.cost(),
selectivity: stage.selectivity(),
}
}
pub(crate) fn from_builtin(method: BuiltinMethod) -> Self {
use crate::builtins::BuiltinCategory;
let spec = method.spec();
if let Some(shape) = pipeline_shape(BuiltinId::from_method(method)) {
return Self {
cardinality: shape.cardinality.into(),
can_indexed: shape.can_indexed,
cost: shape.cost,
selectivity: shape.selectivity,
};
}
Self {
cardinality: spec.cardinality.into(),
can_indexed: spec.can_indexed,
cost: spec.cost,
selectivity: if matches!(spec.category, BuiltinCategory::StreamingFilter) {
0.5
} else {
1.0
},
}
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct StageDescriptor<'a> {
pub method: Option<BuiltinMethod>,
pub body: Option<&'a Program>,
pub usize_arg: Option<usize>,
view_stage_override: Option<BuiltinViewStage>,
allow_one_to_one_order_fallback: bool,
receiver_safe_without_body: bool,
}
impl<'a> StageDescriptor<'a> {
#[inline]
pub(crate) fn new(method: BuiltinMethod) -> Self {
Self {
method: Some(method),
body: None,
usize_arg: None,
view_stage_override: None,
allow_one_to_one_order_fallback: false,
receiver_safe_without_body: true,
}
}
#[inline]
pub(crate) fn special() -> Self {
Self {
method: None,
body: None,
usize_arg: None,
view_stage_override: None,
allow_one_to_one_order_fallback: false,
receiver_safe_without_body: true,
}
}
#[inline]
pub(crate) fn body(mut self, body: &'a Program) -> Self {
self.body = Some(body);
self
}
#[inline]
pub(crate) fn usize_arg(mut self, usize_arg: usize) -> Self {
self.usize_arg = Some(usize_arg);
self
}
#[inline]
pub(crate) fn with_view_stage(mut self, stage: BuiltinViewStage) -> Self {
self.view_stage_override = Some(stage);
self
}
#[inline]
pub(crate) fn allow_one_to_one_order_fallback(mut self) -> Self {
self.allow_one_to_one_order_fallback = true;
self
}
#[inline]
pub(crate) fn receiver_unsafe_without_body(mut self) -> Self {
self.receiver_safe_without_body = false;
self
}
#[inline]
pub(crate) fn view_stage(self) -> Option<BuiltinViewStage> {
self.view_stage_override
.or_else(|| self.method.and_then(|method| method.spec().view_stage))
}
#[inline]
pub(crate) fn columnar_stage(self) -> Option<crate::builtins::BuiltinColumnarStage> {
self.method.and_then(|method| method.spec().columnar_stage)
}
#[inline]
pub(crate) fn pipeline_materialization(self) -> BuiltinPipelineMaterialization {
self.method
.map(|method| pipeline_materialization(BuiltinId::from_method(method)))
.unwrap_or(BuiltinPipelineMaterialization::Streaming)
}
#[inline]
pub(crate) fn pipeline_order_effect(self) -> BuiltinPipelineOrderEffect {
let Some(method) = self.method else {
return BuiltinPipelineOrderEffect::Blocks;
};
let spec = method.spec();
if let Some(effect) = pipeline_order_effect(BuiltinId::from_method(method)) {
return effect;
}
if self.allow_one_to_one_order_fallback
&& spec.cardinality == crate::builtins::BuiltinCardinality::OneToOne
{
return BuiltinPipelineOrderEffect::Preserves;
}
BuiltinPipelineOrderEffect::Blocks
}
#[inline]
pub(crate) fn can_run_with_receiver_only<F>(self, program_ok: F) -> bool
where
F: FnMut(&crate::vm::Program) -> bool,
{
self.body
.map(program_ok)
.unwrap_or(self.receiver_safe_without_body)
}
}
macro_rules! view_body_stage_descriptor {
($stage:expr, { $($variant:ident => $method:ident),+ $(,)? }) => {
match $stage {
$(
Stage::$variant(prog, view_stage) => {
Some(StageDescriptor::new(BuiltinMethod::$method).body(prog).with_view_stage(*view_stage))
},
)+
_ => None,
}
};
}
macro_rules! method_stage_descriptor {
($stage:expr, { $($pattern:pat => $method:ident),+ $(,)? }) => {
match $stage {
$(
$pattern => Some(StageDescriptor::new(BuiltinMethod::$method)),
)+
_ => None,
}
};
}
impl Stage {
pub(crate) fn is_composed_barrier(&self) -> bool {
self.pipeline_materialization() == BuiltinPipelineMaterialization::ComposedBarrier
}
pub(crate) fn requires_legacy_materialization(&self) -> bool {
!matches!(
self.pipeline_materialization(),
BuiltinPipelineMaterialization::Streaming
)
}
pub(crate) fn view_capability(
&self,
idx: usize,
kernel: Option<&BodyKernel>,
) -> Option<ViewStageCapability> {
let desc = self.descriptor()?;
let stage = desc.view_stage()?;
if stage == BuiltinViewStage::Distinct {
return match desc.body {
Some(_) if kernel.is_some_and(BodyKernel::is_view_native) => {
Some(ViewStageCapability::Distinct { kernel: Some(idx) })
}
Some(_) => None,
None => Some(ViewStageCapability::Distinct { kernel: None }),
};
}
if stage == BuiltinViewStage::KeyedReduce {
return match (desc.method, desc.body) {
(Some(method), Some(_)) if kernel.is_some_and(BodyKernel::is_view_native) => {
let kind = method.spec().keyed_reducer?;
Some(ViewStageCapability::KeyedReduce { kind, kernel: idx })
}
_ => None,
};
}
ViewStageCapability::from_stage_metadata(
stage,
desc.usize_arg,
idx,
kernel.is_some_and(BodyKernel::is_view_native),
)
}
pub(crate) fn descriptor(&self) -> Option<StageDescriptor<'_>> {
if let Some(desc) = view_body_stage_descriptor!(self, {
Filter => Filter,
Map => Map,
FlatMap => FlatMap,
}) {
return Some(desc);
}
if let Some(desc) = method_stage_descriptor!(self, {
Stage::Reverse(_) => Reverse,
Stage::UniqueBy(None) => Unique,
}) {
return Some(desc);
}
match self {
Stage::UniqueBy(Some(prog)) => {
Some(StageDescriptor::new(BuiltinMethod::UniqueBy).body(prog))
}
Stage::Sort(super::SortSpec { key, .. }) => {
let desc = StageDescriptor::new(BuiltinMethod::Sort);
Some(if let Some(prog) = key {
desc.body(prog)
} else {
desc
})
}
Stage::UsizeBuiltin { method, value } => {
Some(StageDescriptor::new(*method).usize_arg(*value))
}
Stage::StringBuiltin { method, .. } | Stage::StringPairBuiltin { method, .. } => {
Some(StageDescriptor::new(*method))
}
Stage::IntRangeBuiltin { method, .. } => Some(StageDescriptor::new(*method)),
Stage::ExprBuiltin { method, body } => Some(StageDescriptor::new(*method).body(body)),
Stage::Builtin(call) => Some(
StageDescriptor::new(call.method).allow_one_to_one_order_fallback(),
),
Stage::SortedDedup(prog) => {
let desc = StageDescriptor::special();
Some(if let Some(prog) = prog {
desc.body(prog)
} else {
desc
})
}
Stage::CompiledMap(_) => Some(StageDescriptor::special().receiver_unsafe_without_body()),
_ => None,
}
}
fn pipeline_materialization(&self) -> BuiltinPipelineMaterialization {
match self {
Stage::CompiledMap(_) => BuiltinPipelineMaterialization::Streaming,
Stage::SortedDedup(_) => BuiltinPipelineMaterialization::LegacyMaterialized,
_ => self
.descriptor()
.map(StageDescriptor::pipeline_materialization)
.unwrap_or(BuiltinPipelineMaterialization::Streaming),
}
}
pub(crate) fn can_run_with_receiver_only<F>(&self, mut program_ok: F) -> bool
where
F: FnMut(&crate::vm::Program) -> bool,
{
self.descriptor()
.is_some_and(|desc| desc.can_run_with_receiver_only(&mut program_ok))
}
pub(crate) fn body_program(&self) -> Option<&crate::vm::Program> {
self.descriptor().and_then(|desc| desc.body)
}
pub(crate) fn can_use_terminal_map_collector(&self) -> bool {
match self {
Stage::Map(_, _) => true,
Stage::CompiledMap(_) => true,
_ => false,
}
}
pub(crate) fn is_symbolic_map_stage(&self) -> bool {
matches!(self, Stage::CompiledMap(_) | Stage::Map(_, _))
}
pub(crate) fn is_symbolic_filter_stage(&self) -> bool {
matches!(self, Stage::Filter(_, _))
}
pub(crate) fn is_positional_stage(&self) -> bool {
matches!(
self,
Stage::UsizeBuiltin {
method: BuiltinMethod::Take | BuiltinMethod::Skip,
..
}
)
}
pub(crate) fn is_order_only_stage(&self) -> bool {
matches!(self, Stage::Sort(_) | Stage::Reverse(_))
}
pub(crate) fn consumes_input_value(&self) -> bool {
match self {
Stage::Filter(_, _)
| Stage::Map(_, _)
| Stage::FlatMap(_, _)
| Stage::CompiledMap(_)
| Stage::ExprBuiltin { .. }
| Stage::UniqueBy(Some(_))
| Stage::SortedDedup(Some(_)) => true,
Stage::Sort(spec) => spec.key.is_some(),
_ => false,
}
}
pub(crate) fn can_drop_when_value_unused(&self) -> bool {
let Some(desc) = self.descriptor() else {
return false;
};
if !matches!(
self.shape().cardinality,
crate::chain_ir::Cardinality::OneToOne
) {
return false;
}
if desc.pipeline_order_effect() != BuiltinPipelineOrderEffect::Preserves {
return false;
}
match self {
Stage::Builtin(_) | Stage::IntRangeBuiltin { .. } | Stage::StringPairBuiltin { .. } => {
desc.method.is_some_and(|m| m.spec().pure)
}
Stage::ExprBuiltin {
method:
BuiltinMethod::TransformKeys
| BuiltinMethod::TransformValues
| BuiltinMethod::FilterKeys
| BuiltinMethod::FilterValues,
..
} => true,
_ => false,
}
}
pub fn chain_op(&self) -> Option<ChainOp> {
match self {
Stage::CompiledMap(_) => Some(ChainOp::builtin(BuiltinMethod::Map)),
Stage::SortedDedup(_) => None,
_ => self.chain_demand_op(),
}
}
fn chain_demand_op(&self) -> Option<ChainOp> {
let desc = self.descriptor()?;
let method = desc.method?;
match self {
_ if desc.usize_arg.is_some() => Some(ChainOp::builtin_usize(method, desc.usize_arg?)),
Stage::Builtin(_) => Some(ChainOp::builtin(method)),
_ if participates_in_demand(BuiltinId::from_method(method)) => {
Some(ChainOp::builtin(method))
}
_ => None,
}
}
pub fn upstream_demand(&self, demand: SinkDemand) -> SinkDemand {
let chain = match self.chain_op() {
Some(op) => op.propagate_demand(demand.chain),
None => ChainDemand::RESULT,
};
let positional = if matches!(
self.shape().cardinality,
crate::chain_ir::Cardinality::OneToOne
) {
demand.positional
} else {
None
};
SinkDemand { chain, positional }
}
pub(crate) fn ordered_prefix_effect(
&self,
sort: &super::SortSpec,
sort_kernel: &BodyKernel,
kernel: &BodyKernel,
) -> bool {
match self.pipeline_order_effect() {
BuiltinPipelineOrderEffect::Preserves => true,
BuiltinPipelineOrderEffect::PredicatePrefix => {
super::plan::predicate_is_order_prefix(sort, sort_kernel, kernel)
}
BuiltinPipelineOrderEffect::Blocks => false,
}
}
fn pipeline_order_effect(&self) -> BuiltinPipelineOrderEffect {
match self {
Stage::CompiledMap(_) => BuiltinPipelineOrderEffect::Preserves,
Stage::SortedDedup(_) => BuiltinPipelineOrderEffect::Blocks,
_ => self
.descriptor()
.map(StageDescriptor::pipeline_order_effect)
.unwrap_or(BuiltinPipelineOrderEffect::Blocks),
}
}
pub fn shape(&self) -> StageShape {
use crate::chain_ir::Cardinality;
match self {
Stage::CompiledMap(_) => StageShape {
cardinality: Cardinality::OneToOne,
can_indexed: true,
cost: 10.0,
selectivity: 1.0,
},
Stage::SortedDedup(_) => StageShape {
cardinality: Cardinality::OneToOne,
can_indexed: true,
cost: 1.0,
selectivity: 1.0,
},
_ => self.descriptor().map_or(
StageShape {
cardinality: Cardinality::OneToOne,
can_indexed: false,
cost: 1.0,
selectivity: 1.0,
},
|desc| {
desc.view_stage()
.map(StageShape::from_view_stage)
.or_else(|| desc.method.map(StageShape::from_builtin))
.unwrap_or(StageShape {
cardinality: Cardinality::OneToOne,
can_indexed: false,
cost: 1.0,
selectivity: 1.0,
})
},
),
}
}
pub fn merge_with(&self, other: &Self) -> Option<Self> {
if let Some(merged) = self.merge_with_usize_stage(other) {
return Some(merged);
}
match (self, other) {
(Stage::Sort(_), Stage::Sort(b)) => Some(Stage::Sort(b.clone())),
(Stage::UniqueBy(_), Stage::UniqueBy(b)) => Some(Stage::UniqueBy(b.clone())),
(Stage::UniqueBy(None), Stage::Sort(super::SortSpec { key: None, .. }))
| (Stage::Sort(super::SortSpec { key: None, .. }), Stage::UniqueBy(None)) => {
Some(Stage::SortedDedup(None))
}
(Stage::UniqueBy(Some(a)), Stage::Sort(super::SortSpec { key: Some(b), .. }))
| (Stage::Sort(super::SortSpec { key: Some(a), .. }), Stage::UniqueBy(Some(b)))
if Arc::ptr_eq(a, b) =>
{
Some(Stage::SortedDedup(Some(a.clone())))
}
(Stage::Builtin(a), Stage::Builtin(b)) if a.method == b.method && a.is_idempotent() => {
Some(Stage::Builtin(a.clone()))
}
_ => None,
}
}
fn merge_with_usize_stage(&self, other: &Self) -> Option<Self> {
let lhs = self.usize_stage_merge_parts()?;
let rhs = other.usize_stage_merge_parts()?;
if lhs.stage != rhs.stage || lhs.merge != rhs.merge {
return None;
}
self.with_usize_stage_value(lhs.merge.combine_usize(lhs.value, rhs.value))
}
fn usize_stage_merge_parts(&self) -> Option<UsizeStageMergeParts> {
match self {
Stage::UsizeBuiltin { method, value } => Some(UsizeStageMergeParts {
value: *value,
stage: method.spec().view_stage?,
merge: method.spec().stage_merge?,
}),
_ => None,
}
}
fn with_usize_stage_value(&self, value: usize) -> Option<Self> {
match self {
Stage::UsizeBuiltin { method, .. } => Some(Stage::UsizeBuiltin {
method: *method,
value,
}),
_ => None,
}
}
pub fn cancels_with(&self, other: &Self) -> bool {
match (self.cancellation(), other.cancellation()) {
(Some(a), Some(b)) => a.cancels_with(b),
_ => false,
}
}
fn cancellation(&self) -> Option<crate::builtins::BuiltinCancellation> {
match self {
Stage::Reverse(cancel) => Some(*cancel),
Stage::Builtin(call) => call.spec().cancellation,
_ => None,
}
}
}
#[derive(Debug, Clone, Copy)]
struct UsizeStageMergeParts {
value: usize,
stage: BuiltinViewStage,
merge: crate::builtins::BuiltinStageMerge,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Strategy {
IndexedDispatch,
BarrierMaterialise,
EarlyExit,
PullLoop,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhysicalExecPath {
Indexed,
Columnar,
Composed,
Legacy,
}
#[derive(Debug, Clone)]
pub struct Plan {
pub stages: Vec<Stage>,
pub stage_exprs: Vec<Option<Arc<Expr>>>,
pub sink: Sink,
}
pub(super) fn stages_can_run_with_materialized_receiver(stages: &[Stage]) -> bool {
stages
.iter()
.all(|stage| stage.can_run_with_receiver_only(program_is_current_only))
}
pub(super) fn program_is_current_only(program: &Program) -> bool {
program.ops.iter().all(opcode_is_current_only)
}
pub(super) fn opcode_is_current_only(opcode: &Opcode) -> bool {
match opcode {
Opcode::PushRoot | Opcode::RootChain(_) => false,
Opcode::PipelineRun { .. }
| Opcode::LetExpr { .. }
| Opcode::ListComp(_)
| Opcode::DictComp(_)
| Opcode::SetComp(_)
| Opcode::PatchEval(_) => false,
Opcode::DynIndex(prog)
| Opcode::InlineFilter(prog)
| Opcode::AndOp(prog)
| Opcode::OrOp(prog)
| Opcode::CoalesceOp(prog) => program_is_current_only(prog),
Opcode::CallMethod(call) | Opcode::CallOptMethod(call) => call
.sub_progs
.iter()
.all(|prog| program_is_current_only(prog)),
Opcode::IfElse { then_, else_ } => {
program_is_current_only(then_) && program_is_current_only(else_)
}
Opcode::TryExpr { body, default } => {
program_is_current_only(body) && program_is_current_only(default)
}
Opcode::MakeArr(items) => items
.iter()
.all(|(prog, _spread)| program_is_current_only(prog)),
Opcode::FString(parts) => parts.iter().all(|part| match part {
crate::vm::CompiledFSPart::Lit(_) => true,
crate::vm::CompiledFSPart::Interp { prog, .. } => program_is_current_only(prog),
}),
Opcode::MakeObj(entries) => entries.iter().all(obj_entry_is_current_only),
Opcode::PushNull
| Opcode::PushBool(_)
| Opcode::PushInt(_)
| Opcode::PushFloat(_)
| Opcode::PushStr(_)
| Opcode::PushCurrent
| Opcode::LoadIdent(_)
| Opcode::GetField(_)
| Opcode::GetIndex(_)
| Opcode::GetSlice(_, _)
| Opcode::OptField(_)
| Opcode::Descendant(_)
| Opcode::DescendAll
| Opcode::Quantifier(_)
| Opcode::FieldChain(_)
| Opcode::Add
| Opcode::Sub
| Opcode::Mul
| Opcode::Div
| Opcode::Mod
| Opcode::Eq
| Opcode::Neq
| Opcode::Lt
| Opcode::Lte
| Opcode::Gt
| Opcode::Gte
| Opcode::Fuzzy
| Opcode::Not
| Opcode::Neg
| Opcode::CastOp(_)
| Opcode::KindCheck { .. }
| Opcode::SetCurrent
| Opcode::DeleteMarkErr => true,
}
}
pub(super) fn obj_entry_is_current_only(entry: &CompiledObjEntry) -> bool {
match entry {
CompiledObjEntry::Short { .. } | CompiledObjEntry::KvPath { .. } => true,
CompiledObjEntry::Kv { prog, cond, .. } => {
program_is_current_only(prog)
&& cond
.as_ref()
.is_none_or(|cond| program_is_current_only(cond))
}
CompiledObjEntry::Dynamic { key, val } => {
program_is_current_only(key) && program_is_current_only(val)
}
CompiledObjEntry::Spread(prog) | CompiledObjEntry::SpreadDeep(prog) => {
program_is_current_only(prog)
}
}
}
impl PipelineBody {
pub(crate) fn can_run_with_materialized_receiver(&self) -> bool {
stages_can_run_with_materialized_receiver(&self.stages)
&& self
.sink
.can_run_with_receiver_only(program_is_current_only)
}
pub(crate) fn suffix_can_run_with_materialized_receiver(&self, consumed_stages: usize) -> bool {
consumed_stages <= self.stages.len()
&& stages_can_run_with_materialized_receiver(&self.stages[consumed_stages..])
&& self
.sink
.can_run_with_receiver_only(program_is_current_only)
}
}
impl Pipeline {
pub fn segment_source_demand(stages: &[Stage], sink: &Sink) -> SinkDemand {
stages
.iter()
.rev()
.fold(sink.demand(), |demand, stage| stage.upstream_demand(demand))
}
pub fn source_demand(&self) -> SinkDemand {
Self::segment_source_demand(&self.stages, &self.sink)
}
}