#![allow(dead_code)]
use crate::{
builtins::registry::{
propagate_demand as propagate_builtin_demand, BuiltinDemandArg, BuiltinId,
},
builtins::BuiltinMethod,
plan::demand::{Demand, DemandOperator},
};
#[cfg(test)]
use crate::plan::demand::{propagate_demands, source_demand, PullDemand, ValueNeed};
/// Coarse kind of value an operation takes in or gives back.
///
/// Used for the `input` and `output` fields of [`OpSpec`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueKind {
/// No specific kind. `ChainOp::spec` falls back to this for builtins whose
/// category it does not classify, and for builtin ids without a method.
Any,
/// Kind assigned by `ChainOp::spec` to match operations and to the
/// streaming builtin categories (presumably a sequence of values).
Stream,
/// Output kind assigned by `ChainOp::spec` to `Reducer` and `Positional`
/// builtins (presumably a single value).
Scalar,
}
/// How the number of outputs of an operation relates to its inputs.
///
/// Has the same variant names as `crate::builtins::BuiltinCardinality`; the
/// `From` impl in this file converts between them name for name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Cardinality {
/// Cardinality reported for `MatchRole::Transform`, and the fallback for
/// builtin ids that have no method.
OneToOne,
/// Cardinality reported for `MatchRole::Predicate`.
Filtering,
/// Cardinality reported for `MatchRole::Multi`.
Expanding,
/// Only produced here by converting the builtin cardinality of the same name.
Bounded,
/// Only produced here by converting the builtin cardinality of the same name.
Reducing,
/// Builtins with this cardinality are reported by `ChainOp::spec` as not
/// preserving order unless their spec is `view_native`.
Barrier,
}
impl From<crate::builtins::BuiltinCardinality> for Cardinality {
fn from(value: crate::builtins::BuiltinCardinality) -> Self {
match value {
crate::builtins::BuiltinCardinality::OneToOne => Self::OneToOne,
crate::builtins::BuiltinCardinality::Filtering => Self::Filtering,
crate::builtins::BuiltinCardinality::Expanding => Self::Expanding,
crate::builtins::BuiltinCardinality::Bounded => Self::Bounded,
crate::builtins::BuiltinCardinality::Reducing => Self::Reducing,
crate::builtins::BuiltinCardinality::Barrier => Self::Barrier,
}
}
}
/// One operation in a chain: either a registered builtin or a match step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainOp {
/// A builtin operation looked up through the builtin registry.
Builtin {
/// Registry id of the builtin (built via `BuiltinId::from_method` by the
/// constructors in this file).
id: BuiltinId,
/// Argument forwarded to the registry's demand propagation together
/// with `id`; `BuiltinDemandArg::None` when the builtin carries none.
demand_arg: BuiltinDemandArg,
},
/// A match step whose behaviour is determined by its [`MatchRole`].
Match {
/// Role deciding the step's cardinality and how it rewrites demand.
role: MatchRole,
},
}
/// Role a match step plays inside a chain.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatchRole {
/// Classified by `ChainOp::spec` as `Cardinality::Filtering`.
Predicate,
/// Classified by `ChainOp::spec` as `Cardinality::OneToOne`; passes
/// downstream demand through unchanged.
Transform,
/// Classified by `ChainOp::spec` as `Cardinality::Expanding`.
Multi,
}
/// Static description of a [`ChainOp`], as returned by `ChainOp::spec`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpSpec {
/// Kind of value the operation takes in.
pub input: ValueKind,
/// Kind of value the operation gives back.
pub output: ValueKind,
/// Relationship between the number of inputs and outputs.
pub cardinality: Cardinality,
/// Whether the operation is reported as preserving order. Always `true`
/// for match steps; for builtins it is `false` only for a `Barrier`
/// cardinality whose spec is not `view_native`.
pub preserves_order: bool,
}
impl ChainOp {
pub fn builtin(method: BuiltinMethod) -> Self {
Self::Builtin {
id: BuiltinId::from_method(method),
demand_arg: BuiltinDemandArg::None,
}
}
pub fn builtin_usize(method: BuiltinMethod, n: usize) -> Self {
Self::Builtin {
id: BuiltinId::from_method(method),
demand_arg: BuiltinDemandArg::Usize(n),
}
}
pub fn match_role(role: MatchRole) -> Self {
Self::Match { role }
}
pub fn spec(&self) -> OpSpec {
match self {
ChainOp::Match { role } => {
let cardinality = match role {
MatchRole::Predicate => Cardinality::Filtering,
MatchRole::Transform => Cardinality::OneToOne,
MatchRole::Multi => Cardinality::Expanding,
};
OpSpec {
input: ValueKind::Stream,
output: ValueKind::Stream,
cardinality,
preserves_order: true,
}
}
ChainOp::Builtin { id, .. } => {
use crate::builtins::BuiltinCategory as Cat;
let Some(method) = id.method() else {
return OpSpec {
input: ValueKind::Any,
output: ValueKind::Any,
cardinality: Cardinality::OneToOne,
preserves_order: true,
};
};
let spec = method.spec();
let input = match spec.category {
Cat::StreamingOneToOne
| Cat::StreamingFilter
| Cat::StreamingExpand
| Cat::Reducer
| Cat::Positional
| Cat::Barrier
| Cat::Relational => ValueKind::Stream,
_ => ValueKind::Any,
};
let output = match spec.category {
Cat::Reducer | Cat::Positional => ValueKind::Scalar,
Cat::StreamingOneToOne | Cat::StreamingFilter | Cat::StreamingExpand => {
ValueKind::Stream
}
_ => ValueKind::Any,
};
OpSpec {
input,
output,
cardinality: spec.cardinality.into(),
preserves_order: spec.view_native
|| !matches!(
spec.cardinality,
crate::builtins::BuiltinCardinality::Barrier
),
}
}
}
}
pub fn propagate_demand(&self, downstream: Demand) -> Demand {
<Self as DemandOperator>::propagate_demand(self, downstream)
}
}
impl DemandOperator for ChainOp {
    /// Translates the demand placed on this operation's output into the
    /// demand it places on its input.
    ///
    /// Builtins defer to the registry's propagation. For match steps only
    /// the `pull` part can change; `value` and `order` pass through:
    /// - `Transform` returns the downstream demand unchanged;
    /// - `Predicate` turns `FirstInput(n)` into `UntilOutput(n)` and keeps
    ///   `UntilOutput(n)` as is;
    /// - `Multi` widens `FirstInput(_)` / `UntilOutput(_)` to `All`.
    fn propagate_demand(&self, downstream: Demand) -> Demand {
        use crate::plan::demand::PullDemand;

        let role = match self {
            ChainOp::Builtin { id, demand_arg } => {
                return propagate_builtin_demand(*id, *demand_arg, downstream);
            }
            ChainOp::Match { role } => role,
        };

        match role {
            MatchRole::Transform => downstream,
            MatchRole::Predicate => {
                let Demand { pull, value, order } = downstream;
                let pull = match pull {
                    PullDemand::FirstInput(n) | PullDemand::UntilOutput(n) => {
                        PullDemand::UntilOutput(n)
                    }
                    unchanged => unchanged,
                };
                Demand { pull, value, order }
            }
            MatchRole::Multi => {
                let Demand { pull, value, order } = downstream;
                let pull = match pull {
                    PullDemand::FirstInput(_) | PullDemand::UntilOutput(_) => PullDemand::All,
                    unchanged => unchanged,
                };
                Demand { pull, value, order }
            }
        }
    }
}
// Unit tests for `ChainOp::spec` classification and for the demand that a
// chain of operations places on its source.
#[cfg(test)]
mod tests {
use super::*;
// Shorthand: builtin operation without a demand argument.
fn op(method: BuiltinMethod) -> ChainOp {
ChainOp::builtin(method)
}
// Shorthand: builtin operation with a `usize` demand argument.
fn op_usize(method: BuiltinMethod, n: usize) -> ChainOp {
ChainOp::builtin_usize(method, n)
}
// Shorthand: match step with the given role.
fn match_op(role: MatchRole) -> ChainOp {
ChainOp::match_role(role)
}
// A predicate match is a stream-to-stream, order-preserving filter.
#[test]
fn match_predicate_classifies_as_filter() {
let spec = match_op(MatchRole::Predicate).spec();
assert_eq!(spec.cardinality, Cardinality::Filtering);
assert_eq!(spec.input, ValueKind::Stream);
assert_eq!(spec.output, ValueKind::Stream);
assert!(spec.preserves_order);
}
// A transform match has one-to-one cardinality.
#[test]
fn match_transform_classifies_as_map() {
let spec = match_op(MatchRole::Transform).spec();
assert_eq!(spec.cardinality, Cardinality::OneToOne);
}
// A multi match has expanding cardinality.
#[test]
fn match_multi_classifies_as_flat_map() {
let spec = match_op(MatchRole::Multi).spec();
assert_eq!(spec.cardinality, Cardinality::Expanding);
}
// Predicate match followed by `First`: the source is pulled until one
// output is produced, with whole values.
#[test]
fn match_predicate_first_scans_until_one_output() {
let ops = [match_op(MatchRole::Predicate), op(BuiltinMethod::First)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::UntilOutput(1));
assert_eq!(demand.value, ValueNeed::Whole);
}
// Transform match followed by `Take(3)`: the source demand is the first
// three inputs.
#[test]
fn match_transform_take_caps_upstream() {
let ops = [
match_op(MatchRole::Transform),
op_usize(BuiltinMethod::Take, 3),
];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::FirstInput(3));
}
// Predicate match followed by `Take(3)`: the source demand becomes
// "until three outputs" rather than "first three inputs".
#[test]
fn match_predicate_take_widens_to_scan() {
let ops = [
match_op(MatchRole::Predicate),
op_usize(BuiltinMethod::Take, 3),
];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::UntilOutput(3));
}
// Builtin `Filter` followed by `First`: pull until one output, whole values.
#[test]
fn filter_first_scans_until_one_output() {
let ops = [op(BuiltinMethod::Filter), op(BuiltinMethod::First)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::UntilOutput(1));
assert_eq!(demand.value, ValueNeed::Whole);
}
// Builtin `Filter` followed by `Last`: the source demand is `LastInput(1)`
// with whole values and `order` set.
#[test]
fn filter_last_requests_reverse_until_output() {
let ops = [op(BuiltinMethod::Filter), op(BuiltinMethod::Last)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::LastInput(1));
assert_eq!(demand.value, ValueNeed::Whole);
assert!(demand.order);
}
// `Map` followed by `Last`: the source demand is `LastInput(1)`, whole values.
#[test]
fn map_last_requests_last_input() {
let ops = [op(BuiltinMethod::Map), op(BuiltinMethod::Last)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::LastInput(1));
assert_eq!(demand.value, ValueNeed::Whole);
}
// `Map` followed by `Nth(2)`: the source demand is `NthInput(2)`, whole values.
#[test]
fn map_nth_requests_nth_input() {
let ops = [op(BuiltinMethod::Map), op_usize(BuiltinMethod::Nth, 2)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::NthInput(2));
assert_eq!(demand.value, ValueNeed::Whole);
}
// `Filter` followed by `Nth(2)`: the source demand is `All`, whole values.
#[test]
fn filter_nth_falls_back_to_all_input() {
let ops = [op(BuiltinMethod::Filter), op_usize(BuiltinMethod::Nth, 2)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::All);
assert_eq!(demand.value, ValueNeed::Whole);
}
// `Map`, `Take(3)`, `Filter`, `First`: both the first step's upstream
// demand and the source demand are `FirstInput(3)`.
#[test]
fn take_filter_first_caps_upstream_to_take_bound() {
let ops = [
op(BuiltinMethod::Map),
op_usize(BuiltinMethod::Take, 3),
op(BuiltinMethod::Filter),
op(BuiltinMethod::First),
];
let steps = propagate_demands(&ops, Demand::RESULT);
assert_eq!(steps[0].upstream.pull, PullDemand::FirstInput(3));
assert_eq!(
source_demand(&ops, Demand::RESULT).pull,
PullDemand::FirstInput(3)
);
}
// `Filter` followed by `Take(3)`: pull until three outputs are produced.
#[test]
fn filter_take_collect_scans_until_take_outputs() {
let ops = [op(BuiltinMethod::Filter), op_usize(BuiltinMethod::Take, 3)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::UntilOutput(3));
}
// `Compact` + `First` and `Remove` + `Last` yield the same source demands
// as the corresponding `Filter` chains above.
#[test]
fn compact_and_remove_are_filter_like() {
let ops = [op(BuiltinMethod::Compact), op(BuiltinMethod::First)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::UntilOutput(1));
assert_eq!(demand.value, ValueNeed::Whole);
let ops = [op(BuiltinMethod::Remove), op(BuiltinMethod::Last)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::LastInput(1));
assert_eq!(demand.value, ValueNeed::Whole);
}
// `FindFirst` under a `Demand::first(ValueNeed::Whole)` sink: pull until
// one output, whole values.
#[test]
fn find_first_is_filter_like_before_first_sink() {
let ops = [op(BuiltinMethod::FindFirst)];
let demand = source_demand(&ops, Demand::first(ValueNeed::Whole));
assert_eq!(demand.pull, PullDemand::UntilOutput(1));
assert_eq!(demand.value, ValueNeed::Whole);
}
// `TakeWhile` followed by `Take(3)`: the source demand is the first three
// inputs, whole values.
#[test]
fn take_while_take_collect_needs_only_input_prefix() {
let ops = [
op(BuiltinMethod::TakeWhile),
op_usize(BuiltinMethod::Take, 3),
];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::FirstInput(3));
assert_eq!(demand.value, ValueNeed::Whole);
}
// With a `Map` in front, `Count` still results in a whole-value demand on
// all inputs; `Count` on its own results in `ValueNeed::None`.
#[test]
fn count_does_not_need_whole_values() {
let ops = [op(BuiltinMethod::Map), op(BuiltinMethod::Count)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::All);
assert_eq!(demand.value, ValueNeed::Whole);
let ops = [op(BuiltinMethod::Count)];
let demand = source_demand(&ops, Demand::RESULT);
assert_eq!(demand.value, ValueNeed::None);
}
}