use crate::builtins::{
BuiltinKeyedReducer, BuiltinSinkAccumulator, BuiltinSinkSpec, BuiltinViewInputMode,
BuiltinViewOutputMode, BuiltinViewStage,
};
use crate::data::value::Val;
use crate::plan::demand::{FieldDemand, PullDemand};
use crate::vm::Program;
use super::{MembershipSinkOp, MembershipSinkTarget, PipelineBody, PredicateSinkOp, Stage};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SourceCapabilities {
pub forward_stream: bool,
pub reverse_stream: bool,
pub indexed_array_child: bool,
pub tape_view: bool,
pub field_key_read: bool,
pub subtree_skip: bool,
pub selected_row_materialization: bool,
pub materialized_fallback: bool,
}
impl SourceCapabilities {
pub(crate) const VIEW_ARRAY: Self = Self {
forward_stream: true,
reverse_stream: true,
indexed_array_child: true,
tape_view: true,
field_key_read: true,
subtree_skip: true,
selected_row_materialization: true,
materialized_fallback: true,
};
pub(crate) const MATERIALIZED_ARRAY: Self = Self {
forward_stream: true,
reverse_stream: true,
indexed_array_child: true,
tape_view: false,
field_key_read: true,
subtree_skip: false,
selected_row_materialization: true,
materialized_fallback: true,
};
pub(crate) fn choose_access(self, demand: PullDemand) -> SourceAccessMode {
match demand {
PullDemand::NthInput(idx) if self.indexed_array_child => SourceAccessMode::Indexed(idx),
PullDemand::LastInput(1) if self.indexed_array_child => {
SourceAccessMode::IndexedFromEnd(0)
}
PullDemand::FirstInput(1) if self.indexed_array_child => SourceAccessMode::Indexed(0),
PullDemand::LastInput(n) if self.reverse_stream => {
SourceAccessMode::Reverse { outputs: n }
}
PullDemand::FirstInput(n) if self.forward_stream => SourceAccessMode::ForwardBounded(n),
_ if self.forward_stream => SourceAccessMode::Forward,
_ => SourceAccessMode::MaterializedFallback,
}
}
pub(crate) fn choose_view_access(
self,
demand: PullDemand,
stages: &[ViewStageCapability],
) -> SourceAccessMode {
let access = self.choose_access(demand);
if matches!(access, SourceAccessMode::IndexedFromEnd(_))
&& !ViewStageCapability::all_preserve_cardinality(stages)
{
if self.reverse_stream {
return SourceAccessMode::Reverse { outputs: 1 };
}
if self.forward_stream {
return SourceAccessMode::Forward;
}
}
if matches!(access, SourceAccessMode::Indexed(_))
&& !ViewStageCapability::all_preserve_cardinality(stages)
{
if self.forward_stream {
return SourceAccessMode::Forward;
}
}
access
}
pub(crate) fn supports_payload_lanes(
self,
scan_need: &FieldDemand,
result_need: &FieldDemand,
) -> bool {
payload_lane_supported(scan_need, self.field_key_read, self.subtree_skip)
&& payload_lane_supported(
result_need,
self.field_key_read,
self.selected_row_materialization,
)
}
pub(crate) fn supports_selected_materialization(self, demand: PullDemand) -> bool {
self.selected_row_materialization
&& matches!(
demand,
PullDemand::FirstInput(_)
| PullDemand::LastInput(_)
| PullDemand::NthInput(_)
| PullDemand::UntilOutput(_)
)
}
}
fn payload_lane_supported(need: &FieldDemand, field_key_read: bool, whole_value_ok: bool) -> bool {
match need {
FieldDemand::None => true,
FieldDemand::Fields(_) => field_key_read,
FieldDemand::Whole => whole_value_ok,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SourceAccessMode {
Forward,
ForwardBounded(usize),
Reverse {
outputs: usize,
},
Indexed(usize),
IndexedFromEnd(usize),
MaterializedFallback,
}
#[cfg(test)]
mod source_capability_tests {
use super::{SourceAccessMode, SourceCapabilities, ViewStageCapability};
use crate::data::value::Val;
use crate::plan::demand::{FieldDemand, FieldSet, PullDemand};
use std::sync::Arc;
#[test]
fn indexed_sources_choose_direct_positional_access() {
assert_eq!(
SourceCapabilities::MATERIALIZED_ARRAY.choose_access(PullDemand::NthInput(3)),
SourceAccessMode::Indexed(3)
);
assert_eq!(
SourceCapabilities::MATERIALIZED_ARRAY.choose_access(PullDemand::LastInput(2)),
SourceAccessMode::Reverse { outputs: 2 }
);
assert_eq!(
SourceCapabilities::MATERIALIZED_ARRAY.choose_access(PullDemand::LastInput(1)),
SourceAccessMode::IndexedFromEnd(0)
);
assert_eq!(
SourceCapabilities::MATERIALIZED_ARRAY.choose_access(PullDemand::FirstInput(4)),
SourceAccessMode::ForwardBounded(4)
);
assert_eq!(
SourceCapabilities::MATERIALIZED_ARRAY.choose_access(PullDemand::FirstInput(1)),
SourceAccessMode::Indexed(0)
);
}
#[test]
fn view_array_sources_advertise_tape_backed_access() {
let caps = SourceCapabilities::VIEW_ARRAY;
assert!(caps.tape_view);
assert!(caps.forward_stream);
assert!(caps.reverse_stream);
assert!(caps.indexed_array_child);
assert!(caps.materialized_fallback);
assert!(caps.field_key_read);
assert!(caps.subtree_skip);
assert!(caps.selected_row_materialization);
assert_eq!(
caps.choose_access(PullDemand::NthInput(2)),
SourceAccessMode::Indexed(2)
);
assert_eq!(
caps.choose_access(PullDemand::LastInput(1)),
SourceAccessMode::IndexedFromEnd(0)
);
}
#[test]
fn selective_view_prefix_demotes_indexed_last_to_reverse_scan() {
let access = SourceCapabilities::VIEW_ARRAY.choose_view_access(
PullDemand::LastInput(1),
&[ViewStageCapability::Filter { kernel: 0 }],
);
assert_eq!(access, SourceAccessMode::Reverse { outputs: 1 });
}
#[test]
fn selective_view_prefix_demotes_indexed_last_to_forward_without_reverse() {
let caps = SourceCapabilities {
reverse_stream: false,
..SourceCapabilities::VIEW_ARRAY
};
let access = caps.choose_view_access(
PullDemand::LastInput(1),
&[ViewStageCapability::RemoveValue(Val::Int(2))],
);
assert_eq!(access, SourceAccessMode::Forward);
}
#[test]
fn cardinality_preserving_view_prefix_keeps_indexed_last_seek() {
let access = SourceCapabilities::VIEW_ARRAY.choose_view_access(
PullDemand::LastInput(1),
&[ViewStageCapability::Map { kernel: 0 }],
);
assert_eq!(access, SourceAccessMode::IndexedFromEnd(0));
}
#[test]
fn non_seekable_sources_fall_back_to_forward_streaming() {
let forward_only = SourceCapabilities {
forward_stream: true,
reverse_stream: false,
indexed_array_child: false,
tape_view: false,
field_key_read: false,
subtree_skip: false,
selected_row_materialization: false,
materialized_fallback: true,
};
assert_eq!(
forward_only.choose_access(PullDemand::NthInput(3)),
SourceAccessMode::Forward
);
assert_eq!(
forward_only.choose_access(PullDemand::LastInput(1)),
SourceAccessMode::Forward
);
assert_eq!(
forward_only.choose_access(PullDemand::FirstInput(2)),
SourceAccessMode::ForwardBounded(2)
);
}
#[test]
fn indexed_without_reverse_still_scans_forward_for_last_demand() {
let indexed_forward = SourceCapabilities {
forward_stream: true,
reverse_stream: false,
indexed_array_child: true,
tape_view: false,
field_key_read: true,
subtree_skip: false,
selected_row_materialization: true,
materialized_fallback: true,
};
assert_eq!(
indexed_forward.choose_access(PullDemand::NthInput(5)),
SourceAccessMode::Indexed(5)
);
assert_eq!(
indexed_forward.choose_access(PullDemand::LastInput(1)),
SourceAccessMode::IndexedFromEnd(0)
);
}
#[test]
fn indexed_only_sources_seek_single_positional_demands_and_materialize_ranges() {
let indexed_only = SourceCapabilities {
forward_stream: false,
reverse_stream: false,
indexed_array_child: true,
tape_view: true,
field_key_read: true,
subtree_skip: true,
selected_row_materialization: true,
materialized_fallback: true,
};
assert_eq!(
indexed_only.choose_access(PullDemand::NthInput(7)),
SourceAccessMode::Indexed(7)
);
assert_eq!(
indexed_only.choose_access(PullDemand::FirstInput(1)),
SourceAccessMode::Indexed(0)
);
assert_eq!(
indexed_only.choose_access(PullDemand::FirstInput(2)),
SourceAccessMode::MaterializedFallback
);
assert_eq!(
indexed_only.choose_access(PullDemand::LastInput(1)),
SourceAccessMode::IndexedFromEnd(0)
);
}
#[test]
fn non_streaming_sources_request_materialized_fallback() {
let fallback_only = SourceCapabilities {
forward_stream: false,
reverse_stream: false,
indexed_array_child: false,
tape_view: false,
field_key_read: false,
subtree_skip: false,
selected_row_materialization: false,
materialized_fallback: true,
};
assert_eq!(
fallback_only.choose_access(PullDemand::All),
SourceAccessMode::MaterializedFallback
);
}
#[test]
fn payload_lanes_require_matching_source_capabilities() {
let fields = FieldDemand::Fields(FieldSet::single(Arc::from("price")));
assert!(SourceCapabilities::VIEW_ARRAY.supports_payload_lanes(&fields, &fields));
assert!(SourceCapabilities::MATERIALIZED_ARRAY
.supports_payload_lanes(&fields, &FieldDemand::Whole));
assert!(!SourceCapabilities::MATERIALIZED_ARRAY
.supports_payload_lanes(&FieldDemand::Whole, &fields));
}
#[test]
fn selected_materialization_requires_bounded_demand() {
assert!(SourceCapabilities::VIEW_ARRAY
.supports_selected_materialization(PullDemand::LastInput(1)));
assert!(SourceCapabilities::MATERIALIZED_ARRAY
.supports_selected_materialization(PullDemand::UntilOutput(3)));
assert!(!SourceCapabilities::VIEW_ARRAY.supports_selected_materialization(PullDemand::All));
let no_selected = SourceCapabilities {
selected_row_materialization: false,
..SourceCapabilities::MATERIALIZED_ARRAY
};
assert!(!no_selected.supports_selected_materialization(PullDemand::FirstInput(1)));
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ViewInputMode {
ReadsView,
SkipsViewRead,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ViewOutputMode {
PreservesInputView,
BorrowedSubview,
BorrowedSubviews,
EmitsOwnedValue,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ViewMaterialization {
Never,
StageFinalValue,
SinkOutputRows,
SinkFinalRow,
SinkNumericInput,
SinkInputRows,
}
#[derive(Debug, Clone)]
pub(crate) struct ViewPipelineCapabilities {
pub stages: Vec<ViewStageCapability>,
pub sink: ViewSinkCapability,
}
#[derive(Debug, Clone)]
pub(crate) struct ViewPrefixCapabilities {
pub stages: Vec<ViewStageCapability>,
pub consumed_stages: usize,
}
#[derive(Debug, Clone)]
pub(crate) enum ViewStageCapability {
Filter {
kernel: usize,
},
Compact,
RemoveValue(Val),
Map {
kernel: usize,
},
FlatMap {
kernel: usize,
},
TakeWhile {
kernel: usize,
},
DropWhile {
kernel: usize,
},
Distinct {
kernel: Option<usize>,
},
KeyedReduce {
kind: BuiltinKeyedReducer,
kernel: usize,
},
Take(usize),
Skip(usize),
}
impl ViewStageCapability {
pub(crate) fn from_stage_metadata(
stage: BuiltinViewStage,
usize_arg: Option<usize>,
kernel_index: usize,
kernel_is_view_native: bool,
) -> Option<Self> {
match stage {
BuiltinViewStage::Filter if kernel_is_view_native => Some(Self::Filter {
kernel: kernel_index,
}),
BuiltinViewStage::Compact => Some(Self::Compact),
BuiltinViewStage::Map if kernel_is_view_native => Some(Self::Map {
kernel: kernel_index,
}),
BuiltinViewStage::FlatMap if kernel_is_view_native => Some(Self::FlatMap {
kernel: kernel_index,
}),
BuiltinViewStage::TakeWhile if kernel_is_view_native => Some(Self::TakeWhile {
kernel: kernel_index,
}),
BuiltinViewStage::DropWhile if kernel_is_view_native => Some(Self::DropWhile {
kernel: kernel_index,
}),
BuiltinViewStage::Take => Some(Self::Take(usize_arg?)),
BuiltinViewStage::Skip => Some(Self::Skip(usize_arg?)),
_ => None,
}
}
pub(crate) fn view_stage(&self) -> BuiltinViewStage {
match self {
Self::Filter { .. } => BuiltinViewStage::Filter,
Self::Compact => BuiltinViewStage::Compact,
Self::RemoveValue(_) => BuiltinViewStage::RemoveValue,
Self::Map { .. } => BuiltinViewStage::Map,
Self::FlatMap { .. } => BuiltinViewStage::FlatMap,
Self::TakeWhile { .. } => BuiltinViewStage::TakeWhile,
Self::DropWhile { .. } => BuiltinViewStage::DropWhile,
Self::Distinct { .. } => BuiltinViewStage::Distinct,
Self::KeyedReduce { .. } => BuiltinViewStage::KeyedReduce,
Self::Take(_) => BuiltinViewStage::Take,
Self::Skip(_) => BuiltinViewStage::Skip,
}
}
pub(crate) fn input_mode(&self) -> ViewInputMode {
view_input_mode(self.view_stage().input_mode())
}
pub(crate) fn output_mode(&self) -> ViewOutputMode {
view_output_mode(self.view_stage().output_mode())
}
pub(crate) fn materialization(&self) -> ViewMaterialization {
if matches!(self, Self::KeyedReduce { .. }) {
return ViewMaterialization::StageFinalValue;
}
ViewMaterialization::Never
}
pub(crate) fn preserves_cardinality(&self) -> bool {
matches!(self, Self::Map { .. })
}
pub(crate) fn all_preserve_cardinality(stages: &[Self]) -> bool {
stages.iter().all(Self::preserves_cardinality)
}
}
#[derive(Debug, Clone)]
pub(crate) enum ViewSinkCapability {
Collect,
Builtin {
accumulator: BuiltinSinkAccumulator,
predicate_kernel: Option<usize>,
project_kernel: Option<usize>,
materialization: ViewMaterialization,
},
Nth {
index: usize,
},
Predicate {
op: PredicateSinkOp,
predicate_kernel: usize,
},
Membership {
op: MembershipSinkOp,
target: ViewMembershipTarget,
},
ArgExtreme {
want_max: bool,
key_kernel: usize,
},
SelectMany {
n: usize,
from_end: bool,
source_reversed: bool,
},
}
impl ViewSinkCapability {
pub(crate) fn from_sink_spec(
spec: BuiltinSinkSpec,
predicate_kernel: Option<usize>,
project_kernel: Option<usize>,
) -> Self {
Self::Builtin {
accumulator: spec.accumulator,
predicate_kernel,
project_kernel,
materialization: sink_materialization(spec),
}
}
pub(crate) fn materialization(&self) -> ViewMaterialization {
match self {
Self::Collect => ViewMaterialization::SinkOutputRows,
Self::Builtin {
materialization, ..
} => *materialization,
Self::Nth { .. } => ViewMaterialization::SinkFinalRow,
Self::Predicate { op, .. } => {
if *op == PredicateSinkOp::FindOne {
ViewMaterialization::SinkFinalRow
} else {
ViewMaterialization::Never
}
}
Self::Membership { target, .. } => {
if target.is_scalar_literal() {
ViewMaterialization::Never
} else {
ViewMaterialization::SinkInputRows
}
}
Self::ArgExtreme { .. } => ViewMaterialization::SinkFinalRow,
Self::SelectMany { .. } => ViewMaterialization::SinkOutputRows,
}
}
}
#[derive(Debug, Clone)]
pub(crate) enum ViewMembershipTarget {
Literal(Val),
Program(std::sync::Arc<Program>),
}
impl ViewMembershipTarget {
fn is_scalar_literal(&self) -> bool {
match self {
Self::Literal(value) => target_is_scalar(value),
Self::Program(_) => false,
}
}
}
impl From<&MembershipSinkTarget> for ViewMembershipTarget {
fn from(target: &MembershipSinkTarget) -> Self {
match target {
MembershipSinkTarget::Literal(value) => Self::Literal(value.clone()),
MembershipSinkTarget::Program(program) => Self::Program(std::sync::Arc::clone(program)),
}
}
}
fn target_is_scalar(value: &Val) -> bool {
matches!(
value,
Val::Null | Val::Bool(_) | Val::Int(_) | Val::Float(_) | Val::Str(_) | Val::StrSlice(_)
)
}
fn sink_materialization(spec: BuiltinSinkSpec) -> ViewMaterialization {
match spec.accumulator {
BuiltinSinkAccumulator::Count | BuiltinSinkAccumulator::ApproxDistinct => {
ViewMaterialization::Never
}
BuiltinSinkAccumulator::Numeric => ViewMaterialization::SinkNumericInput,
BuiltinSinkAccumulator::SelectOne(_) => ViewMaterialization::SinkFinalRow,
}
}
fn view_input_mode(mode: BuiltinViewInputMode) -> ViewInputMode {
match mode {
BuiltinViewInputMode::ReadsView => ViewInputMode::ReadsView,
BuiltinViewInputMode::SkipsViewRead => ViewInputMode::SkipsViewRead,
}
}
fn view_output_mode(mode: BuiltinViewOutputMode) -> ViewOutputMode {
match mode {
BuiltinViewOutputMode::PreservesInputView => ViewOutputMode::PreservesInputView,
BuiltinViewOutputMode::BorrowedSubview => ViewOutputMode::BorrowedSubview,
BuiltinViewOutputMode::BorrowedSubviews => ViewOutputMode::BorrowedSubviews,
BuiltinViewOutputMode::EmitsOwnedValue => ViewOutputMode::EmitsOwnedValue,
}
}
pub(crate) fn view_capabilities(body: &PipelineBody) -> Option<ViewPipelineCapabilities> {
Some(ViewPipelineCapabilities {
stages: view_stage_capabilities(body)?,
sink: view_sink_capability(body)?,
})
}
pub(crate) fn view_prefix_capabilities(body: &PipelineBody) -> Option<ViewPrefixCapabilities> {
let mut stages = Vec::new();
for (idx, stage) in body.stages.iter().enumerate() {
let Some(capability) = view_stage_capability(body, idx, stage) else {
break;
};
if !matches!(capability.materialization(), ViewMaterialization::Never) {
break;
}
stages.push(capability);
}
if stages.is_empty() {
return None;
}
Some(ViewPrefixCapabilities {
consumed_stages: stages.len(),
stages,
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::builtins::{
BuiltinMethod, BuiltinSelectionPosition, BuiltinSinkAccumulator, BuiltinViewStage,
};
use crate::data::value::Val;
use crate::exec::pipeline::{
ArgExtremeSinkSpec, BodyKernel, MembershipSinkOp, MembershipSinkSpec, MembershipSinkTarget,
NumOp, PipelineBody, PredicateSinkOp, PredicateSinkSpec, ReducerOp, ReducerSpec, Sink,
Stage, ViewInputMode, ViewMaterialization, ViewMembershipTarget, ViewOutputMode,
ViewSinkCapability, ViewStageCapability,
};
use crate::parse::ast::BinOp;
use super::{view_capabilities, view_prefix_capabilities};
#[test]
fn view_stage_metadata_describes_borrowing_and_materialization() {
let filter = ViewStageCapability::Filter { kernel: 0 };
assert_eq!(filter.input_mode(), ViewInputMode::ReadsView);
assert_eq!(filter.output_mode(), ViewOutputMode::PreservesInputView);
assert_eq!(filter.materialization(), ViewMaterialization::Never);
let map = ViewStageCapability::Map { kernel: 0 };
assert_eq!(map.input_mode(), ViewInputMode::ReadsView);
assert_eq!(map.output_mode(), ViewOutputMode::BorrowedSubview);
assert_eq!(map.materialization(), ViewMaterialization::Never);
let flat_map = ViewStageCapability::FlatMap { kernel: 0 };
assert_eq!(flat_map.input_mode(), ViewInputMode::ReadsView);
assert_eq!(flat_map.output_mode(), ViewOutputMode::BorrowedSubviews);
assert_eq!(flat_map.materialization(), ViewMaterialization::Never);
assert!(!flat_map.preserves_cardinality());
let remove = ViewStageCapability::RemoveValue(Val::Int(2));
assert_eq!(remove.input_mode(), ViewInputMode::ReadsView);
assert_eq!(remove.output_mode(), ViewOutputMode::PreservesInputView);
assert_eq!(remove.materialization(), ViewMaterialization::Never);
assert!(!remove.preserves_cardinality());
let take = ViewStageCapability::Take(2);
assert_eq!(take.input_mode(), ViewInputMode::SkipsViewRead);
assert_eq!(take.output_mode(), ViewOutputMode::PreservesInputView);
assert_eq!(take.materialization(), ViewMaterialization::Never);
assert!(!take.preserves_cardinality());
assert!(map.preserves_cardinality());
assert!(!filter.preserves_cardinality());
assert!(!ViewStageCapability::Compact.preserves_cardinality());
}
#[test]
fn stage_view_capability_comes_from_stage_metadata() {
let prog = Arc::new(crate::vm::Program::new(Vec::new(), ""));
let filter = Stage::Filter(prog.clone(), BuiltinViewStage::Filter)
.view_capability(4, Some(&BodyKernel::FieldRead(Arc::<str>::from("score"))))
.unwrap();
let map = Stage::Map(prog, BuiltinViewStage::Map)
.view_capability(5, Some(&BodyKernel::FieldRead(Arc::<str>::from("name"))))
.unwrap();
let flat_map = Stage::FlatMap(
Arc::new(crate::vm::Program::new(Vec::new(), "")),
BuiltinViewStage::FlatMap,
)
.view_capability(6, Some(&BodyKernel::FieldRead(Arc::<str>::from("items"))))
.unwrap();
let take = Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 2,
}
.view_capability(7, None)
.unwrap();
let skip = Stage::UsizeBuiltin {
method: BuiltinMethod::Skip,
value: 1,
}
.view_capability(8, None)
.unwrap();
let compact = Stage::Builtin(crate::builtins::BuiltinCall::new(
BuiltinMethod::Compact,
crate::builtins::BuiltinArgs::None,
))
.view_capability(9, None)
.unwrap();
let remove = Stage::Builtin(crate::builtins::BuiltinCall::new(
BuiltinMethod::Remove,
crate::builtins::BuiltinArgs::Val(Val::Int(2)),
))
.view_capability(10, None)
.unwrap();
assert!(matches!(filter, ViewStageCapability::Filter { kernel: 4 }));
assert_eq!(map.output_mode(), ViewOutputMode::BorrowedSubview);
assert_eq!(flat_map.output_mode(), ViewOutputMode::BorrowedSubviews);
assert!(matches!(take, ViewStageCapability::Take(2)));
assert!(matches!(skip, ViewStageCapability::Skip(1)));
assert!(matches!(compact, ViewStageCapability::Compact));
assert!(matches!(remove, ViewStageCapability::RemoveValue(Val::Int(2))));
let cancel = crate::builtins::BuiltinMethod::Reverse
.spec()
.cancellation
.unwrap();
assert!(Stage::Reverse(cancel).view_capability(9, None).is_none());
}
#[test]
fn view_sink_metadata_describes_materialization_policy() {
assert_eq!(
ViewSinkCapability::Collect.materialization(),
ViewMaterialization::SinkOutputRows
);
assert_eq!(
ViewSinkCapability::Builtin {
accumulator: BuiltinSinkAccumulator::Count,
predicate_kernel: None,
project_kernel: None,
materialization: ViewMaterialization::Never,
}
.materialization(),
ViewMaterialization::Never
);
assert_eq!(
ViewSinkCapability::Builtin {
accumulator: BuiltinSinkAccumulator::Numeric,
predicate_kernel: None,
project_kernel: Some(0),
materialization: ViewMaterialization::SinkNumericInput,
}
.materialization(),
ViewMaterialization::SinkNumericInput
);
assert_eq!(
ViewSinkCapability::Builtin {
accumulator: BuiltinSinkAccumulator::SelectOne(BuiltinSelectionPosition::First),
predicate_kernel: None,
project_kernel: None,
materialization: ViewMaterialization::SinkFinalRow,
}
.materialization(),
ViewMaterialization::SinkFinalRow
);
assert_eq!(
ViewSinkCapability::Predicate {
op: PredicateSinkOp::Any,
predicate_kernel: 0,
}
.materialization(),
ViewMaterialization::Never
);
assert_eq!(
ViewSinkCapability::Predicate {
op: PredicateSinkOp::FindOne,
predicate_kernel: 0,
}
.materialization(),
ViewMaterialization::SinkFinalRow
);
assert_eq!(
ViewSinkCapability::Membership {
op: MembershipSinkOp::Includes,
target: ViewMembershipTarget::Literal(Val::Int(3)),
}
.materialization(),
ViewMaterialization::Never
);
assert_eq!(
ViewSinkCapability::Membership {
op: MembershipSinkOp::Includes,
target: ViewMembershipTarget::Literal(Val::arr(vec![Val::Int(3)])),
}
.materialization(),
ViewMaterialization::SinkInputRows
);
assert_eq!(
ViewSinkCapability::ArgExtreme {
want_max: true,
key_kernel: 0,
}
.materialization(),
ViewMaterialization::SinkFinalRow
);
assert_eq!(
ViewSinkCapability::SelectMany {
n: 2,
from_end: true,
source_reversed: true,
}
.materialization(),
ViewMaterialization::SinkOutputRows
);
}
#[test]
fn sink_view_capability_uses_carried_metadata() {
assert!(matches!(
Sink::Reducer(ReducerSpec::count()).view_capability(&[]),
Some(ViewSinkCapability::Builtin {
accumulator: BuiltinSinkAccumulator::Count,
predicate_kernel: None,
project_kernel: None,
materialization: ViewMaterialization::Never,
})
));
assert!(matches!(
Sink::Terminal(BuiltinMethod::First).view_capability(&[]),
Some(ViewSinkCapability::Builtin {
accumulator: BuiltinSinkAccumulator::SelectOne(BuiltinSelectionPosition::First),
predicate_kernel: None,
project_kernel: None,
materialization: ViewMaterialization::SinkFinalRow,
})
));
assert!(matches!(
Sink::Terminal(BuiltinMethod::Last).view_capability(&[]),
Some(ViewSinkCapability::Builtin {
accumulator: BuiltinSinkAccumulator::SelectOne(BuiltinSelectionPosition::Last),
predicate_kernel: None,
project_kernel: None,
materialization: ViewMaterialization::SinkFinalRow,
})
));
assert!(matches!(
Sink::Predicate(PredicateSinkSpec {
op: PredicateSinkOp::Any,
predicate: Arc::new(crate::vm::Program::new(Vec::new(), "")),
})
.view_capability(&[BodyKernel::FieldCmpLit(
Arc::from("score"),
BinOp::Gt,
Val::Int(10),
)]),
Some(ViewSinkCapability::Predicate {
op: PredicateSinkOp::Any,
predicate_kernel: 0,
})
));
assert!(matches!(
Sink::SelectMany {
n: 2,
from_end: true,
}
.view_capability(&[]),
Some(ViewSinkCapability::SelectMany {
n: 2,
from_end: true,
source_reversed: false,
})
));
assert!(matches!(
Sink::Membership(MembershipSinkSpec {
op: MembershipSinkOp::Includes,
target: MembershipSinkTarget::Literal(Val::Int(3)),
method: BuiltinMethod::Includes,
})
.view_capability(&[]),
Some(ViewSinkCapability::Membership {
op: MembershipSinkOp::Includes,
target: ViewMembershipTarget::Literal(Val::Int(3)),
})
));
assert!(matches!(
Sink::Membership(MembershipSinkSpec {
op: MembershipSinkOp::Includes,
target: MembershipSinkTarget::Program(Arc::new(crate::vm::Program::new(
Vec::new(),
""
))),
method: BuiltinMethod::Includes,
})
.view_capability(&[]),
Some(ViewSinkCapability::Membership {
op: MembershipSinkOp::Includes,
target: ViewMembershipTarget::Program(_),
})
));
assert!(matches!(
Sink::ArgExtreme(ArgExtremeSinkSpec {
want_max: true,
key: Arc::new(crate::vm::Program::new(Vec::new(), "")),
})
.view_capability(&[BodyKernel::FieldRead(Arc::from("score"))]),
Some(ViewSinkCapability::ArgExtreme {
want_max: true,
key_kernel: 0,
})
));
assert!(Sink::ArgExtreme(ArgExtremeSinkSpec {
want_max: false,
key: Arc::new(crate::vm::Program::new(Vec::new(), "")),
})
.view_capability(&[BodyKernel::Generic])
.is_none());
}
#[test]
fn view_capabilities_preserve_expected_metadata() {
let body = PipelineBody {
stages: vec![
Stage::Filter(
Arc::new(crate::vm::Program::new(Vec::new(), "")),
BuiltinViewStage::Filter,
),
Stage::Map(
Arc::new(crate::vm::Program::new(Vec::new(), "")),
BuiltinViewStage::Map,
),
Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 2,
},
],
stage_exprs: Vec::new(),
sink: Sink::Reducer(ReducerSpec {
op: ReducerOp::Numeric(NumOp::Sum),
predicate: None,
projection: Some(Arc::new(crate::vm::Program::new(Vec::new(), ""))),
predicate_expr: None,
projection_expr: None,
}),
stage_kernels: vec![
BodyKernel::FieldCmpLit(Arc::from("score"), BinOp::Gt, Val::Int(10)),
BodyKernel::FieldRead(Arc::from("score")),
BodyKernel::Generic,
],
sink_kernels: vec![BodyKernel::FieldRead(Arc::from("score"))],
};
let capabilities = view_capabilities(&body).unwrap();
assert_eq!(capabilities.stages.len(), 3);
assert_eq!(
capabilities.stages[0].output_mode(),
ViewOutputMode::PreservesInputView
);
assert_eq!(
capabilities.stages[1].output_mode(),
ViewOutputMode::BorrowedSubview
);
assert_eq!(
capabilities.sink.materialization(),
ViewMaterialization::SinkNumericInput
);
}
#[test]
fn view_prefix_stops_at_first_non_view_stage() {
let body = PipelineBody {
stages: vec![
Stage::Filter(
Arc::new(crate::vm::Program::new(Vec::new(), "")),
BuiltinViewStage::Filter,
),
Stage::Map(
Arc::new(crate::vm::Program::new(Vec::new(), "")),
BuiltinViewStage::Map,
),
Stage::Builtin(crate::exec::pipeline::PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::Upper,
args: crate::builtins::BuiltinArgs::None,
}),
],
stage_exprs: Vec::new(),
sink: Sink::Collect,
stage_kernels: vec![
BodyKernel::FieldCmpLit(Arc::from("score"), BinOp::Gt, Val::Int(10)),
BodyKernel::FieldRead(Arc::from("name")),
BodyKernel::Generic,
],
sink_kernels: Vec::new(),
};
assert!(view_capabilities(&body).is_none());
let prefix = view_prefix_capabilities(&body).unwrap();
assert_eq!(prefix.consumed_stages, 2);
assert_eq!(prefix.stages.len(), 2);
}
#[test]
fn view_prefix_keeps_remove_value_before_materializing_stage() {
let body = PipelineBody {
stages: vec![
Stage::Map(
Arc::new(crate::vm::Program::new(Vec::new(), "")),
BuiltinViewStage::Map,
),
Stage::Builtin(crate::exec::pipeline::PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::Remove,
args: crate::builtins::BuiltinArgs::Val(Val::Int(2)),
}),
Stage::Builtin(crate::exec::pipeline::PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::Upper,
args: crate::builtins::BuiltinArgs::None,
}),
],
stage_exprs: Vec::new(),
sink: Sink::Collect,
stage_kernels: vec![BodyKernel::FieldRead(Arc::from("id"))],
sink_kernels: Vec::new(),
};
let prefix = view_prefix_capabilities(&body).unwrap();
assert_eq!(prefix.consumed_stages, 2);
assert!(matches!(prefix.stages[0], ViewStageCapability::Map { kernel: 0 }));
assert!(matches!(
prefix.stages[1],
ViewStageCapability::RemoveValue(Val::Int(2))
));
}
}
fn view_stage_capabilities(body: &PipelineBody) -> Option<Vec<ViewStageCapability>> {
let mut out = Vec::with_capacity(body.stages.len());
for (idx, stage) in body.stages.iter().enumerate() {
out.push(view_stage_capability(body, idx, stage)?);
}
Some(out)
}
fn view_stage_capability(
body: &PipelineBody,
idx: usize,
stage: &Stage,
) -> Option<ViewStageCapability> {
stage.view_capability(idx, body.stage_kernels.get(idx))
}
fn view_sink_capability(body: &PipelineBody) -> Option<ViewSinkCapability> {
body.sink.view_capability(&body.sink_kernels)
}