use crate::{
builtins::{
BuiltinDemandLaw, BuiltinMethod, BuiltinPipelineLowering, BuiltinPipelineMaterialization,
BuiltinPipelineOrderEffect, BuiltinPipelineShape, BuiltinSinkAccumulator,
BuiltinStructural,
},
chain_ir::{Demand, PullDemand, ValueNeed},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct BuiltinId(pub(crate) u16);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BuiltinDemandArg {
None,
Usize(usize),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BuiltinPipelineArity {
Exact(usize),
Range { min: usize, max: usize },
}
impl BuiltinPipelineArity {
#[inline]
pub(crate) fn accepts(self, arity: usize) -> bool {
match self {
Self::Exact(n) => arity == n,
Self::Range { min, max } => (min..=max).contains(&arity),
}
}
}
#[inline]
pub(crate) fn propagate_demand(id: BuiltinId, arg: BuiltinDemandArg, downstream: Demand) -> Demand {
match demand_law(id) {
BuiltinDemandLaw::Identity => downstream,
BuiltinDemandLaw::FilterLike => Demand {
pull: match downstream.pull {
PullDemand::All => PullDemand::All,
PullDemand::FirstInput(n) | PullDemand::UntilOutput(n) => {
PullDemand::UntilOutput(n)
}
},
value: downstream.value.merge(ValueNeed::Predicate),
order: downstream.order,
},
BuiltinDemandLaw::TakeWhile => Demand {
pull: match downstream.pull {
PullDemand::All => PullDemand::All,
PullDemand::FirstInput(n) | PullDemand::UntilOutput(n) => PullDemand::FirstInput(n),
},
value: downstream.value.merge(ValueNeed::Predicate),
order: downstream.order,
},
BuiltinDemandLaw::UniqueLike => Demand {
pull: match downstream.pull {
PullDemand::All => PullDemand::All,
PullDemand::FirstInput(n) | PullDemand::UntilOutput(n) => {
PullDemand::UntilOutput(n)
}
},
value: downstream.value.merge(ValueNeed::Whole),
order: downstream.order,
},
BuiltinDemandLaw::MapLike => Demand {
value: downstream.value.merge(ValueNeed::Whole),
..downstream
},
BuiltinDemandLaw::FlatMapLike => Demand::all(ValueNeed::Whole),
BuiltinDemandLaw::Take => match arg {
BuiltinDemandArg::Usize(n) => Demand {
pull: downstream.pull.cap_inputs(n),
..downstream
},
BuiltinDemandArg::None => downstream,
},
BuiltinDemandLaw::Skip => match arg {
BuiltinDemandArg::Usize(n) => Demand {
pull: match downstream.pull {
PullDemand::FirstInput(m) => PullDemand::FirstInput(n.saturating_add(m)),
PullDemand::All | PullDemand::UntilOutput(_) => PullDemand::All,
},
..downstream
},
BuiltinDemandArg::None => downstream,
},
BuiltinDemandLaw::First => Demand::first(ValueNeed::Whole),
BuiltinDemandLaw::Last => Demand {
pull: PullDemand::All,
value: ValueNeed::Whole,
order: true,
},
BuiltinDemandLaw::Count => Demand {
pull: PullDemand::All,
value: ValueNeed::None,
order: false,
},
BuiltinDemandLaw::NumericReducer => Demand {
pull: PullDemand::All,
value: ValueNeed::Numeric,
order: false,
},
BuiltinDemandLaw::KeyedReducer => Demand {
pull: PullDemand::All,
value: ValueNeed::Predicate,
order: false,
},
BuiltinDemandLaw::OrderBarrier => Demand {
pull: PullDemand::All,
value: downstream.value.merge(ValueNeed::Whole),
order: true,
},
}
}
#[inline]
pub(crate) fn participates_in_demand(id: BuiltinId) -> bool {
demand_law(id) != BuiltinDemandLaw::Identity
}
#[inline]
pub(crate) fn pipeline_materialization(id: BuiltinId) -> BuiltinPipelineMaterialization {
id.method()
.map(|m| m.spec().materialization)
.unwrap_or(BuiltinPipelineMaterialization::Streaming)
}
#[inline]
pub(crate) fn pipeline_shape(id: BuiltinId) -> Option<BuiltinPipelineShape> {
id.method().map(|m| m.spec().pipeline_shape).flatten()
}
#[inline]
pub(crate) fn pipeline_order_effect(id: BuiltinId) -> Option<BuiltinPipelineOrderEffect> {
id.method().map(|m| m.spec().order_effect).flatten()
}
#[inline]
pub(crate) fn pipeline_lowering(id: BuiltinId) -> Option<BuiltinPipelineLowering> {
id.method().map(|m| m.spec().lowering).flatten()
}
#[inline]
pub(crate) fn pipeline_accepts_arity(id: BuiltinId, arity: usize, is_last: bool) -> bool {
pipeline_arity(id, is_last).is_some_and(|accepted| accepted.accepts(arity))
}
#[inline]
pub(crate) fn pipeline_arity(id: BuiltinId, is_last: bool) -> Option<BuiltinPipelineArity> {
let Some(method) = id.method() else {
return None;
};
match pipeline_lowering(id) {
Some(BuiltinPipelineLowering::ExprArg)
| Some(BuiltinPipelineLowering::TerminalExprArg { .. })
| Some(BuiltinPipelineLowering::UsizeArg { .. })
| Some(BuiltinPipelineLowering::StringArg) => Some(BuiltinPipelineArity::Exact(1)),
Some(BuiltinPipelineLowering::Nullary) => Some(BuiltinPipelineArity::Exact(0)),
Some(BuiltinPipelineLowering::StringPairArg) => Some(BuiltinPipelineArity::Exact(2)),
Some(BuiltinPipelineLowering::IntRangeArg) => {
Some(BuiltinPipelineArity::Range { min: 1, max: 2 })
}
Some(BuiltinPipelineLowering::Sort) => Some(BuiltinPipelineArity::Range { min: 0, max: 1 }),
Some(BuiltinPipelineLowering::TerminalSink) => {
is_last.then(|| terminal_sink_arity(method))?
}
None => is_last.then(|| terminal_sink_arity(method))?,
}
}
#[inline]
fn terminal_sink_arity(method: BuiltinMethod) -> Option<BuiltinPipelineArity> {
let Some(sink) = method.spec().sink else {
return None;
};
Some(match sink.accumulator {
BuiltinSinkAccumulator::Count => {
if method == BuiltinMethod::Count {
BuiltinPipelineArity::Range { min: 0, max: 1 }
} else {
BuiltinPipelineArity::Exact(0)
}
}
BuiltinSinkAccumulator::Numeric => BuiltinPipelineArity::Range { min: 0, max: 1 },
BuiltinSinkAccumulator::SelectOne(_) | BuiltinSinkAccumulator::ApproxDistinct => {
BuiltinPipelineArity::Exact(0)
}
})
}
#[inline]
pub(crate) fn pipeline_element(id: BuiltinId) -> bool {
id.method().map(|m| m.spec().is_element).unwrap_or(false)
}
#[inline]
pub(crate) fn structural(id: BuiltinId) -> Option<BuiltinStructural> {
id.method().map(|m| m.spec().structural).flatten()
}
#[inline]
fn demand_law(id: BuiltinId) -> BuiltinDemandLaw {
id.method()
.map(|m| m.spec().demand_law)
.unwrap_or(BuiltinDemandLaw::Identity)
}
impl BuiltinId {
#[inline]
pub(crate) fn from_method(method: BuiltinMethod) -> Self {
BuiltinId(method as u16)
}
#[inline]
pub(crate) fn method(self) -> Option<BuiltinMethod> {
method_from_id(self)
}
}
#[inline]
pub(crate) fn method_from_id(id: BuiltinId) -> Option<BuiltinMethod> {
macro_rules! check {
( $( $variant:ident ),* $(,)? ) => {
$(
if id.0 == BuiltinMethod::$variant as u16 {
return Some(BuiltinMethod::$variant);
}
)*
};
}
crate::for_each_builtin!(check);
None
}
#[inline]
pub(crate) fn by_name(name: &str) -> Option<BuiltinId> {
macro_rules! check {
( $( $variant:ident ),* $(,)? ) => {
$(
if name == <crate::builtins::defs::$variant as crate::builtins::builtin_def::Builtin>::NAME {
return Some(BuiltinId(BuiltinMethod::$variant as u16));
}
if <crate::builtins::defs::$variant as crate::builtins::builtin_def::Builtin>::ALIASES
.contains(&name)
{
return Some(BuiltinId(BuiltinMethod::$variant as u16));
}
)*
};
}
crate::for_each_builtin!(check);
None
}
#[cfg(test)]
pub(crate) fn all_method_entries() -> Vec<(BuiltinMethod, &'static str, &'static [&'static str])> {
macro_rules! collect {
( $( $variant:ident ),* $(,)? ) => {
vec![
$(
(BuiltinMethod::$variant,
<crate::builtins::defs::$variant as crate::builtins::builtin_def::Builtin>::NAME,
<crate::builtins::defs::$variant as crate::builtins::builtin_def::Builtin>::ALIASES),
)*
]
};
}
crate::for_each_builtin!(collect)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::builtins::{
BuiltinPipelineLowering, BuiltinPipelineMaterialization, BuiltinPipelineOrderEffect,
};
#[test]
fn registry_name_lookup_matches_legacy_lookup() {
for (method, canonical, aliases) in all_method_entries() {
assert_eq!(
by_name(canonical).and_then(BuiltinId::method),
Some(method)
);
for alias in aliases {
assert_eq!(
by_name(alias).and_then(BuiltinId::method),
Some(method)
);
}
}
assert_eq!(by_name("missing_builtin"), None);
}
#[test]
fn registry_does_not_accept_obsolete_camel_case_aliases() {
for name in [
"toString",
"flatMap",
"groupBy",
"sortBy",
"uniqueBy",
"transformKeys",
"getPath",
"isBlank",
"parseInt",
"startsWith",
"replaceAll",
] {
assert_eq!(by_name(name), None);
assert_eq!(BuiltinMethod::from_name(name), BuiltinMethod::Unknown);
}
assert_eq!(BuiltinMethod::from_name("group_by"), BuiltinMethod::GroupBy);
assert_eq!(BuiltinMethod::from_name("exists"), BuiltinMethod::Any);
assert_eq!(BuiltinMethod::from_name("distinct"), BuiltinMethod::Unique);
assert_eq!(BuiltinMethod::from_name("lstrip"), BuiltinMethod::TrimLeft);
}
#[test]
fn registry_propagates_core_streaming_demands() {
let filter = BuiltinId::from_method(BuiltinMethod::Filter);
let take = BuiltinId::from_method(BuiltinMethod::Take);
let count = BuiltinId::from_method(BuiltinMethod::Count);
let unique = BuiltinId::from_method(BuiltinMethod::Unique);
let count_by = BuiltinId::from_method(BuiltinMethod::CountBy);
let sort = BuiltinId::from_method(BuiltinMethod::Sort);
let demand = propagate_demand(take, BuiltinDemandArg::Usize(3), Demand::RESULT);
assert_eq!(demand.pull, PullDemand::FirstInput(3));
let demand = propagate_demand(filter, BuiltinDemandArg::None, demand);
assert_eq!(demand.pull, PullDemand::UntilOutput(3));
assert_eq!(demand.value, ValueNeed::Whole);
let demand = propagate_demand(count, BuiltinDemandArg::None, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::All);
assert_eq!(demand.value, ValueNeed::None);
assert!(!demand.order);
let downstream = Demand {
pull: PullDemand::FirstInput(2),
value: ValueNeed::Whole,
order: true,
};
let demand = propagate_demand(unique, BuiltinDemandArg::None, downstream);
assert_eq!(demand.pull, PullDemand::UntilOutput(2));
assert_eq!(demand.value, ValueNeed::Whole);
assert!(demand.order);
let demand = propagate_demand(count_by, BuiltinDemandArg::None, Demand::RESULT);
assert_eq!(demand.pull, PullDemand::All);
assert_eq!(demand.value, ValueNeed::Predicate);
assert!(!demand.order);
let downstream = Demand {
pull: PullDemand::FirstInput(5),
value: ValueNeed::Predicate,
order: false,
};
let demand = propagate_demand(sort, BuiltinDemandArg::None, downstream);
assert_eq!(demand.pull, PullDemand::All);
assert_eq!(demand.value, ValueNeed::Whole);
assert!(demand.order);
}
#[test]
fn registry_drives_pipeline_execution_policy() {
assert_eq!(
pipeline_materialization(BuiltinId::from_method(BuiltinMethod::Sort)),
BuiltinPipelineMaterialization::ComposedBarrier
);
assert_eq!(
pipeline_materialization(BuiltinId::from_method(BuiltinMethod::Reverse)),
BuiltinPipelineMaterialization::ComposedBarrier
);
assert_eq!(
pipeline_materialization(BuiltinId::from_method(BuiltinMethod::Split)),
BuiltinPipelineMaterialization::LegacyMaterialized
);
assert_eq!(
pipeline_materialization(BuiltinId::from_method(BuiltinMethod::TakeWhile)),
BuiltinPipelineMaterialization::Streaming
);
assert_eq!(
pipeline_shape(BuiltinId::from_method(BuiltinMethod::Split))
.unwrap()
.can_indexed,
true
);
assert_eq!(
pipeline_shape(BuiltinId::from_method(BuiltinMethod::Chunk))
.unwrap()
.cost,
2.0
);
assert_eq!(
pipeline_order_effect(BuiltinId::from_method(BuiltinMethod::Filter)),
Some(BuiltinPipelineOrderEffect::PredicatePrefix)
);
assert_eq!(
pipeline_order_effect(BuiltinId::from_method(BuiltinMethod::Replace)),
Some(BuiltinPipelineOrderEffect::Preserves)
);
}
#[test]
fn registry_drives_pipeline_lowering() {
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Filter)),
Some(BuiltinPipelineLowering::ExprArg)
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Map)),
Some(BuiltinPipelineLowering::ExprArg)
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::FindOne)),
Some(BuiltinPipelineLowering::TerminalExprArg {
terminal: BuiltinMethod::First,
})
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Take)),
Some(BuiltinPipelineLowering::UsizeArg { min: 0 })
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Sort)),
Some(BuiltinPipelineLowering::Sort)
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Reverse)),
Some(BuiltinPipelineLowering::Nullary)
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Split)),
Some(BuiltinPipelineLowering::StringArg)
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::ReplaceAll)),
Some(BuiltinPipelineLowering::StringPairArg)
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Slice)),
Some(BuiltinPipelineLowering::IntRangeArg)
);
assert_eq!(
pipeline_lowering(BuiltinId::from_method(BuiltinMethod::Count)),
Some(BuiltinPipelineLowering::TerminalSink)
);
}
#[test]
fn registry_classifies_pipeline_arity_without_method_special_cases() {
assert_eq!(
pipeline_arity(BuiltinId::from_method(BuiltinMethod::Filter), false),
Some(BuiltinPipelineArity::Exact(1))
);
assert!(pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Filter),
1,
false
));
assert!(!pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Filter),
0,
false
));
assert!(pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Sort),
0,
false
));
assert_eq!(
pipeline_arity(BuiltinId::from_method(BuiltinMethod::Sort), false),
Some(BuiltinPipelineArity::Range { min: 0, max: 1 })
);
assert!(pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Sort),
1,
false
));
assert!(!pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Sort),
2,
false
));
assert!(pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Slice),
2,
false
));
assert!(!pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Count),
1,
false
));
assert_eq!(
pipeline_arity(BuiltinId::from_method(BuiltinMethod::Count), false),
None
);
assert_eq!(
pipeline_arity(BuiltinId::from_method(BuiltinMethod::Count), true),
Some(BuiltinPipelineArity::Range { min: 0, max: 1 })
);
assert!(pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Count),
1,
true
));
assert!(pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::Sum),
1,
true
));
assert!(!pipeline_accepts_arity(
BuiltinId::from_method(BuiltinMethod::First),
1,
true
));
}
#[test]
fn registry_drives_pipeline_element_classification() {
for method in [
BuiltinMethod::Upper,
BuiltinMethod::StripPrefix,
BuiltinMethod::IsNumeric,
BuiltinMethod::Abs,
BuiltinMethod::ParseInt,
BuiltinMethod::Has,
BuiltinMethod::Lines,
BuiltinMethod::GetPath,
] {
assert!(pipeline_element(BuiltinId::from_method(method)));
}
for method in [
BuiltinMethod::Len,
BuiltinMethod::FromJson,
BuiltinMethod::Sort,
BuiltinMethod::Flatten,
] {
assert!(!pipeline_element(BuiltinId::from_method(method)));
}
}
#[test]
fn registry_drives_structural_lowering() {
assert_eq!(
structural(BuiltinId::from_method(BuiltinMethod::DeepFind)),
Some(BuiltinStructural::DeepFind)
);
assert_eq!(
structural(BuiltinId::from_method(BuiltinMethod::DeepShape)),
Some(BuiltinStructural::DeepShape)
);
assert_eq!(
structural(BuiltinId::from_method(BuiltinMethod::DeepLike)),
Some(BuiltinStructural::DeepLike)
);
assert_eq!(
structural(BuiltinId::from_method(BuiltinMethod::Walk)),
None
);
assert_eq!(
structural(BuiltinId::from_method(BuiltinMethod::Filter)),
None
);
}
#[test]
fn unknown_builtin_demand_is_identity() {
let downstream = Demand {
pull: PullDemand::FirstInput(7),
value: ValueNeed::Predicate,
order: false,
};
assert_eq!(
propagate_demand(
BuiltinId::from_method(BuiltinMethod::Unknown),
BuiltinDemandArg::None,
downstream
),
downstream
);
}
#[test]
fn descriptor_round_trips_method_identity() {
for (method, _, _) in all_method_entries() {
let id = BuiltinId::from_method(method);
assert_eq!(id.method(), Some(method));
let _ = method.spec();
}
}
}