use std::sync::Arc;
use crate::data::context::{Env, EvalError};
use crate::data::value::Val;
use crate::data::view::{scalar_view_to_owned_val, ValueView};
use crate::exec::pipeline;
use crate::plan::demand::PullDemand;
use crate::util::JsonView;
use crate::vm::VM;
mod key;
mod reducer_stage;
mod stage_flow;
use key::ViewKey;
use stage_flow::{ViewStageFlow, ViewStageState};
/// Descends from `cur` through `keys` in order, calling `field` once per key,
/// and returns the view reached after the last key.
///
/// An empty `keys` slice returns `cur` unchanged.
pub(crate) fn walk_fields<'a, V>(cur: V, keys: &[Arc<str>]) -> V
where
    V: ValueView<'a>,
{
    keys.iter()
        .fold(cur, |view, key| view.field(key.as_ref()))
}
/// Convenience entry point that runs `body` against `source` using a freshly
/// constructed [`VM`].
///
/// All work is delegated to [`run_with_env_and_vm`]; the return value is
/// passed through untouched.
#[allow(dead_code)]
pub(crate) fn run_with_env<'a, V>(
    source: V,
    body: &pipeline::PipelineBody,
    cache: Option<&dyn pipeline::PipelineData>,
    base_env: &Env,
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    run_with_env_and_vm(source, body, cache, base_env, &mut VM::new())
}
/// Tries each view execution strategy in a fixed order and returns the first
/// one that yields `Some`.
///
/// Order of attempts: terminal collect, terminal select projection, full view
/// execution, reducing-stage prefix, sort prefix, and finally the generic
/// view prefix followed by a materialized suffix. A strategy that returns
/// `Some(Err(..))` also ends the search; only `None` moves on to the next
/// strategy. The result of the last strategy is returned as-is.
pub(crate) fn run_with_env_and_vm<'a, V>(
    source: V,
    body: &pipeline::PipelineBody,
    cache: Option<&dyn pipeline::PipelineData>,
    base_env: &Env,
    vm: &mut VM,
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    // Each closure only runs (and only clones `source`) when every earlier
    // strategy declined with `None`.
    run_terminal_collect(source.clone(), body)
        .or_else(|| run_terminal_select_projection(source.clone(), body))
        .or_else(|| run_full_with_env(source.clone(), body, Some(base_env), vm))
        .or_else(|| {
            run_reducing_stage_prefix_then_materialized_suffix(
                source.clone(),
                body,
                cache,
                base_env,
                vm,
            )
        })
        .or_else(|| {
            run_sort_prefix_then_materialized_suffix(source.clone(), body, cache, base_env, vm)
        })
        .or_else(|| run_prefix_then_materialized_suffix(source, body, cache, base_env, vm))
}
/// Test-only helper: runs the full view path with no base environment and a
/// throwaway [`VM`].
#[cfg(test)]
fn run_full<'a, V>(source: V, body: &pipeline::PipelineBody) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    run_full_with_env(source, body, None, &mut VM::new())
}
/// Runs the whole pipeline body (all stages plus the sink) over `source`
/// without leaving the view layer.
///
/// Returns `None` when `pipeline::view_capabilities` rejects the body, when
/// the sink cannot be resolved (see [`resolve_view_sink`]), or when driving
/// the source fails. Returns `Some(Err(..))` when resolving the sink raises
/// an evaluation error. Otherwise returns the sink accumulator's result.
fn run_full_with_env<'a, V>(
    source: V,
    body: &pipeline::PipelineBody,
    base_env: Option<&Env>,
    vm: &mut VM,
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    let capabilities = pipeline::view_capabilities(body)?;
    let mut sink_acc = pipeline::SinkAccumulator::new(&body.sink);
    let source_demand = body.pull_demand();

    // Adapt the sink to the demand that will actually reach the source:
    // an `NthInput` demand pairs with an `Nth` sink rewritten to index 0, and
    // a `LastInput` demand marks a `SelectMany` sink as reading a reversed
    // source.
    let adjusted_sink = match (source_demand, capabilities.sink) {
        (PullDemand::NthInput(_), pipeline::ViewSinkCapability::Nth { .. }) => {
            pipeline::ViewSinkCapability::Nth { index: 0 }
        }
        (
            PullDemand::LastInput(_),
            pipeline::ViewSinkCapability::SelectMany { n, from_end, .. },
        ) => pipeline::ViewSinkCapability::SelectMany {
            n,
            from_end,
            source_reversed: true,
        },
        (_, unchanged) => unchanged,
    };

    let sink = match resolve_view_sink(adjusted_sink, base_env, vm)? {
        Ok(resolved) => resolved,
        Err(err) => return Some(Err(err)),
    };

    drive_view_frontier(
        source,
        pipeline::SourceCapabilities::VIEW_ARRAY,
        &capabilities.stages,
        &body.stage_kernels,
        source_demand,
        |row| observe_view_sink(row, &sink, &mut sink_acc, &body.sink_kernels),
    )?;
    Some(sink_acc.finish_result(false))
}
/// Turns a membership sink whose target is a program into one whose target
/// is a literal, by executing the program on `vm` in `base_env`.
///
/// Every other sink is returned unchanged as `Some(Ok(sink))`. Returns `None`
/// when a program target needs evaluating but `base_env` is `None`, and
/// `Some(Err(..))` when the program execution fails.
fn resolve_view_sink(
    sink: pipeline::ViewSinkCapability,
    base_env: Option<&Env>,
    vm: &mut VM,
) -> Option<Result<pipeline::ViewSinkCapability, EvalError>> {
    match sink {
        pipeline::ViewSinkCapability::Membership {
            op,
            target: pipeline::ViewMembershipTarget::Program(program),
        } => {
            let env = base_env?;
            let resolved = match vm.exec_in_env(&program, env) {
                Ok(literal) => Ok(pipeline::ViewSinkCapability::Membership {
                    op,
                    target: pipeline::ViewMembershipTarget::Literal(literal),
                }),
                Err(err) => Err(err),
            };
            Some(resolved)
        }
        already_resolved => Some(Ok(already_resolved)),
    }
}
/// Feeds one row (`item`) into `sink_acc` according to `sink` and reports how
/// the driver should proceed with that row.
///
/// Returns `None` when the row cannot be handled here: a referenced kernel
/// index is missing from `sink_kernels`, a kernel evaluation yields `None`,
/// the predicate accumulator reports an error, or a membership target has not
/// been resolved to a literal. Otherwise returns the [`ViewRowAction`] for
/// this row.
fn observe_view_sink<'a, V>(
item: &V,
sink: &pipeline::ViewSinkCapability,
sink_acc: &mut pipeline::SinkAccumulator,
sink_kernels: &[pipeline::BodyKernel],
) -> Option<ViewRowAction>
where
V: ValueView<'a>,
{
match sink {
// Collect: every row is materialized and emitted.
pipeline::ViewSinkCapability::Collect => {
debug_assert_eq!(
sink.materialization(),
pipeline::ViewMaterialization::SinkOutputRows
);
sink_acc.observe_collect(item.materialize());
Some(ViewRowAction::Emit)
}
// Builtin: rows failing the optional predicate are skipped; the
// accumulator receives closures for the row value, the projected value,
// and the row key rather than precomputed values.
pipeline::ViewSinkCapability::Builtin {
accumulator,
predicate_kernel,
project_kernel,
..
} => {
if !view_sink_predicate_matches(item, *predicate_kernel, sink_kernels)? {
return Some(ViewRowAction::Skip);
}
let sink_done = sink_acc.observe_builtin_lazy(
*accumulator,
|| scalar_view_to_owned_val(item.scalar()).unwrap_or_else(|| item.materialize()),
|| {
let kernel = (*project_kernel)?;
let kernel = sink_kernels.get(kernel)?;
eval_owned_scalar_or_value_kernel(item, kernel)
},
|| Some(eval_view_key(item, None)?.object_key().to_string()),
)?;
Some(if sink_done {
ViewRowAction::Stop
} else {
ViewRowAction::Emit
})
}
// Nth: the row is materialized only through the closure handed to the
// accumulator.
pipeline::ViewSinkCapability::Nth { index } => {
let sink_done = sink_acc.observe_nth_lazy(*index, || item.materialize());
Some(if sink_done {
ViewRowAction::Stop
} else {
ViewRowAction::Emit
})
}
// Predicate: evaluate the predicate kernel, then let the accumulator
// decide whether the sink is finished.
pipeline::ViewSinkCapability::Predicate {
op,
predicate_kernel,
} => {
let kernel = sink_kernels.get(*predicate_kernel)?;
let matched = eval_filter_kernel(item, kernel)?;
// `.ok()?` turns an accumulator error into `None` for the caller.
let sink_done = sink_acc
.observe_predicate_lazy(*op, matched, || item.materialize())
.ok()?;
Some(if sink_done {
ViewRowAction::Stop
} else if matched {
ViewRowAction::Emit
} else {
ViewRowAction::Skip
})
}
// Membership: only literal targets are handled; a still-unresolved
// program target yields `None`.
pipeline::ViewSinkCapability::Membership { op, target } => {
let pipeline::ViewMembershipTarget::Literal(target) = target else {
return None;
};
let matched = view_membership_matches(item, target);
let sink_done = sink_acc.observe_membership_match(*op, matched);
Some(if sink_done {
ViewRowAction::Stop
} else if matched {
ViewRowAction::Emit
} else {
ViewRowAction::Skip
})
}
// ArgExtreme: compute the comparison key for this row; every row is
// reported as emitted.
pipeline::ViewSinkCapability::ArgExtreme {
want_max,
key_kernel,
} => {
let key = view_arg_extreme_key(item, sink_kernels.get(*key_kernel)?)?;
sink_acc.observe_arg_extreme_lazy(*want_max, key, || item.materialize());
Some(ViewRowAction::Emit)
}
// SelectMany: the accumulator signals when it has seen enough rows.
pipeline::ViewSinkCapability::SelectMany {
n,
from_end,
source_reversed,
} => {
let sink_done =
sink_acc.observe_select_many_lazy(*n, *from_end, *source_reversed, || {
item.materialize()
});
Some(if sink_done {
ViewRowAction::Stop
} else {
ViewRowAction::Emit
})
}
}
}
/// Reports whether `item` equals the membership `target`.
///
/// Thin wrapper that forwards to [`view_matches_value`].
fn view_membership_matches<'a, V>(item: &V, target: &Val) -> bool
where
V: ValueView<'a>,
{
view_matches_value(item, target)
}
/// Compares the view `item` against the owned value `target`.
///
/// When `target` is an array or object (its `JsonView` is `ArrayLen` or
/// `ObjectLen`), `item` is materialized and compared with
/// `crate::util::vals_eq`. For every other target the comparison is done on
/// the scalar views via `crate::util::json_vals_eq`, without materializing.
fn view_matches_value<'a, V>(item: &V, target: &Val) -> bool
where
    V: ValueView<'a>,
{
    match JsonView::from_val(target) {
        JsonView::ArrayLen(_) | JsonView::ObjectLen(_) => {
            crate::util::vals_eq(&item.materialize(), target)
        }
        scalar_target => crate::util::json_vals_eq(item.scalar(), scalar_target),
    }
}
fn view_arg_extreme_key<'a, V>(item: &V, kernel: &pipeline::BodyKernel) -> Option<Val>
where
V: ValueView<'a>,
{
match pipeline::eval_view_kernel(kernel, item)? {
pipeline::ViewKernelValue::View(view) => {
scalar_view_to_owned_val(view.scalar()).or_else(|| Some(view.materialize()))
}
pipeline::ViewKernelValue::Owned(value) => Some(value),
}
}
/// Evaluates a sink's optional predicate for `item`.
///
/// With no predicate (`predicate_kernel` is `None`) every row matches.
/// Otherwise the kernel at that index is evaluated as a filter; `None` is
/// returned when the index is out of range or the kernel cannot be evaluated.
fn view_sink_predicate_matches<'a, V>(
    item: &V,
    predicate_kernel: Option<usize>,
    sink_kernels: &[pipeline::BodyKernel],
) -> Option<bool>
where
    V: ValueView<'a>,
{
    match predicate_kernel {
        None => Some(true),
        Some(kernel_idx) => eval_filter_kernel(item, sink_kernels.get(kernel_idx)?),
    }
}
/// Runs the view-capable prefix of `body` over `source`, materializes the
/// rows that reach the boundary, and hands them to the remaining stages and
/// sink through [`run_materialized_suffix`].
///
/// Returns `None` when no view prefix is available, when the suffix cannot
/// run with a materialized receiver, or when driving the source fails.
fn run_prefix_then_materialized_suffix<'a, V>(
    source: V,
    body: &pipeline::PipelineBody,
    cache: Option<&dyn pipeline::PipelineData>,
    base_env: &Env,
    vm: &mut VM,
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    let prefix = pipeline::view_prefix_capabilities(body)?;
    // A single check suffices: the earlier, narrower guard (which also
    // required the prefix to consume every stage) rejected a strict subset of
    // what this condition rejects.
    if !body.suffix_can_run_with_materialized_receiver(prefix.consumed_stages) {
        return None;
    }
    let mut boundary_rows = Vec::new();
    let source_demand =
        pipeline::Pipeline::segment_pull_demand(&body.stages[..prefix.consumed_stages], &body.sink);
    drive_view_frontier(
        source,
        pipeline::SourceCapabilities::VIEW_ARRAY,
        &prefix.stages,
        &body.stage_kernels,
        source_demand,
        |item| {
            // Rows surviving the prefix are materialized at the boundary.
            boundary_rows.push(item.materialize());
            Some(ViewRowAction::Emit)
        },
    )?;
    Some(run_materialized_suffix(
        body,
        prefix.consumed_stages,
        boundary_rows,
        cache,
        base_env,
        vm,
    ))
}
/// Executes a pipeline that ends in a collect sink, pushing each surviving
/// row through the plan's collect program into a `TerminalCollector`.
///
/// Returns `None` when [`terminal_collect_plan`] finds no plan, when the
/// source cannot be driven, or when pushing a row fails.
fn run_terminal_collect<'a, V>(
    source: V,
    body: &pipeline::PipelineBody,
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    let plan = terminal_collect_plan(body)?;
    let program = &plan.collect_program;
    let mut collector = pipeline::TerminalCollector::new(program.kernel());
    drive_view_frontier(
        source,
        pipeline::SourceCapabilities::VIEW_ARRAY,
        &plan.prefix,
        &body.stage_kernels,
        plan.source_demand,
        |row| {
            collector
                .push_view_program(row, program)
                .map(|_| ViewRowAction::Emit)
        },
    )?;
    let collected = collector.finish();
    Some(Ok(collected))
}
/// Handles pipelines whose trailing stages form a projection run and whose
/// sink selects a single row: `Nth`, or a builtin `SelectOne` with position
/// `First` or `Last`.
///
/// Only the selected row is projected. Returns `None` when there is no
/// trailing projection run, the sink is of another kind, the prefix stages
/// are not all view-capable without materialization, or driving/evaluation
/// fails. Returns `Val::Null` when no row was selected.
fn run_terminal_select_projection<'a, V>(
source: V,
body: &pipeline::PipelineBody,
) -> Option<Result<Val, EvalError>>
where
V: ValueView<'a>,
{
let (prefix_len, project_kernel) = terminal_projection_run(body, 0)?;
// Classify the sink; anything other than Nth / SelectOne(First|Last)
// is not handled by this path.
let position = match &body.sink {
pipeline::Sink::Nth(_) => TerminalSelectPosition::Nth,
_ => {
let sink_spec = body.sink.builtin_sink_spec()?;
match sink_spec.accumulator {
crate::builtins::BuiltinSinkAccumulator::SelectOne(
crate::builtins::BuiltinSelectionPosition::First,
) => TerminalSelectPosition::First,
crate::builtins::BuiltinSinkAccumulator::SelectOne(
crate::builtins::BuiltinSelectionPosition::Last,
) => TerminalSelectPosition::Last,
_ => return None,
}
}
};
let prefix = terminal_collect_prefix_from(&body.stages[..prefix_len], body, 0)?;
let source_demand =
pipeline::Pipeline::segment_pull_demand(&body.stages[..prefix_len], &body.sink);
let mut selected = Val::Null;
let mut seen = false;
let mut nth_seen = 0usize;
// Number of rows to pass over before selecting. With an `NthInput` demand
// and a cardinality-preserving prefix the target is 0.
// NOTE(review): presumably because the source access then seeks straight to
// the requested row — confirm against `choose_view_access`.
let nth_target = match body.sink {
pipeline::Sink::Nth(_)
if matches!(source_demand, PullDemand::NthInput(_))
&& pipeline::ViewStageCapability::all_preserve_cardinality(&prefix) =>
{
Some(0)
}
pipeline::Sink::Nth(index) => Some(index),
_ => None,
};
drive_view_frontier(
source,
pipeline::SourceCapabilities::VIEW_ARRAY,
&prefix,
&body.stage_kernels,
source_demand,
|item| {
// Rows before the nth target are counted as emitted but not projected.
if let Some(target) = nth_target {
if nth_seen < target {
nth_seen += 1;
return Some(ViewRowAction::Emit);
}
}
selected = eval_owned_scalar_or_value_kernel(item, &project_kernel)?;
seen = true;
// First/Nth stop at the first projected row; Last keeps overwriting
// `selected` until the input ends.
Some(match position {
TerminalSelectPosition::First | TerminalSelectPosition::Nth => ViewRowAction::Stop,
TerminalSelectPosition::Last => ViewRowAction::Emit,
})
},
)?;
Some(Ok(if seen { selected } else { Val::Null }))
}
/// Which single row a terminal select sink picks.
#[derive(Clone, Copy)]
enum TerminalSelectPosition {
/// Take the first row that reaches the sink, then stop.
First,
/// Keep overwriting the selection so the last row wins.
Last,
/// Take the row after the preceding ones have been passed over, then stop.
Nth,
}
/// Outcome reported by a row observer to the view driver.
enum ViewRowAction {
/// The row produced no output; continue without counting it.
Skip,
/// The row counts as an emitted output; continue unless the demand is met.
Emit,
/// Stop driving immediately.
Stop,
}
/// Control signal returned by `drive_view_item` to the row loop.
enum ViewDriveFlow {
/// Keep pulling rows.
Continue,
/// Stop pulling rows.
Stop,
}
/// Picks how to read rows out of `source` for the given demand and stages,
/// then drives those rows through `stages` into `observe`.
///
/// Returns `Some(())` when driving finished (including the cases where no
/// row had to be read), and `None` when the source is not an array for the
/// chosen access mode or a stage/observer evaluation returned `None`.
fn drive_view_frontier<'a, V, F>(
source: V,
source_capabilities: pipeline::SourceCapabilities,
stages: &[pipeline::ViewStageCapability],
stage_kernels: &[pipeline::BodyKernel],
source_demand: PullDemand,
observe: F,
) -> Option<()>
where
V: ValueView<'a>,
F: FnMut(&V) -> Option<ViewRowAction>,
{
// Nothing is demanded: succeed without touching the source.
if source_demand.is_zero() {
return Some(());
}
let access = source_capabilities.choose_view_access(source_demand, stages);
match access {
// Walk the array from the last index down to 0, stopping after
// `outputs` emitted rows.
pipeline::SourceAccessMode::Reverse { outputs } => {
let len = match source.scalar() {
JsonView::ArrayLen(len) => len,
_ => return None,
};
let items = (0..len).rev().map(|idx| source.index(idx as i64));
return drive_view_iter(
items,
stages,
stage_kernels,
PullDemand::LastInput(outputs),
observe,
);
}
// Read exactly one row by index; an out-of-range index observes nothing.
pipeline::SourceAccessMode::Indexed(idx) => {
let len = match source.scalar() {
JsonView::ArrayLen(len) => len,
_ => return None,
};
if idx >= len {
return Some(());
}
let items = std::iter::once(source.index(idx as i64));
return drive_view_iter(items, stages, stage_kernels, PullDemand::All, observe);
}
// Read exactly one row counted from the end; an offset past the start
// observes nothing.
pipeline::SourceAccessMode::IndexedFromEnd(offset) => {
let len = match source.scalar() {
JsonView::ArrayLen(len) => len,
_ => return None,
};
let Some(idx) = index_from_end(len, offset) else {
return Some(());
};
let items = std::iter::once(source.index(idx as i64));
return drive_view_iter(items, stages, stage_kernels, PullDemand::All, observe);
}
// Forward iteration capped at `inputs` pulled rows.
pipeline::SourceAccessMode::ForwardBounded(inputs) => {
let items = source.array_iter()?;
return drive_view_iter(
items,
stages,
stage_kernels,
PullDemand::FirstInput(inputs),
observe,
);
}
// Both fall through to plain forward iteration with the original demand.
pipeline::SourceAccessMode::Forward | pipeline::SourceAccessMode::MaterializedFallback => {}
}
let items = source.array_iter()?;
drive_view_iter(items, stages, stage_kernels, source_demand, observe)
}
/// Converts an offset counted from the end of a sequence (0 = last element)
/// into a forward index.
///
/// Returns `None` when the offset reaches past the start of a sequence of
/// length `len`, or when `offset + 1` overflows `usize`.
fn index_from_end(len: usize, offset: usize) -> Option<usize> {
    let distance_from_len = offset.checked_add(1)?;
    len.checked_sub(distance_from_len)
}
/// Pulls rows from `items`, pushes each through `stages` starting at stage 0,
/// and hands surviving rows to `observe`.
///
/// Stops early when a `FirstInput(n)` demand has pulled `n` rows, when a
/// `LastInput(n)` demand has emitted `n` outputs, or when a row signals
/// `ViewDriveFlow::Stop`. Returns `None` as soon as processing a row returns
/// `None`; otherwise `Some(())`.
fn drive_view_iter<'a, V, I, F>(
    items: I,
    stages: &[pipeline::ViewStageCapability],
    stage_kernels: &[pipeline::BodyKernel],
    source_demand: PullDemand,
    mut observe: F,
) -> Option<()>
where
    V: ValueView<'a>,
    I: IntoIterator<Item = V>,
    F: FnMut(&V) -> Option<ViewRowAction>,
{
    // One default-initialized state slot per stage.
    let mut op_state: Vec<ViewStageState> = std::iter::repeat_with(ViewStageState::default)
        .take(stages.len())
        .collect();
    let mut pulled_inputs = 0usize;
    let mut emitted_outputs = 0usize;

    for row in items {
        // The input cap is checked before the row is counted or processed.
        if let PullDemand::FirstInput(limit) = source_demand {
            if pulled_inputs >= limit {
                break;
            }
        }
        pulled_inputs += 1;

        let flow = drive_view_item(
            row,
            0,
            stages,
            &mut op_state,
            stage_kernels,
            source_demand,
            &mut emitted_outputs,
            &mut observe,
        )?;
        if let ViewDriveFlow::Stop = flow {
            break;
        }

        if let PullDemand::LastInput(limit) = source_demand {
            if emitted_outputs >= limit {
                break;
            }
        }
    }
    Some(())
}
/// Pushes one row through the stages starting at `stage_idx`, recursing one
/// stage at a time, and calls `observe` once the row is past the last stage.
///
/// `emitted_outputs` is incremented for every row the observer reports as
/// `Emit`. Returns `None` when a kernel lookup, stage application, or the
/// observer returns `None`.
fn drive_view_item<'a, V, F>(
item: V,
stage_idx: usize,
stages: &[pipeline::ViewStageCapability],
op_state: &mut [ViewStageState],
stage_kernels: &[pipeline::BodyKernel],
source_demand: PullDemand,
emitted_outputs: &mut usize,
observe: &mut F,
) -> Option<ViewDriveFlow>
where
V: ValueView<'a>,
F: FnMut(&V) -> Option<ViewRowAction>,
{
// Past the last stage: the row reaches the sink observer.
let Some(stage) = stages.get(stage_idx).cloned() else {
return match observe(&item)? {
ViewRowAction::Skip => Some(ViewDriveFlow::Continue),
ViewRowAction::Emit => {
*emitted_outputs += 1;
// `UntilOutput(n)` and `LastInput(n)` both stop once `n` rows
// have been emitted.
Some(
if matches!(source_demand, PullDemand::UntilOutput(n) if *emitted_outputs >= n)
|| matches!(source_demand, PullDemand::LastInput(n) if *emitted_outputs >= n)
{
ViewDriveFlow::Stop
} else {
ViewDriveFlow::Continue
},
)
}
ViewRowAction::Stop => Some(ViewDriveFlow::Stop),
};
};
// FlatMap fans one row out into several children; each child continues
// from the next stage, and a Stop from any child ends the whole drive.
if let pipeline::ViewStageCapability::FlatMap { kernel } = stage {
debug_assert_eq!(stage.input_mode(), pipeline::ViewInputMode::ReadsView);
debug_assert_eq!(
stage.output_mode(),
pipeline::ViewOutputMode::BorrowedSubviews
);
let kernel = stage_kernels.get(kernel)?;
for child in eval_flat_map_kernel(&item, kernel)? {
if matches!(
drive_view_item(
child,
stage_idx + 1,
stages,
op_state,
stage_kernels,
source_demand,
emitted_outputs,
observe,
)?,
ViewDriveFlow::Stop
) {
return Some(ViewDriveFlow::Stop);
}
}
return Some(ViewDriveFlow::Continue);
}
// Every other stage maps one row to at most one row.
match apply_view_stage(item, stage, stage_idx, op_state, stage_kernels)? {
ViewStageFlow::Keep(next) => drive_view_item(
next,
stage_idx + 1,
stages,
op_state,
stage_kernels,
source_demand,
emitted_outputs,
observe,
),
ViewStageFlow::Drop => Some(ViewDriveFlow::Continue),
ViewStageFlow::Stop => Some(ViewDriveFlow::Stop),
}
}
/// Plan for running a collect-terminated pipeline segment on views.
struct TerminalCollectPlan {
/// View capabilities of the stages that run before the collect program.
prefix: Vec<pipeline::ViewStageCapability>,
/// Program applied to each surviving row to produce the collected value.
collect_program: pipeline::RowProgram,
/// Pull demand computed for the segment's stages and the sink.
source_demand: PullDemand,
}
/// Builds a terminal collect plan covering all stages of `body`.
///
/// Equivalent to `terminal_collect_plan_from(body, 0)`.
fn terminal_collect_plan(body: &pipeline::PipelineBody) -> Option<TerminalCollectPlan> {
terminal_collect_plan_from(body, 0)
}
/// Builds a terminal collect plan for the stages of `body` from index
/// `start` onward.
///
/// Returns `None` unless the sink's view capability is `Collect`, `start` is
/// within `body.stages`, every prefix stage is view-capable without
/// materialization, and a row program can be built for the collect kernel.
/// When the segment ends in a projection run, those trailing stages are
/// folded into the collect program; otherwise the program is built from
/// `BodyKernel::Current` and every stage belongs to the prefix.
fn terminal_collect_plan_from(
    body: &pipeline::PipelineBody,
    start: usize,
) -> Option<TerminalCollectPlan> {
    let sink_capability = body.sink.view_capability(&body.sink_kernels)?;
    if !matches!(sink_capability, pipeline::ViewSinkCapability::Collect) {
        return None;
    }

    let suffix_stages = body.stages.get(start..)?;
    let source_demand = pipeline::Pipeline::segment_pull_demand(suffix_stages, &body.sink);

    // Split the segment into the driven prefix and the kernel that becomes
    // the collect program.
    let (prefix_stages, collect_kernel) = match terminal_projection_run(body, start) {
        Some((prefix_len, kernel)) => (&suffix_stages[..prefix_len], kernel),
        None => (suffix_stages, pipeline::BodyKernel::Current),
    };

    let prefix = terminal_collect_prefix_from(prefix_stages, body, start)?;
    let collect_program = pipeline::RowProgram::from_kernel(collect_kernel)?;
    Some(TerminalCollectPlan {
        prefix,
        collect_program,
        source_demand,
    })
}
/// Finds the longest run of projection stages at the end of
/// `body.stages[start..]` and composes them into a single kernel.
///
/// Returns `(prefix_len, kernel)`, where `prefix_len` is the number of stages
/// (relative to `start`) that precede the run and `kernel` applies the run's
/// stages in pipeline order. Returns `None` when `start` is out of range or
/// the last stage is not a projection.
fn terminal_projection_run(
    body: &pipeline::PipelineBody,
    start: usize,
) -> Option<(usize, pipeline::BodyKernel)> {
    let stage_count = body.stages.get(start..)?.len();
    let mut prefix_len = stage_count;
    let mut composed = pipeline::BodyKernel::Current;

    // Walk backwards from the final stage until a non-projection stage is hit.
    for relative_idx in (0..stage_count).rev() {
        let abs_idx = start + relative_idx;
        let Some(stage_kernel) = terminal_projection_stage_kernel(
            &body.stages[abs_idx],
            abs_idx,
            body.stage_kernels.get(abs_idx),
        ) else {
            break;
        };
        composed = compose_projection_kernels(stage_kernel, composed);
        prefix_len = relative_idx;
    }

    // `prefix_len` only shrinks when at least one projection stage was found.
    (prefix_len < stage_count).then_some((prefix_len, composed))
}
/// Returns the kernel that represents `stage` as a projection, if it is one.
///
/// A stage whose view capability is `Map` yields a clone of its kernel, but
/// only when that kernel is view-native (otherwise `None`). A builtin stage
/// whose method is a view projection method yields a `BuiltinCall` kernel on
/// the current row. Every other stage yields `None`.
fn terminal_projection_stage_kernel(
    stage: &pipeline::Stage,
    idx: usize,
    kernel: Option<&pipeline::BodyKernel>,
) -> Option<pipeline::BodyKernel> {
    let is_map_stage = matches!(
        stage.view_capability(idx, kernel),
        Some(pipeline::ViewStageCapability::Map { .. })
    );
    if is_map_stage {
        let map_kernel = kernel?;
        return if map_kernel.is_view_native() {
            Some(map_kernel.clone())
        } else {
            None
        };
    }

    if let pipeline::Stage::Builtin(call) = stage {
        if call.method.is_view_projection_method() {
            return Some(pipeline::BodyKernel::BuiltinCall {
                receiver: Box::new(pipeline::BodyKernel::Current),
                call: call.clone(),
            });
        }
    }
    None
}
/// Chains two projection kernels so that `first` runs before `then`.
///
/// When `then` is `BodyKernel::Current`, `first` is returned on its own
/// instead of being wrapped in a `Compose` node.
fn compose_projection_kernels(
    first: pipeline::BodyKernel,
    then: pipeline::BodyKernel,
) -> pipeline::BodyKernel {
    match then {
        pipeline::BodyKernel::Current => first,
        follow_up => pipeline::BodyKernel::Compose {
            first: Box::new(first),
            then: Box::new(follow_up),
        },
    }
}
/// Converts `stages` into view stage capabilities, where `stages[0]` sits at
/// absolute index `start` in `body`.
///
/// Returns `None` as soon as a stage has no view capability or its
/// capability reports a materialization other than
/// `ViewMaterialization::Never`.
fn terminal_collect_prefix_from(
    stages: &[pipeline::Stage],
    body: &pipeline::PipelineBody,
    start: usize,
) -> Option<Vec<pipeline::ViewStageCapability>> {
    stages
        .iter()
        .enumerate()
        .map(|(offset, stage)| {
            let idx = start + offset;
            let capability = stage.view_capability(idx, body.stage_kernels.get(idx))?;
            let never_materializes = matches!(
                capability.materialization(),
                pipeline::ViewMaterialization::Never
            );
            never_materializes.then_some(capability)
        })
        .collect()
}
/// Runs a view prefix that ends in a reducing stage, then passes the reduced
/// value to the rest of the pipeline via [`run_materialized_value_suffix`].
///
/// Returns `None` when `reducer_stage::plan` finds no plan, when the suffix
/// cannot run with a materialized receiver, or when driving the source or
/// observing a row fails.
fn run_reducing_stage_prefix_then_materialized_suffix<'a, V>(
    source: V,
    body: &pipeline::PipelineBody,
    cache: Option<&dyn pipeline::PipelineData>,
    base_env: &Env,
    vm: &mut VM,
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    let mut plan = reducer_stage::plan(body)?;
    let consumed = plan.consumed_stages;
    if !body.suffix_can_run_with_materialized_receiver(consumed) {
        return None;
    }

    drive_view_frontier(
        source,
        pipeline::SourceCapabilities::VIEW_ARRAY,
        &plan.prefix,
        &body.stage_kernels,
        body.pull_demand(),
        |row| {
            plan.reducer
                .observe(row, &body.stage_kernels)
                .map(|_| ViewRowAction::Emit)
        },
    )?;

    let reduced = plan.reducer.finish();
    Some(run_materialized_value_suffix(
        body, consumed, reduced, cache, base_env, vm,
    ))
}
/// Runs a view prefix up to the first sort stage, sorts the surviving rows by
/// key, then finishes the pipeline with the cheapest suffix strategy that
/// applies.
///
/// Suffix strategies are tried in this order: terminal collect, terminal
/// select projection, view suffix, and finally materializing the sorted rows
/// and running the materialized suffix. Returns `None` when there is no sort
/// barrier plan, when neither a collect suffix nor a materialized suffix is
/// possible, or when driving or key evaluation fails.
fn run_sort_prefix_then_materialized_suffix<'a, V>(
source: V,
body: &pipeline::PipelineBody,
cache: Option<&dyn pipeline::PipelineData>,
base_env: &Env,
vm: &mut VM,
) -> Option<Result<Val, EvalError>>
where
V: ValueView<'a>,
{
let plan = sort_barrier_plan(body)?;
let strategies =
pipeline::compute_strategies_with_kernels(&body.stages, &body.stage_kernels, &body.sink);
// A missing strategy entry for the sort stage falls back to `Default`.
let strategy = strategies
.get(plan.sort_stage)
.copied()
.unwrap_or(pipeline::StageStrategy::Default);
// `SortUntilOutput` is handled entirely by the ordered view-suffix path.
if matches!(strategy, pipeline::StageStrategy::SortUntilOutput(_)) {
return run_sort_prefix_then_view_suffix(source, body, &plan, base_env, vm);
}
let collect_suffix = terminal_collect_plan_from(body, plan.sort_stage + 1);
// Bail out before sorting when no suffix strategy is guaranteed to apply.
if collect_suffix.is_none()
&& !body.suffix_can_run_with_materialized_receiver(plan.sort_stage + 1)
{
return None;
}
let mut sorter =
pipeline::BoundedKeySorter::new(plan.descending, strategy, pipeline::cmp_val_total);
// The prefix is always driven with `PullDemand::All`.
drive_view_frontier(
source,
pipeline::SourceCapabilities::VIEW_ARRAY,
&plan.prefix,
&body.stage_kernels,
PullDemand::All,
|item| {
let key = view_sort_key(item, plan.key_program.as_ref())?;
sorter.push_keyed(key, item.clone());
Some(ViewRowAction::Emit)
},
)?;
let winners = sorter.finish();
if let Some(collect_plan) = collect_suffix {
return run_sorted_rows_terminal_collect_suffix(
winners,
&collect_plan,
&body.stage_kernels,
);
}
if let Some(out) = run_sorted_rows_terminal_select_projection_suffix(
winners.as_slice(),
body,
plan.sort_stage + 1,
false,
) {
return Some(out);
}
if let Some(out) =
run_sorted_rows_view_suffix(winners.as_slice(), body, plan.sort_stage + 1, base_env, vm)
{
return Some(out);
}
// Last resort: materialize the sorted rows and run the remaining stages
// on owned values.
let boundary_rows: Vec<Val> = winners.into_iter().map(|row| row.materialize()).collect();
Some(run_materialized_suffix(
body,
plan.sort_stage + 1,
boundary_rows,
cache,
base_env,
vm,
))
}
/// Runs an already planned terminal collect over the sorted `rows`.
///
/// Each row is driven through `plan.prefix`, and survivors are pushed through
/// the plan's collect program into a `TerminalCollector`. Returns `None` when
/// driving or pushing a row fails.
fn run_sorted_rows_terminal_collect_suffix<'a, V>(
    rows: Vec<V>,
    plan: &TerminalCollectPlan,
    stage_kernels: &[pipeline::BodyKernel],
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    let program = &plan.collect_program;
    let mut collector = pipeline::TerminalCollector::new(program.kernel());
    drive_view_iter(
        rows,
        &plan.prefix,
        stage_kernels,
        plan.source_demand,
        |row| {
            collector
                .push_view_program(row, program)
                .map(|_| ViewRowAction::Emit)
        },
    )?;
    let collected = collector.finish();
    Some(Ok(collected))
}
/// Finishes a sorted pipeline whose suffix ends in a projection run and whose
/// sink is `SelectMany`, `Nth`, or a builtin `SelectOne` (`First`/`Last`).
///
/// Only selected rows are projected. `source_reversed` tells the
/// `SelectMany` branch that `rows` arrive in reversed order, so a `from_end`
/// selection is flipped back before being returned. Returns `None` when the
/// suffix has no trailing projection run, a prefix stage is not view-capable
/// without materialization, the sink is of another kind, or driving or
/// evaluation fails.
fn run_sorted_rows_terminal_select_projection_suffix<'a, V>(
rows: &[V],
body: &pipeline::PipelineBody,
suffix_start: usize,
source_reversed: bool,
) -> Option<Result<Val, EvalError>>
where
V: ValueView<'a>,
{
// `terminal_projection_run` reports the prefix length relative to
// `suffix_start`.
let (relative_prefix_len, project_kernel) = terminal_projection_run(body, suffix_start)?;
let prefix_end = suffix_start + relative_prefix_len;
let prefix =
terminal_collect_prefix_from(&body.stages[suffix_start..prefix_end], body, suffix_start)?;
let source_demand = pipeline::Pipeline::segment_pull_demand(
&body.stages[suffix_start..prefix_end],
&body.sink,
);
// SelectMany: project every row the demand lets through and return them
// as an array.
if let pipeline::Sink::SelectMany { from_end, .. } = body.sink {
let mut selected = Vec::new();
drive_view_iter(
rows.iter().cloned(),
&prefix,
&body.stage_kernels,
source_demand,
|item| {
selected.push(eval_owned_scalar_or_value_kernel(item, &project_kernel)?);
Some(ViewRowAction::Emit)
},
)?;
if from_end && source_reversed {
selected.reverse();
}
return Some(Ok(Val::Arr(Arc::new(selected))));
}
let mut nth_target = None;
let position = match &body.sink {
pipeline::Sink::Nth(index) => {
nth_target = Some(*index);
TerminalSelectPosition::Nth
}
_ => {
let sink_spec = body.sink.builtin_sink_spec()?;
match sink_spec.accumulator {
crate::builtins::BuiltinSinkAccumulator::SelectOne(
crate::builtins::BuiltinSelectionPosition::First,
) => TerminalSelectPosition::First,
crate::builtins::BuiltinSinkAccumulator::SelectOne(
crate::builtins::BuiltinSelectionPosition::Last,
) => TerminalSelectPosition::Last,
_ => return None,
}
}
};
let mut selected = Val::Null;
let mut seen = false;
let mut selected_index = 0usize;
drive_view_iter(
rows.iter().cloned(),
&prefix,
&body.stage_kernels,
source_demand,
|item| {
// Rows before the nth target are skipped, so they do not count as
// emitted outputs.
if let Some(target) = nth_target {
if selected_index < target {
selected_index += 1;
return Some(ViewRowAction::Skip);
}
}
selected = eval_owned_scalar_or_value_kernel(item, &project_kernel)?;
seen = true;
Some(match position {
TerminalSelectPosition::First | TerminalSelectPosition::Nth => ViewRowAction::Stop,
TerminalSelectPosition::Last => ViewRowAction::Emit,
})
},
)?;
Some(Ok(if seen { selected } else { Val::Null }))
}
/// Runs the stages from `suffix_start` onward, plus the sink, over the sorted
/// `rows` while staying in the view layer.
///
/// Returns `None` when a suffix stage or the sink has no view capability,
/// when the sink cannot be resolved, or when driving fails. Returns
/// `Some(Err(..))` when resolving the sink raises an evaluation error.
fn run_sorted_rows_view_suffix<'a, V>(
    rows: &[V],
    body: &pipeline::PipelineBody,
    suffix_start: usize,
    base_env: &Env,
    vm: &mut VM,
) -> Option<Result<Val, EvalError>>
where
    V: ValueView<'a>,
{
    let suffix = view_suffix_capabilities(body, suffix_start)?;
    let source_demand =
        pipeline::Pipeline::segment_pull_demand(&body.stages[suffix_start..], &body.sink);

    let demand_aware_sink = view_suffix_sink_for_demand(suffix.sink, source_demand);
    let sink = match resolve_view_sink(demand_aware_sink, Some(base_env), vm)? {
        Ok(resolved) => resolved,
        Err(err) => return Some(Err(err)),
    };

    let mut sink_acc = pipeline::SinkAccumulator::new(&body.sink);
    drive_view_iter(
        rows.iter().cloned(),
        &suffix.stages,
        &body.stage_kernels,
        source_demand,
        |row| observe_view_sink(row, &sink, &mut sink_acc, &body.sink_kernels),
    )?;
    Some(sink_acc.finish_result(false))
}
/// Sorts the rows produced by the plan's view prefix with an ordered sorter,
/// then runs the stages after the sort, plus the sink, on views.
///
/// When the suffix demand is `LastInput`, the sort direction is flipped so
/// the wanted rows come out first; `source_reversed` records that flip for
/// the select-projection suffix. Returns `None` when the suffix or sink has
/// no view capability, the sink cannot be resolved, or driving or key
/// evaluation fails.
fn run_sort_prefix_then_view_suffix<'a, V>(
source: V,
body: &pipeline::PipelineBody,
plan: &SortBarrierPlan,
base_env: &Env,
vm: &mut VM,
) -> Option<Result<Val, EvalError>>
where
V: ValueView<'a>,
{
let suffix = view_suffix_capabilities(body, plan.sort_stage + 1)?;
let source_demand =
pipeline::Pipeline::segment_pull_demand(&body.stages[plan.sort_stage + 1..], &body.sink);
let sink = view_suffix_sink_for_demand(suffix.sink, source_demand);
let sink = match resolve_view_sink(sink, Some(base_env), vm) {
Some(Ok(sink)) => sink,
Some(Err(err)) => return Some(Err(err)),
None => return None,
};
// A `LastInput` demand inverts the requested sort direction.
let ordered_descending = if matches!(source_demand, PullDemand::LastInput(_)) {
!plan.descending
} else {
plan.descending
};
let source_reversed = ordered_descending != plan.descending;
let mut sorter = pipeline::OrderedKeySorter::new(ordered_descending, pipeline::cmp_val_total);
// The prefix is always driven with `PullDemand::All`.
drive_view_frontier(
source,
pipeline::SourceCapabilities::VIEW_ARRAY,
&plan.prefix,
&body.stage_kernels,
PullDemand::All,
|item| {
let key = view_sort_key(item, plan.key_program.as_ref())?;
sorter.push_keyed(key, item.clone());
Some(ViewRowAction::Emit)
},
)?;
let mut sink_acc = pipeline::SinkAccumulator::new(&body.sink);
let ordered: Vec<_> = sorter.finish().collect();
// Prefer the select-projection suffix when it applies; otherwise drive
// the generic view suffix into the sink accumulator.
if let Some(out) = run_sorted_rows_terminal_select_projection_suffix(
ordered.as_slice(),
body,
plan.sort_stage + 1,
source_reversed,
) {
return Some(out);
}
drive_view_iter(
ordered,
&suffix.stages,
&body.stage_kernels,
source_demand,
|item| observe_view_sink(item, &sink, &mut sink_acc, &body.sink_kernels),
)?;
Some(sink_acc.finish_result(false))
}
/// Adjusts a suffix sink to the demand it will be driven with.
///
/// A `SelectMany` sink paired with a `LastInput` demand is rebuilt with
/// `source_reversed: true` (keeping `n` and `from_end`); every other
/// combination returns `sink` unchanged.
fn view_suffix_sink_for_demand(
    sink: pipeline::ViewSinkCapability,
    source_demand: PullDemand,
) -> pipeline::ViewSinkCapability {
    let reads_from_end = matches!(source_demand, PullDemand::LastInput(_));
    match sink {
        pipeline::ViewSinkCapability::SelectMany { n, from_end, .. } if reads_from_end => {
            pipeline::ViewSinkCapability::SelectMany {
                n,
                from_end,
                source_reversed: true,
            }
        }
        unchanged => unchanged,
    }
}
/// Plan for a pipeline whose first sort stage acts as a barrier between a
/// view-driven prefix and the rest of the pipeline.
struct SortBarrierPlan {
/// View capabilities of the stages that precede the sort stage.
prefix: Vec<pipeline::ViewStageCapability>,
/// Index of the sort stage within the pipeline's stages.
sort_stage: usize,
/// Program that computes the sort key; `None` sorts on the row itself.
key_program: Option<pipeline::RowProgram>,
/// Whether the sort spec asks for descending order.
descending: bool,
}
/// View capabilities for the tail of a pipeline: its remaining stages and
/// its sink.
struct ViewSuffixCapabilities {
/// One capability per suffix stage, in pipeline order.
stages: Vec<pipeline::ViewStageCapability>,
/// View capability of the pipeline's sink.
sink: pipeline::ViewSinkCapability,
}
/// Collects view capabilities for every stage from `start` onward and for
/// the sink.
///
/// Returns `None` if any of those stages, or the sink, has no view
/// capability. A `start` at or beyond the number of stages produces an empty
/// stage list.
fn view_suffix_capabilities(
    body: &pipeline::PipelineBody,
    start: usize,
) -> Option<ViewSuffixCapabilities> {
    let stages = body
        .stages
        .iter()
        .enumerate()
        .skip(start)
        .map(|(idx, stage)| stage.view_capability(idx, body.stage_kernels.get(idx)))
        .collect::<Option<Vec<_>>>()?;
    let sink = body.sink.view_capability(&body.sink_kernels)?;
    Some(ViewSuffixCapabilities { stages, sink })
}
/// Builds a [`SortBarrierPlan`] for the first sort stage in `body`.
///
/// Every stage before the sort must have a view capability that never
/// materializes; otherwise `None` is returned. For a keyed sort the stage
/// kernel must exist, be view-native, and compile into a row program.
/// Returns `None` when the pipeline contains no sort stage.
fn sort_barrier_plan(body: &pipeline::PipelineBody) -> Option<SortBarrierPlan> {
    let mut prefix = Vec::new();
    for (idx, stage) in body.stages.iter().enumerate() {
        match stage {
            pipeline::Stage::Sort(spec) => {
                let key_program = if spec.key.is_some() {
                    let kernel = body.stage_kernels.get(idx)?;
                    if !kernel.is_view_native() {
                        return None;
                    }
                    // A keyed sort needs a real key program. If the kernel
                    // cannot be compiled, decline the plan rather than fall
                    // back to `None`, which would sort on the whole row.
                    Some(pipeline::RowProgram::from_kernel(kernel.clone())?)
                } else {
                    None
                };
                return Some(SortBarrierPlan {
                    prefix,
                    sort_stage: idx,
                    key_program,
                    descending: spec.descending,
                });
            }
            _ => {
                let capability = stage.view_capability(idx, body.stage_kernels.get(idx))?;
                if !matches!(
                    capability.materialization(),
                    pipeline::ViewMaterialization::Never
                ) {
                    return None;
                }
                prefix.push(capability);
            }
        }
    }
    None
}
/// Runs the stages from `consumed_stages` onward, plus the sink, with
/// `boundary_rows` wrapped in an array as the receiver.
///
/// The suffix pipeline is evaluated against a `Val::Null` root.
fn run_materialized_suffix(
    body: &pipeline::PipelineBody,
    consumed_stages: usize,
    boundary_rows: Vec<Val>,
    cache: Option<&dyn pipeline::PipelineData>,
    base_env: &Env,
    vm: &mut VM,
) -> Result<Val, EvalError> {
    let remaining = suffix_body(body, consumed_stages);
    let receiver = pipeline::Source::Receiver(Val::arr(boundary_rows));
    let null_root = Val::Null;
    remaining
        .with_source(receiver)
        .run_with_env_and_vm(&null_root, base_env, cache, vm)
}
/// Runs the stages from `consumed_stages` onward, plus the sink, with the
/// single value `boundary_value` as the receiver.
///
/// When no stages remain and the sink is `Collect`, `boundary_value` is
/// returned directly without building a suffix pipeline. Otherwise the suffix
/// is evaluated against a `Val::Null` root.
fn run_materialized_value_suffix(
    body: &pipeline::PipelineBody,
    consumed_stages: usize,
    boundary_value: Val,
    cache: Option<&dyn pipeline::PipelineData>,
    base_env: &Env,
    vm: &mut VM,
) -> Result<Val, EvalError> {
    let no_stages_left = consumed_stages >= body.stages.len();
    if no_stages_left && matches!(body.sink, pipeline::Sink::Collect) {
        return Ok(boundary_value);
    }

    let remaining = suffix_body(body, consumed_stages);
    let receiver = pipeline::Source::Receiver(boundary_value);
    let null_root = Val::Null;
    remaining
        .with_source(receiver)
        .run_with_env_and_vm(&null_root, base_env, cache, vm)
}
/// Applies one non-flat-map stage to `item`.
///
/// Thin wrapper that forwards all arguments to `stage_flow::apply_stage` and
/// returns its result unchanged.
fn apply_view_stage<'a, V>(
item: V,
stage: pipeline::ViewStageCapability,
op_idx: usize,
op_state: &mut [ViewStageState],
stage_kernels: &[pipeline::BodyKernel],
) -> Option<ViewStageFlow<V>>
where
V: ValueView<'a>,
{
stage_flow::apply_stage(item, stage, op_idx, op_state, stage_kernels)
}
/// Builds a pipeline body containing only the stages from `consumed_stages`
/// onward, with the original sink and sink kernels.
///
/// `stage_exprs` is carried over only when it lines up one-to-one with
/// `stages`; otherwise the suffix gets an empty list. `stage_kernels` is
/// sliced with a checked lookup, so a kernel table shorter than
/// `consumed_stages` yields an empty kernel list instead of panicking.
///
/// # Panics
///
/// Panics if `consumed_stages` is greater than `body.stages.len()`.
fn suffix_body(body: &pipeline::PipelineBody, consumed_stages: usize) -> pipeline::PipelineBody {
    let stage_exprs = if body.stage_exprs.len() == body.stages.len() {
        body.stage_exprs[consumed_stages..].to_vec()
    } else {
        Vec::new()
    };
    // The rest of this module reads kernels through `.get(idx)`, i.e. it
    // tolerates missing entries; do the same here.
    let stage_kernels = body
        .stage_kernels
        .get(consumed_stages..)
        .map(|rest| rest.to_vec())
        .unwrap_or_default();
    pipeline::PipelineBody {
        stages: body.stages[consumed_stages..].to_vec(),
        stage_exprs,
        sink: body.sink.clone(),
        stage_kernels,
        sink_kernels: body.sink_kernels.clone(),
    }
}
/// Evaluates `kernel` on `item` and reports whether the result is truthy.
///
/// A view result is tested through its scalar view; an owned result is tested
/// with `crate::util::is_truthy`. Returns `None` when the kernel cannot be
/// evaluated on the view.
fn eval_filter_kernel<'a, V>(item: &V, kernel: &pipeline::BodyKernel) -> Option<bool>
where
    V: ValueView<'a>,
{
    let outcome = pipeline::eval_view_kernel(kernel, item)?;
    let truthy = match outcome {
        pipeline::ViewKernelValue::Owned(value) => crate::util::is_truthy(&value),
        pipeline::ViewKernelValue::View(view) => view.scalar().truthy(),
    };
    Some(truthy)
}
/// Evaluates a map `kernel` on `item`, accepting only results that are
/// themselves views.
///
/// Returns `None` when the kernel cannot be evaluated or produces an owned
/// value.
fn eval_map_kernel<'a, V>(item: &V, kernel: &pipeline::BodyKernel) -> Option<V>
where
    V: ValueView<'a>,
{
    if let pipeline::ViewKernelValue::View(mapped) = pipeline::eval_view_kernel(kernel, item)? {
        Some(mapped)
    } else {
        None
    }
}
/// Evaluates a flat-map `kernel` on `item` and returns an iterator over the
/// elements of the resulting view.
///
/// Returns `None` when the kernel cannot be evaluated, produces an owned
/// value, or the resulting view's `array_iter` returns `None`.
fn eval_flat_map_kernel<'a, V>(
    item: &V,
    kernel: &pipeline::BodyKernel,
) -> Option<Box<dyn Iterator<Item = V> + 'a>>
where
    V: ValueView<'a>,
{
    if let pipeline::ViewKernelValue::View(expanded) = pipeline::eval_view_kernel(kernel, item)? {
        expanded.array_iter()
    } else {
        None
    }
}
/// Evaluates `kernel` on `item` and returns the result as an owned value.
///
/// An owned result is returned as-is. A view result is converted from its
/// scalar view when `scalar_view_to_owned_val` can do so, and materialized
/// otherwise. Returns `None` only when the kernel cannot be evaluated.
fn eval_owned_scalar_or_value_kernel<'a, V>(item: &V, kernel: &pipeline::BodyKernel) -> Option<Val>
where
    V: ValueView<'a>,
{
    let owned = match pipeline::eval_view_kernel(kernel, item)? {
        pipeline::ViewKernelValue::Owned(value) => value,
        pipeline::ViewKernelValue::View(view) => {
            scalar_view_to_owned_val(view.scalar()).unwrap_or_else(|| view.materialize())
        }
    };
    Some(owned)
}
/// Produces a [`ViewKey`] for `item`, optionally after applying `kernel`.
///
/// With no kernel the key is built from `item` itself. A view (either `item`
/// or the kernel's view result) is keyed from its scalar view when
/// `ViewKey::from_view` succeeds and from the materialized value otherwise;
/// an owned kernel result is keyed with `ViewKey::from_owned`. Returns `None`
/// only when the kernel cannot be evaluated.
fn eval_view_key<'a, V>(item: &V, kernel: Option<&pipeline::BodyKernel>) -> Option<ViewKey>
where
    V: ValueView<'a>,
{
    let Some(kernel) = kernel else {
        return ViewKey::from_view(item.scalar())
            .or_else(|| Some(ViewKey::from_owned(item.materialize())));
    };
    match pipeline::eval_view_kernel(kernel, item)? {
        pipeline::ViewKernelValue::Owned(value) => Some(ViewKey::from_owned(value)),
        pipeline::ViewKernelValue::View(view) => ViewKey::from_view(view.scalar())
            .or_else(|| Some(ViewKey::from_owned(view.materialize()))),
    }
}
/// Computes the owned sort key for `item`.
///
/// With no key program the whole row is materialized and used as the key.
/// Otherwise the program is evaluated on the view: an owned result is used
/// directly, and a view result is converted from its scalar view when
/// possible and materialized otherwise. Returns `None` only when the program
/// cannot be evaluated.
fn view_sort_key<'a, V>(item: &V, key: Option<&pipeline::RowProgram>) -> Option<Val>
where
    V: ValueView<'a>,
{
    let Some(program) = key else {
        return Some(item.materialize());
    };
    let sort_key = match program.eval_view(item)? {
        pipeline::ViewKernelValue::Owned(value) => value,
        pipeline::ViewKernelValue::View(view) => {
            scalar_view_to_owned_val(view.scalar()).unwrap_or_else(|| view.materialize())
        }
    };
    Some(sort_key)
}
#[cfg(test)]
mod tests {
use std::cell::Cell;
use std::rc::Rc;
use std::sync::Arc;
use indexmap::IndexMap;
use crate::compile::compiler::Compiler;
use crate::data::context::Env;
use crate::data::value::Val;
use crate::data::view::{ValView, ValueView};
use crate::exec::pipeline::{
ArgExtremeSinkSpec, BodyKernel, MembershipSinkOp, MembershipSinkSpec, MembershipSinkTarget,
PipelineBody, PredicateSinkOp, PredicateSinkSpec, Sink, SourceCapabilities, Stage,
ViewSinkCapability, ViewStageCapability,
};
use crate::plan::demand::PullDemand;
use crate::parse::ast::BinOp;
use crate::util::JsonView;
#[derive(Clone)]
struct CountingView {
rows: Arc<[i64]>,
idx: Option<usize>,
scalar_reads: Rc<Cell<usize>>,
array_iter_reads: Rc<Cell<usize>>,
materialize_reads: Rc<Cell<usize>>,
}
impl CountingView {
fn root(rows: &[i64]) -> Self {
Self {
rows: rows.iter().copied().collect::<Vec<_>>().into(),
idx: None,
scalar_reads: Rc::new(Cell::new(0)),
array_iter_reads: Rc::new(Cell::new(0)),
materialize_reads: Rc::new(Cell::new(0)),
}
}
fn scalar_reads(&self) -> usize {
self.scalar_reads.get()
}
fn materialize_reads(&self) -> usize {
self.materialize_reads.get()
}
fn array_iter_reads(&self) -> usize {
self.array_iter_reads.get()
}
}
impl<'a> ValueView<'a> for CountingView {
fn scalar(&self) -> JsonView<'_> {
self.scalar_reads.set(self.scalar_reads.get() + 1);
if self.idx.is_none() {
return JsonView::ArrayLen(self.rows.len());
}
self.idx
.and_then(|idx| self.rows.get(idx).copied())
.map(JsonView::Int)
.unwrap_or(JsonView::Null)
}
fn field(&self, _key: &str) -> Self {
Self {
rows: Arc::clone(&self.rows),
idx: None,
scalar_reads: Rc::clone(&self.scalar_reads),
array_iter_reads: Rc::clone(&self.array_iter_reads),
materialize_reads: Rc::clone(&self.materialize_reads),
}
}
fn has_key(&self, _key: &str) -> Option<bool> {
None
}
fn object_keys(&self) -> Option<Val> {
None
}
fn object_values(&self) -> Option<Val> {
None
}
fn object_entries(&self) -> Option<Val> {
None
}
fn pick_keys(&self, _keys: &[Arc<str>]) -> Option<Val> {
None
}
fn omit_keys(&self, _keys: &[Arc<str>]) -> Option<Val> {
None
}
fn index(&self, idx: i64) -> Self {
let idx = if idx >= 0 { Some(idx as usize) } else { None };
Self {
rows: Arc::clone(&self.rows),
idx,
scalar_reads: Rc::clone(&self.scalar_reads),
array_iter_reads: Rc::clone(&self.array_iter_reads),
materialize_reads: Rc::clone(&self.materialize_reads),
}
}
fn array_iter(&self) -> Option<Box<dyn Iterator<Item = Self> + 'a>> {
self.array_iter_reads
.set(self.array_iter_reads.get() + 1);
if self.idx.is_some() {
return None;
}
let rows = Arc::clone(&self.rows);
let scalar_reads = Rc::clone(&self.scalar_reads);
let array_iter_reads = Rc::clone(&self.array_iter_reads);
let materialize_reads = Rc::clone(&self.materialize_reads);
Some(Box::new((0..rows.len()).map(move |idx| Self {
rows: Arc::clone(&rows),
idx: Some(idx),
scalar_reads: Rc::clone(&scalar_reads),
array_iter_reads: Rc::clone(&array_iter_reads),
materialize_reads: Rc::clone(&materialize_reads),
})))
}
fn materialize(&self) -> Val {
self.materialize_reads.set(self.materialize_reads.get() + 1);
self.idx
.and_then(|idx| self.rows.get(idx).copied())
.map(Val::Int)
.unwrap_or(Val::Null)
}
}
/// A `FirstInput(0)` demand must complete without invoking the row callback and
/// without any scalar, iteration or materialization access on the source.
#[test]
fn view_frontier_zero_demand_skips_source_access() {
    let view = CountingView::root(&[1, 2, 3]);
    let callback_hits = Rc::new(Cell::new(0usize));
    let hits_handle = Rc::clone(&callback_hits);

    let outcome = super::drive_view_frontier(
        view.clone(),
        SourceCapabilities::VIEW_ARRAY,
        &[],
        &[],
        PullDemand::FirstInput(0),
        move |_| {
            hits_handle.set(hits_handle.get() + 1);
            Some(super::ViewRowAction::Emit)
        },
    );

    assert!(outcome.is_some());
    assert_eq!(callback_hits.get(), 0);
    assert_eq!(view.scalar_reads(), 0);
    assert_eq!(view.array_iter_reads(), 0);
    assert_eq!(view.materialize_reads(), 0);
}
/// `filter(> 0)` followed by `take(2)` collects `[1, 2]` and performs only two
/// scalar reads on a three-row source.
#[test]
fn view_full_runner_stops_after_until_output_demand_is_met() {
    let view = CountingView::root(&[1, 2, 3]);

    let keep_positive = Stage::Filter(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::Filter,
    );
    let take_two = Stage::UsizeBuiltin {
        method: crate::builtins::BuiltinMethod::Take,
        value: 2,
    };
    let spec = PipelineBody {
        stages: vec![keep_positive, take_two],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![
            BodyKernel::CurrentCmpLit(BinOp::Gt, Val::Int(0)),
            BodyKernel::Generic,
        ],
        sink_kernels: Vec::new(),
    };

    let collected = super::run_full(view.clone(), &spec).unwrap().unwrap();
    let collected_json: serde_json::Value = collected.into();
    assert_eq!(collected_json, serde_json::json!([1, 2]));
    assert_eq!(view.scalar_reads(), 2);
}
/// An `Any(> 2)` sink over `[1, 2, 3, 4]` answers `true` after three scalar
/// reads and no materialization.
#[test]
fn view_full_runner_stops_when_predicate_sink_result_is_decided() {
    let view = CountingView::root(&[1, 2, 3, 4]);

    let any_sink = Sink::Predicate(PredicateSinkSpec {
        op: PredicateSinkOp::Any,
        predicate: Arc::new(crate::vm::Program::new(Vec::new(), "")),
    });
    let spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: any_sink,
        stage_kernels: Vec::new(),
        sink_kernels: vec![BodyKernel::CurrentCmpLit(BinOp::Gt, Val::Int(2))],
    };

    let verdict = super::run_full(view.clone(), &spec).unwrap().unwrap();
    assert_eq!(verdict, Val::Bool(true));
    assert_eq!(view.scalar_reads(), 3);
    assert_eq!(view.materialize_reads(), 0);
}
/// An `All(< 3)` sink over `[1, 2, 3, 4]` answers `false` after three scalar
/// reads and no materialization.
#[test]
fn view_full_runner_stops_when_all_sink_fails() {
    let view = CountingView::root(&[1, 2, 3, 4]);

    let all_sink = Sink::Predicate(PredicateSinkSpec {
        op: PredicateSinkOp::All,
        predicate: Arc::new(crate::vm::Program::new(Vec::new(), "")),
    });
    let spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: all_sink,
        stage_kernels: Vec::new(),
        sink_kernels: vec![BodyKernel::CurrentCmpLit(BinOp::Lt, Val::Int(3))],
    };

    let verdict = super::run_full(view.clone(), &spec).unwrap().unwrap();
    assert_eq!(verdict, Val::Bool(false));
    assert_eq!(view.scalar_reads(), 3);
    assert_eq!(view.materialize_reads(), 0);
}
/// An `Includes(3)` sink over `[1, 2, 3, 4]` answers `true` after three scalar
/// reads and no materialization.
#[test]
fn view_full_runner_stops_when_membership_sink_result_is_decided() {
    let view = CountingView::root(&[1, 2, 3, 4]);

    let includes_three = Sink::Membership(MembershipSinkSpec {
        op: MembershipSinkOp::Includes,
        target: MembershipSinkTarget::Literal(Val::Int(3)),
        method: crate::builtins::BuiltinMethod::Includes,
    });
    let spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: includes_three,
        stage_kernels: Vec::new(),
        sink_kernels: Vec::new(),
    };

    let verdict = super::run_full(view.clone(), &spec).unwrap().unwrap();
    assert_eq!(verdict, Val::Bool(true));
    assert_eq!(view.scalar_reads(), 3);
    assert_eq!(view.materialize_reads(), 0);
}
/// An `Index(3)` sink over `[1, 2, 3, 4]` reports position `2` after three
/// scalar reads and no materialization.
#[test]
fn view_full_runner_stops_when_index_sink_matches() {
    let view = CountingView::root(&[1, 2, 3, 4]);

    let index_of_three = Sink::Membership(MembershipSinkSpec {
        op: MembershipSinkOp::Index,
        target: MembershipSinkTarget::Literal(Val::Int(3)),
        method: crate::builtins::BuiltinMethod::Index,
    });
    let spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: index_of_three,
        stage_kernels: Vec::new(),
        sink_kernels: Vec::new(),
    };

    let position = super::run_full(view.clone(), &spec).unwrap().unwrap();
    assert_eq!(position, Val::Int(2));
    assert_eq!(view.scalar_reads(), 3);
    assert_eq!(view.materialize_reads(), 0);
}
/// `SelectMany` with `n = 2` returns the first two rows, or the last two when
/// `from_end` is set, materializing exactly two rows in each case.
#[test]
fn view_full_runner_handles_select_many_first_and_last() {
    // Leading two rows.
    let head_view = CountingView::root(&[1, 2, 3, 4]);
    let head_spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: Sink::SelectMany {
            n: 2,
            from_end: false,
        },
        stage_kernels: Vec::new(),
        sink_kernels: Vec::new(),
    };
    let head = super::run_full(head_view.clone(), &head_spec)
        .unwrap()
        .unwrap();
    let head_json: serde_json::Value = head.into();
    assert_eq!(head_json, serde_json::json!([1, 2]));
    assert_eq!(head_view.materialize_reads(), 2);

    // Trailing two rows; everything but the sink is reused from the first body.
    let tail_view = CountingView::root(&[1, 2, 3, 4]);
    let tail_spec = PipelineBody {
        sink: Sink::SelectMany {
            n: 2,
            from_end: true,
        },
        ..head_spec
    };
    let tail = super::run_full(tail_view.clone(), &tail_spec)
        .unwrap()
        .unwrap();
    let tail_json: serde_json::Value = tail.into();
    assert_eq!(tail_json, serde_json::json!([3, 4]));
    assert_eq!(tail_view.materialize_reads(), 2);
}
/// `First`, `Last` and `Nth` sinks with no stages must produce their row
/// without ever calling `array_iter` on the source.
#[test]
fn view_full_runner_uses_direct_access_for_first_last_and_nth() {
    // First element.
    let head_view = CountingView::root(&[1, 2, 3, 4]);
    let head_spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: Sink::Terminal(crate::builtins::BuiltinMethod::First),
        stage_kernels: Vec::new(),
        sink_kernels: Vec::new(),
    };
    let head = super::run_full(head_view.clone(), &head_spec)
        .unwrap()
        .unwrap();
    let head_json: serde_json::Value = head.into();
    assert_eq!(head_json, serde_json::json!(1));
    assert_eq!(head_view.materialize_reads(), 0);
    assert_eq!(head_view.array_iter_reads(), 0);

    // Last element; everything but the sink is reused from the first body.
    let tail_view = CountingView::root(&[1, 2, 3, 4]);
    let tail_spec = PipelineBody {
        sink: Sink::Terminal(crate::builtins::BuiltinMethod::Last),
        ..head_spec
    };
    let tail = super::run_full(tail_view.clone(), &tail_spec)
        .unwrap()
        .unwrap();
    let tail_json: serde_json::Value = tail.into();
    assert_eq!(tail_json, serde_json::json!(4));
    assert_eq!(tail_view.materialize_reads(), 0);
    assert_eq!(tail_view.scalar_reads(), 2);
    assert_eq!(tail_view.array_iter_reads(), 0);

    // Element at position 2.
    let nth_view = CountingView::root(&[1, 2, 3, 4]);
    let nth_spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: Sink::Nth(2),
        stage_kernels: Vec::new(),
        sink_kernels: Vec::new(),
    };
    let nth = super::run_full(nth_view.clone(), &nth_spec)
        .unwrap()
        .unwrap();
    let nth_json: serde_json::Value = nth.into();
    assert_eq!(nth_json, serde_json::json!(3));
    assert_eq!(nth_view.materialize_reads(), 1);
    assert_eq!(nth_view.scalar_reads(), 1);
    assert_eq!(nth_view.array_iter_reads(), 0);
}
/// A `LastInput(0)` demand must complete without invoking the row callback and
/// without any scalar, iteration or materialization access on the source.
#[test]
fn view_frontier_zero_demand_does_not_touch_source() {
    let view = CountingView::root(&[1, 2, 3, 4]);
    let mut callback_hits = 0usize;

    super::drive_view_frontier(
        view.clone(),
        SourceCapabilities::VIEW_ARRAY,
        &[],
        &[],
        PullDemand::LastInput(0),
        |_| {
            callback_hits += 1;
            Some(super::ViewRowAction::Emit)
        },
    )
    .unwrap();

    assert_eq!(callback_hits, 0);
    assert_eq!(view.scalar_reads(), 0);
    assert_eq!(view.array_iter_reads(), 0);
    assert_eq!(view.materialize_reads(), 0);
}
/// `index_from_end` maps in-range offsets to positions counted from the back
/// and yields nothing once the offset reaches the length or beyond.
#[test]
fn view_frontier_ignores_overflowing_from_end_offset() {
    // Offsets that still land inside a four-element sequence.
    assert_eq!(super::index_from_end(4, 0), Some(3));
    assert_eq!(super::index_from_end(4, 3), Some(0));

    // Offsets at or past the length, including the largest possible one.
    assert!(super::index_from_end(4, 4).is_none());
    assert!(super::index_from_end(4, usize::MAX).is_none());
}
/// Under a `LastInput` demand, a from-end `SelectMany` sink must come back with
/// `source_reversed` switched on and its other fields unchanged.
#[test]
fn view_suffix_sink_marks_reversed_select_many_for_last_input() {
    let original = ViewSinkCapability::SelectMany {
        n: 2,
        from_end: true,
        source_reversed: false,
    };

    let adjusted = super::view_suffix_sink_for_demand(original, PullDemand::LastInput(2));

    let marked_reversed = matches!(
        adjusted,
        ViewSinkCapability::SelectMany {
            n: 2,
            from_end: true,
            source_reversed: true
        }
    );
    assert!(marked_reversed);
}
/// A trailing identity map must be evaluated only for the rows the sink asks
/// for: `First`, `Last` and `Nth` each cost two scalar reads and no iteration,
/// while `take(3)` costs three scalar reads.
#[test]
fn view_runner_applies_late_map_only_to_demanded_rows() {
    let identity_map = Stage::Map(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::Map,
    );

    // map -> first
    let head_view = CountingView::root(&[1, 2, 3, 4]);
    let head_spec = PipelineBody {
        stages: vec![identity_map.clone()],
        stage_exprs: Vec::new(),
        sink: Sink::Terminal(crate::builtins::BuiltinMethod::First),
        stage_kernels: vec![BodyKernel::Current],
        sink_kernels: Vec::new(),
    };
    let head = super::run_terminal_select_projection(head_view.clone(), &head_spec)
        .unwrap()
        .unwrap();
    assert_eq!(head, Val::Int(1));
    assert_eq!(head_view.scalar_reads(), 2);
    assert_eq!(head_view.array_iter_reads(), 0);

    // map -> last; everything but the sink is reused from the first body.
    let tail_view = CountingView::root(&[1, 2, 3, 4]);
    let tail_spec = PipelineBody {
        sink: Sink::Terminal(crate::builtins::BuiltinMethod::Last),
        ..head_spec
    };
    let tail = super::run_terminal_select_projection(tail_view.clone(), &tail_spec)
        .unwrap()
        .unwrap();
    assert_eq!(tail, Val::Int(4));
    assert_eq!(tail_view.scalar_reads(), 2);
    assert_eq!(tail_view.array_iter_reads(), 0);

    // map -> take(3) -> collect
    let take_view = CountingView::root(&[1, 2, 3, 4]);
    let take_three = Stage::UsizeBuiltin {
        method: crate::builtins::BuiltinMethod::Take,
        value: 3,
    };
    let take_spec = PipelineBody {
        stages: vec![identity_map, take_three],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![BodyKernel::Current, BodyKernel::Generic],
        sink_kernels: Vec::new(),
    };
    let taken = super::run_terminal_collect(take_view.clone(), &take_spec)
        .unwrap()
        .unwrap();
    let taken_json: serde_json::Value = taken.into();
    assert_eq!(taken_json, serde_json::json!([1, 2, 3]));
    assert_eq!(take_view.scalar_reads(), 3);

    // map -> nth(2)
    let nth_view = CountingView::root(&[1, 2, 3, 4]);
    let nth_spec = PipelineBody {
        stages: vec![Stage::Map(
            Arc::new(crate::vm::Program::new(Vec::new(), "")),
            crate::builtins::BuiltinViewStage::Map,
        )],
        stage_exprs: Vec::new(),
        sink: Sink::Nth(2),
        stage_kernels: vec![BodyKernel::Current],
        sink_kernels: Vec::new(),
    };
    let nth = super::run_terminal_select_projection(nth_view.clone(), &nth_spec)
        .unwrap()
        .unwrap();
    assert_eq!(nth, Val::Int(3));
    assert_eq!(nth_view.scalar_reads(), 2);
    assert_eq!(nth_view.array_iter_reads(), 0);
}
/// `Includes`, `Index` and `IndicesOf` sinks with literal targets must answer
/// from scalar reads alone, never materializing a row.
#[test]
fn view_full_runner_handles_literal_membership_sinks_without_materializing_scalars() {
    // includes(3)
    let includes_view = CountingView::root(&[1, 2, 3, 4]);
    let includes_spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: Sink::Membership(MembershipSinkSpec {
            op: MembershipSinkOp::Includes,
            target: MembershipSinkTarget::Literal(Val::Int(3)),
            method: crate::builtins::BuiltinMethod::Includes,
        }),
        stage_kernels: Vec::new(),
        sink_kernels: Vec::new(),
    };
    let found = super::run_full(includes_view.clone(), &includes_spec)
        .unwrap()
        .unwrap();
    assert_eq!(found, Val::Bool(true));
    assert_eq!(includes_view.materialize_reads(), 0);
    assert_eq!(includes_view.scalar_reads(), 3);

    // index(4); everything but the sink is reused from the previous body.
    let index_view = CountingView::root(&[1, 2, 3, 4]);
    let index_spec = PipelineBody {
        sink: Sink::Membership(MembershipSinkSpec {
            op: MembershipSinkOp::Index,
            target: MembershipSinkTarget::Literal(Val::Int(4)),
            method: crate::builtins::BuiltinMethod::Index,
        }),
        ..includes_spec
    };
    let position = super::run_full(index_view.clone(), &index_spec)
        .unwrap()
        .unwrap();
    assert_eq!(position, Val::Int(3));
    assert_eq!(index_view.materialize_reads(), 0);
    assert_eq!(index_view.scalar_reads(), 4);

    // indices_of(1)
    let indices_view = CountingView::root(&[1, 2, 1, 3]);
    let indices_spec = PipelineBody {
        sink: Sink::Membership(MembershipSinkSpec {
            op: MembershipSinkOp::IndicesOf,
            target: MembershipSinkTarget::Literal(Val::Int(1)),
            method: crate::builtins::BuiltinMethod::IndicesOf,
        }),
        ..index_spec
    };
    let positions = super::run_full(indices_view.clone(), &indices_spec)
        .unwrap()
        .unwrap();
    let positions_json: serde_json::Value = positions.into();
    assert_eq!(positions_json, serde_json::json!([0, 2]));
    assert_eq!(indices_view.materialize_reads(), 0);
    assert_eq!(indices_view.scalar_reads(), 4);
}
/// Membership sinks whose target is the compiled program `$.needle` (resolving
/// to `3` in the environment) must give the expected answers with the listed
/// scalar-read counts and no row materialization.
#[test]
fn view_full_runner_evaluates_dynamic_membership_targets_once() {
    let mut root_fields = IndexMap::new();
    root_fields.insert(Arc::<str>::from("needle"), Val::Int(3));
    let env = Env::new(Val::obj(root_fields));
    let needle_program = Arc::new(Compiler::compile_str("$.needle").unwrap());
    let mut vm = crate::vm::VM::new();

    // includes($.needle)
    let includes_view = CountingView::root(&[1, 2, 3, 4]);
    let includes_spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: Sink::Membership(MembershipSinkSpec {
            op: MembershipSinkOp::Includes,
            target: MembershipSinkTarget::Program(Arc::clone(&needle_program)),
            method: crate::builtins::BuiltinMethod::Includes,
        }),
        stage_kernels: Vec::new(),
        sink_kernels: Vec::new(),
    };
    let found = super::run_full_with_env(
        includes_view.clone(),
        &includes_spec,
        Some(&env),
        &mut vm,
    )
    .unwrap()
    .unwrap();
    assert_eq!(found, Val::Bool(true));
    assert_eq!(includes_view.materialize_reads(), 0);
    assert_eq!(includes_view.scalar_reads(), 3);

    // index($.needle); everything but the sink is reused from the previous body.
    let index_view = CountingView::root(&[1, 2, 3, 4]);
    let index_spec = PipelineBody {
        sink: Sink::Membership(MembershipSinkSpec {
            op: MembershipSinkOp::Index,
            target: MembershipSinkTarget::Program(Arc::clone(&needle_program)),
            method: crate::builtins::BuiltinMethod::Index,
        }),
        ..includes_spec
    };
    let position =
        super::run_full_with_env(index_view.clone(), &index_spec, Some(&env), &mut vm)
            .unwrap()
            .unwrap();
    assert_eq!(position, Val::Int(2));
    assert_eq!(index_view.materialize_reads(), 0);
    assert_eq!(index_view.scalar_reads(), 3);

    // indices_of($.needle)
    let indices_view = CountingView::root(&[3, 1, 3, 4]);
    let indices_spec = PipelineBody {
        sink: Sink::Membership(MembershipSinkSpec {
            op: MembershipSinkOp::IndicesOf,
            target: MembershipSinkTarget::Program(needle_program),
            method: crate::builtins::BuiltinMethod::IndicesOf,
        }),
        ..index_spec
    };
    let positions =
        super::run_full_with_env(indices_view.clone(), &indices_spec, Some(&env), &mut vm)
            .unwrap()
            .unwrap();
    let positions_json: serde_json::Value = positions.into();
    assert_eq!(positions_json, serde_json::json!([0, 2]));
    assert_eq!(indices_view.materialize_reads(), 0);
    assert_eq!(indices_view.scalar_reads(), 4);
}
/// Arg-max and arg-min sinks keyed on the current row read every scalar once
/// (four reads) but materialize only two rows for these inputs.
#[test]
fn view_full_runner_handles_arg_extreme_sinks_lazily() {
    // Maximum by identity key.
    let max_view = CountingView::root(&[2, 1, 4, 3]);
    let max_spec = PipelineBody {
        stages: Vec::new(),
        stage_exprs: Vec::new(),
        sink: Sink::ArgExtreme(ArgExtremeSinkSpec {
            want_max: true,
            key: Arc::new(crate::vm::Program::new(Vec::new(), "")),
        }),
        stage_kernels: Vec::new(),
        sink_kernels: vec![BodyKernel::Current],
    };
    let largest = super::run_full(max_view.clone(), &max_spec)
        .unwrap()
        .unwrap();
    assert_eq!(largest, Val::Int(4));
    assert_eq!(max_view.scalar_reads(), 4);
    assert_eq!(max_view.materialize_reads(), 2);

    // Minimum by identity key; everything but the sink is reused.
    let min_view = CountingView::root(&[3, 4, 1, 2]);
    let min_spec = PipelineBody {
        sink: Sink::ArgExtreme(ArgExtremeSinkSpec {
            want_max: false,
            key: Arc::new(crate::vm::Program::new(Vec::new(), "")),
        }),
        ..max_spec
    };
    let smallest = super::run_full(min_view.clone(), &min_spec)
        .unwrap()
        .unwrap();
    assert_eq!(smallest, Val::Int(1));
    assert_eq!(min_view.scalar_reads(), 4);
    assert_eq!(min_view.materialize_reads(), 2);
}
/// For `filter -> map -> collect`, the plan keeps the filter as its only prefix
/// stage and uses the map's `Current` kernel as the collect program.
#[test]
fn terminal_collect_plan_accepts_view_native_prefix_and_final_map() {
    let filter_stage = Stage::Filter(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::Filter,
    );
    let map_stage = Stage::Map(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::Map,
    );
    let spec = PipelineBody {
        stages: vec![filter_stage, map_stage],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![
            BodyKernel::CurrentCmpLit(BinOp::Gt, Val::Int(1)),
            BodyKernel::Current,
        ],
        sink_kernels: Vec::new(),
    };

    let collect_plan = super::terminal_collect_plan(&spec).unwrap();

    assert_eq!(collect_plan.prefix.len(), 1);
    assert!(matches!(
        collect_plan.prefix[0],
        ViewStageCapability::Filter { .. }
    ));
    assert!(matches!(
        collect_plan.collect_program.kernel(),
        BodyKernel::Current
    ));
}
/// For `filter -> take(1) -> collect`, both stages form the prefix and the
/// collect program is the identity (`Current`) kernel.
#[test]
fn terminal_collect_plan_accepts_current_row_collect_without_final_map() {
    let filter_stage = Stage::Filter(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::Filter,
    );
    let take_one = Stage::UsizeBuiltin {
        method: crate::builtins::BuiltinMethod::Take,
        value: 1,
    };
    let spec = PipelineBody {
        stages: vec![filter_stage, take_one],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![
            BodyKernel::CurrentCmpLit(BinOp::Gt, Val::Int(1)),
            BodyKernel::Generic,
        ],
        sink_kernels: Vec::new(),
    };

    let collect_plan = super::terminal_collect_plan(&spec).unwrap();

    assert_eq!(collect_plan.prefix.len(), 2);
    assert!(matches!(
        collect_plan.prefix[0],
        ViewStageCapability::Filter { .. }
    ));
    assert!(matches!(
        collect_plan.prefix[1],
        ViewStageCapability::Take(1)
    ));
    assert!(matches!(
        collect_plan.collect_program.kernel(),
        BodyKernel::Current
    ));
}
/// A field-read map followed by the view-scalar builtin `Upper` leaves the
/// prefix empty and produces a `Compose` collect kernel.
#[test]
fn terminal_collect_plan_composes_trailing_projection_builtins() {
    let upper_call = crate::builtins::BuiltinCall {
        method: crate::builtins::BuiltinMethod::Upper,
        args: crate::builtins::BuiltinArgs::None,
    };
    assert!(upper_call.spec().view_scalar);

    let read_name = Stage::Map(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::Map,
    );
    let spec = PipelineBody {
        stages: vec![read_name, Stage::Builtin(upper_call)],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![
            BodyKernel::FieldRead(Arc::from("name")),
            BodyKernel::Generic,
        ],
        sink_kernels: Vec::new(),
    };

    let collect_plan = super::terminal_collect_plan(&spec).unwrap();

    assert!(collect_plan.prefix.is_empty());
    assert!(matches!(
        collect_plan.collect_program.kernel(),
        BodyKernel::Compose { .. }
    ));
}
/// A lone `HasKey("isbn")` builtin stage leaves the prefix empty and becomes a
/// `BuiltinCall` collect kernel.
#[test]
fn terminal_collect_plan_composes_trailing_object_key_builtins() {
    let has_isbn = crate::builtins::BuiltinCall {
        method: crate::builtins::BuiltinMethod::HasKey,
        args: crate::builtins::BuiltinArgs::Str(Arc::from("isbn")),
    };
    assert!(has_isbn.method.is_view_object_key_method());

    let spec = PipelineBody {
        stages: vec![Stage::Builtin(has_isbn)],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![BodyKernel::Generic],
        sink_kernels: Vec::new(),
    };

    let collect_plan = super::terminal_collect_plan(&spec).unwrap();

    assert!(collect_plan.prefix.is_empty());
    assert!(matches!(
        collect_plan.collect_program.kernel(),
        BodyKernel::BuiltinCall { .. }
    ));
}
/// `filter(> 1) -> take(1) -> collect` over `[1, 2, 3]` yields `[2]` with three
/// scalar reads recorded on the source.
#[test]
fn terminal_collect_current_row_runner_stops_after_demand_is_met() {
    let view = CountingView::root(&[1, 2, 3]);

    let filter_stage = Stage::Filter(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::Filter,
    );
    let take_one = Stage::UsizeBuiltin {
        method: crate::builtins::BuiltinMethod::Take,
        value: 1,
    };
    let spec = PipelineBody {
        stages: vec![filter_stage, take_one],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![
            BodyKernel::CurrentCmpLit(BinOp::Gt, Val::Int(1)),
            BodyKernel::Generic,
        ],
        sink_kernels: Vec::new(),
    };

    let collected = super::run_terminal_collect(view.clone(), &spec)
        .unwrap()
        .unwrap();
    let collected_json: serde_json::Value = collected.into();
    assert_eq!(collected_json, serde_json::json!([2]));
    assert_eq!(view.scalar_reads(), 3);
}
/// `flat_map(items) -> take(2) -> collect` plans both stages as prefix (the
/// first being `FlatMap`) and returns the first two flattened items.
#[test]
fn terminal_collect_accepts_flat_map_frontier_prefix() {
    let document = Val::from(&serde_json::json!([
        {"items": [1, 2, 3]},
        {"items": [4]}
    ]));

    let flatten_items = Stage::FlatMap(
        Arc::new(crate::vm::Program::new(Vec::new(), "")),
        crate::builtins::BuiltinViewStage::FlatMap,
    );
    let take_two = Stage::UsizeBuiltin {
        method: crate::builtins::BuiltinMethod::Take,
        value: 2,
    };
    let spec = PipelineBody {
        stages: vec![flatten_items, take_two],
        stage_exprs: Vec::new(),
        sink: Sink::Collect,
        stage_kernels: vec![
            BodyKernel::FieldRead(Arc::from("items")),
            BodyKernel::Generic,
        ],
        sink_kernels: Vec::new(),
    };

    let collect_plan = super::terminal_collect_plan(&spec).unwrap();
    assert_eq!(collect_plan.prefix.len(), 2);
    assert!(matches!(
        collect_plan.prefix[0],
        ViewStageCapability::FlatMap { .. }
    ));

    let collected = super::run_terminal_collect(ValView::new(&document), &spec)
        .unwrap()
        .unwrap();
    let collected_json: serde_json::Value = collected.into();
    assert_eq!(collected_json, serde_json::json!([1, 2]));
}
/// `unique -> count` over six rows with three distinct values returns `3`,
/// reading each scalar once and materializing nothing.
#[test]
fn view_distinct_stage_feeds_count_sink_without_materializing_rows() {
    let view = CountingView::root(&[7, 8, 7, 9, 8, 7]);

    let count_sink = Sink::Reducer(crate::exec::pipeline::ReducerSpec::count());
    let spec = PipelineBody {
        stages: vec![Stage::UniqueBy(None)],
        stage_exprs: Vec::new(),
        sink: count_sink,
        stage_kernels: vec![BodyKernel::Generic],
        sink_kernels: Vec::new(),
    };

    let distinct_count = super::run_full(view.clone(), &spec).unwrap().unwrap();
    assert_eq!(distinct_count, Val::Int(3));
    assert_eq!(view.scalar_reads(), 6);
    assert_eq!(view.materialize_reads(), 0);
}
/// A `CountBy` stage keyed on the current row produces the per-value counts
/// with six scalar reads and no source-row materialization.
#[test]
fn reducing_count_by_stage_materializes_only_final_boundary_value() {
    let view = CountingView::root(&[1, 2, 1, 3, 2, 1]);

    let count_by_current = Stage::ExprBuiltin {
        method: crate::builtins::BuiltinMethod::CountBy,
        body: Arc::new(crate::vm::Program::new(Vec::new(), "")),
    };
    let spec = PipelineBody {
        stages: vec![count_by_current],
        stage_exprs: Vec::new(),
        sink: Sink::Terminal(crate::builtins::BuiltinMethod::First),
        stage_kernels: vec![BodyKernel::Current],
        sink_kernels: Vec::new(),
    };
    let env = Env::new(Val::Null);
    let mut vm = crate::vm::VM::new();

    let counts = super::run_reducing_stage_prefix_then_materialized_suffix(
        view.clone(),
        &spec,
        None,
        &env,
        &mut vm,
    )
    .unwrap()
    .unwrap();

    let counts_json: serde_json::Value = counts.into();
    assert_eq!(counts_json, serde_json::json!({"1": 3, "2": 2, "3": 1}));
    assert_eq!(view.scalar_reads(), 6);
    assert_eq!(view.materialize_reads(), 0);
}
/// An `IndexBy` stage keyed on the current row produces one entry per distinct
/// key, with four scalar reads and four row materializations.
#[test]
fn reducing_index_by_stage_uses_shared_keyed_reducer_path() {
    let view = CountingView::root(&[1, 2, 1, 3]);

    let index_by_current = Stage::ExprBuiltin {
        method: crate::builtins::BuiltinMethod::IndexBy,
        body: Arc::new(crate::vm::Program::new(Vec::new(), "")),
    };
    let spec = PipelineBody {
        stages: vec![index_by_current],
        stage_exprs: Vec::new(),
        sink: Sink::Terminal(crate::builtins::BuiltinMethod::First),
        stage_kernels: vec![BodyKernel::Current],
        sink_kernels: Vec::new(),
    };
    let env = Env::new(Val::Null);
    let mut vm = crate::vm::VM::new();

    let indexed = super::run_reducing_stage_prefix_then_materialized_suffix(
        view.clone(),
        &spec,
        None,
        &env,
        &mut vm,
    )
    .unwrap()
    .unwrap();

    let indexed_json: serde_json::Value = indexed.into();
    assert_eq!(indexed_json, serde_json::json!({"1": 1, "2": 2, "3": 3}));
    assert_eq!(view.scalar_reads(), 4);
    assert_eq!(view.materialize_reads(), 4);
}
}