use std::sync::Arc;
use crate::builtins::{
BuiltinCancellation, BuiltinMethod, BuiltinNumericReducer, BuiltinViewStage,
};
use crate::data::context::{Env, EvalError};
use crate::data::value::Val;
use crate::parse::ast::Expr;
mod capability;
mod collector;
mod columnar;
mod common;
mod composed;
mod exec;
mod indexed_exec;
mod ir;
mod kernels;
pub(crate) mod logical_lower;
mod lower;
pub(crate) mod materialized_exec;
mod nested;
mod operator;
mod plan;
mod reducer;
mod row_program;
mod row_source;
mod sink_accumulator;
mod symbolic;
mod val_stage_flow;
pub(crate) use capability::{
view_capabilities, view_prefix_capabilities, SourceAccessMode, SourceCapabilities,
ViewInputMode, ViewMaterialization, ViewMembershipTarget, ViewOutputMode, ViewSinkCapability,
ViewStageCapability,
};
pub(crate) use collector::{TerminalCollector, TerminalMapCollector};
pub(crate) use common::{
apply_item_in_env, bounded_sort_by_key, bounded_sort_by_key_cmp, cmp_val_total, is_truthy,
num_finalise, num_fold, num_fold_f64, num_fold_i64, ordered_by_key_cmp, walk_field_chain,
BoundedKeySorter, OrderedKeySorter,
};
#[cfg(test)]
pub use ir::Strategy;
pub use ir::{
FallbackBoundary, LateProjection, PayloadDemand, PhysicalExecPath, Plan, Position, SinkDemand,
StageStrategy,
};
pub use kernels::{eval_cmp_op, eval_kernel, BodyKernel};
pub(crate) use kernels::{
eval_kernel_with_vm, eval_view_kernel, CollectLayout, ObjectKernel, ViewKernelValue,
};
pub(crate) use lower::{compile_pipeline_expr_body, compile_sort_spec};
pub use operator::{
ArgExtremeSinkSpec, MembershipSinkOp, MembershipSinkSpec, MembershipSinkTarget,
PredicateSinkOp, PredicateSinkSpec, ReducerOp, ReducerSpec,
};
#[cfg(test)]
pub use plan::compute_strategies;
#[cfg(test)]
pub use plan::plan;
#[cfg(test)]
pub use plan::plan_with_kernels;
#[cfg(test)]
pub use plan::select_strategy;
pub use plan::{compute_strategies_with_kernels, plan_with_exprs, select_exec_path};
pub(crate) use reducer::ReducerAccumulator;
pub(crate) use row_program::RowProgram;
pub(crate) use sink_accumulator::SinkAccumulator;
/// Control-flow signal, generic over the payload type `T` carried by `Continue`.
///
/// NOTE(review): the consumers of this enum live in sibling modules that are not
/// visible here; the per-variant notes below are inferred from the variant names
/// and should be confirmed against the executors.
pub(crate) enum StageFlow<T> {
/// Carries a value of type `T` onward.
Continue(T),
/// Presumably: the current row produced no output and should be skipped.
SkipRow,
/// Presumably: no further rows need to be processed.
Stop,
/// Presumably: a terminal collector has already captured the result — TODO confirm.
TerminalCollected,
}
/// Runs `body` against the tape via
/// `materialized_exec::run_tape_field_chain_with_vm`, using a freshly
/// constructed [`crate::vm::VM`] that lives only for this call.
///
/// Use [`run_tape_field_chain_with_vm`] instead when a VM is already available.
/// The return value is whatever the delegated function returns.
#[allow(dead_code)]
pub(crate) fn run_tape_field_chain(
    body: &PipelineBody,
    tape: &crate::data::tape::TapeData,
    keys: &[Arc<str>],
    base_env: &Env,
) -> Option<Result<Val, EvalError>> {
    let scratch_vm = &mut crate::vm::VM::new();
    materialized_exec::run_tape_field_chain_with_vm(body, tape, keys, base_env, scratch_vm)
}
/// Crate-visible forwarding wrapper around
/// `materialized_exec::run_tape_field_chain_with_vm`.
///
/// All arguments, including the caller-supplied `vm`, are passed through
/// unchanged and the delegated result is returned as-is.
pub(crate) fn run_tape_field_chain_with_vm(
body: &PipelineBody,
tape: &crate::data::tape::TapeData,
keys: &[Arc<str>],
base_env: &Env,
vm: &mut crate::vm::VM,
) -> Option<Result<Val, EvalError>> {
materialized_exec::run_tape_field_chain_with_vm(body, tape, keys, base_env, vm)
}
/// Data-provider hook used by the pipeline layer.
pub trait PipelineData {
/// Given a shared array of values, optionally returns an
/// [`crate::data::value::ObjVecData`] for it.
///
/// NOTE(review): presumably `None` means the array cannot (or should not) be
/// promoted to the columnar object-vector form — confirm with implementors.
fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>>;
}
use std::sync::atomic::{AtomicU8, Ordering};

/// Cached answer for [`trace_enabled`]: `0` = not probed yet, `1` = tracing off,
/// `2` = tracing on.
static TRACE_INIT: AtomicU8 = AtomicU8::new(0);

/// Reports whether the `JETRO_PIPELINE_TRACE` environment variable is set.
///
/// The environment is consulted only while the cached state is still `0`; after
/// that the stored answer is returned. Any value of the variable (including an
/// empty one) counts as "on" because only its presence is checked.
#[inline]
pub(crate) fn trace_enabled() -> bool {
    match TRACE_INIT.load(Ordering::Relaxed) {
        0 => {
            let enabled = std::env::var_os("JETRO_PIPELINE_TRACE").is_some();
            let state = if enabled { 2 } else { 1 };
            TRACE_INIT.store(state, Ordering::Relaxed);
            enabled
        }
        cached => cached == 2,
    }
}
/// Returns a short, static, snake_case label for a sink.
///
/// Reducer, membership and predicate sinks are labelled by their operation;
/// `ArgExtreme` by `want_max`; `SelectMany` by `from_end`; `Terminal` is
/// labelled `"first"` / `"last"` for those two methods and `"terminal"` for
/// every other method.
fn sink_name(s: &Sink) -> &'static str {
    match s {
        Sink::Collect => "collect",
        Sink::ApproxCountDistinct => "approx_count_distinct",
        Sink::Nth(_) => "nth",
        Sink::SelectMany { from_end, .. } => {
            if *from_end {
                "last_n"
            } else {
                "first_n"
            }
        }
        Sink::ArgExtreme(spec) => {
            if spec.want_max {
                "max_by"
            } else {
                "min_by"
            }
        }
        Sink::Terminal(method) => match method {
            BuiltinMethod::First => "first",
            BuiltinMethod::Last => "last",
            _ => "terminal",
        },
        Sink::Reducer(spec) => match spec.op {
            ReducerOp::Count => "count",
            ReducerOp::Numeric(NumOp::Sum) => "sum",
            ReducerOp::Numeric(NumOp::Min) => "min",
            ReducerOp::Numeric(NumOp::Max) => "max",
            ReducerOp::Numeric(NumOp::Avg) => "avg",
        },
        Sink::Predicate(spec) => match spec.op {
            PredicateSinkOp::Any => "any",
            PredicateSinkOp::All => "all",
            PredicateSinkOp::FindOne => "find_one",
            PredicateSinkOp::FindIndex => "find_index",
            PredicateSinkOp::IndicesWhere => "indices_where",
        },
        Sink::Membership(spec) => match spec.op {
            MembershipSinkOp::Includes => "includes",
            MembershipSinkOp::Index => "index",
            MembershipSinkOp::IndicesOf => "indices_of",
        },
    }
}
/// Returns a short, static label for the kind of pipeline source.
fn source_name(s: &Source) -> &'static str {
    match *s {
        Source::FieldChain { .. } => "field_chain",
        Source::Receiver(..) => "receiver",
    }
}
/// Returns a short, static label for the top-level shape of an expression.
///
/// Only the variants listed here get a dedicated label; every other variant
/// is reported as `"other"`.
fn expr_label(e: &Expr) -> &'static str {
    match e {
        Expr::Root => "root_only",
        Expr::Chain(..) => "chain",
        Expr::BinOp(..) => "binop",
        Expr::Object(..) => "object",
        Expr::Array(..) => "array",
        Expr::Pipeline { .. } => "pipeline",
        Expr::ListComp { .. } => "list_comp",
        Expr::DictComp { .. } => "dict_comp",
        Expr::Let { .. } => "let",
        Expr::Patch { .. } => "patch",
        Expr::Lambda { .. } => "lambda",
        Expr::IfElse { .. } => "if_else",
        _ => "other",
    }
}
/// Where a pipeline's input rows come from.
#[derive(Debug, Clone)]
pub enum Source {
/// An already-available value used directly as the input.
Receiver(Val),
/// A chain of field keys; queries such as `$.a.b.c` lower to this variant.
/// NOTE(review): presumably the keys are walked from the document root at run
/// time — the resolution code is not in this file.
FieldChain { keys: Arc<[Arc<str>]> },
}
impl Source {
    /// Capabilities advertised by this source.
    ///
    /// Both source kinds currently report
    /// `SourceCapabilities::MATERIALIZED_ARRAY`; the match is kept exhaustive so
    /// a new variant forces a decision here.
    pub(crate) fn capabilities(&self) -> SourceCapabilities {
        match self {
            Source::Receiver(_) | Source::FieldChain { .. } => {
                SourceCapabilities::MATERIALIZED_ARRAY
            }
        }
    }
}
/// Pipeline-local name for [`crate::builtins::BuiltinCall`]; it is the payload of
/// [`Stage::Builtin`].
pub type PipelineBuiltinCall = crate::builtins::BuiltinCall;
/// Parameters of a sort stage.
#[derive(Debug, Clone)]
pub struct SortSpec {
/// Compiled key program, or `None` when no key expression was given
/// (see [`SortSpec::identity`]).
pub key: Option<Arc<crate::vm::Program>>,
/// Whether the sort is descending. `identity()` sets this to `false`.
pub descending: bool,
}
impl SortSpec {
pub fn identity() -> Self {
Self {
key: None,
descending: false,
}
}
pub fn keyed(key: Arc<crate::vm::Program>, descending: bool) -> Self {
Self {
key: Some(key),
descending,
}
}
}
/// One step of a lowered pipeline, applied between the source and the sink.
///
/// NOTE(review): the execution semantics of each variant are implemented in
/// sibling modules; the notes below describe the payloads and what the tests in
/// this file show about lowering, not the runtime behavior.
#[derive(Debug, Clone)]
pub enum Stage {
/// Compiled predicate program plus its view-stage metadata (`.filter(...)`).
Filter(Arc<crate::vm::Program>, BuiltinViewStage),
/// Compiled projection program plus its view-stage metadata (`.map(...)`).
Map(Arc<crate::vm::Program>, BuiltinViewStage),
/// Compiled program plus view-stage metadata for a flat-map step.
FlatMap(Arc<crate::vm::Program>, BuiltinViewStage),
/// Reverse step, carrying builtin cancellation metadata.
Reverse(BuiltinCancellation),
/// Uniqueness step with an optional compiled key program.
UniqueBy(Option<Arc<crate::vm::Program>>),
/// Sort step; see [`SortSpec`].
Sort(SortSpec),
/// Generic builtin call (e.g. `.upper()` lowers to this).
Builtin(PipelineBuiltinCall),
/// Builtin taking a single `usize` argument (e.g. `.skip(2)`, `.take(5)`).
UsizeBuiltin {
method: BuiltinMethod,
value: usize,
},
/// Builtin taking a single string argument.
StringBuiltin {
method: BuiltinMethod,
value: Arc<str>,
},
/// Builtin taking two string arguments.
StringPairBuiltin {
method: BuiltinMethod,
first: Arc<str>,
second: Arc<str>,
},
/// Builtin taking an integer range with an optional end.
IntRangeBuiltin {
method: BuiltinMethod,
start: i64,
end: Option<i64>,
},
/// Map whose body is itself a planned pipeline (nested `map(... .map(...).sum())`).
CompiledMap(Arc<Plan>),
/// Builtin parameterised by a compiled expression body.
ExprBuiltin {
method: BuiltinMethod,
body: Arc<crate::vm::Program>,
},
/// Presumably a dedup over already-sorted input, with an optional key program
/// — TODO confirm against the planner.
SortedDedup(Option<Arc<crate::vm::Program>>),
}
/// Numeric reduction operation used by `ReducerOp::Numeric`.
///
/// Each variant corresponds one-to-one to a `BuiltinNumericReducer` variant and a
/// `BuiltinMethod` variant of the same name (see the conversions on this type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NumOp {
/// Labelled `"sum"` by `sink_name`.
Sum,
/// Labelled `"min"` by `sink_name`.
Min,
/// Labelled `"max"` by `sink_name`.
Max,
/// Labelled `"avg"` by `sink_name`.
Avg,
}
impl NumOp {
    /// Converts the builtin-layer reducer tag into the `NumOp` of the same name.
    pub(crate) fn from_builtin_reducer(reducer: BuiltinNumericReducer) -> Self {
        match reducer {
            BuiltinNumericReducer::Min => Self::Min,
            BuiltinNumericReducer::Max => Self::Max,
            BuiltinNumericReducer::Sum => Self::Sum,
            BuiltinNumericReducer::Avg => Self::Avg,
        }
    }

    /// Returns the `BuiltinMethod` of the same name as this operation.
    pub(crate) fn method(self) -> BuiltinMethod {
        match self {
            Self::Min => BuiltinMethod::Min,
            Self::Max => BuiltinMethod::Max,
            Self::Sum => BuiltinMethod::Sum,
            Self::Avg => BuiltinMethod::Avg,
        }
    }

    /// Returns `Val::Int(0)` for `Sum` and `Val::Null` for every other operation.
    ///
    /// NOTE(review): presumably the result reported when no values were folded;
    /// the callers are not visible in this file.
    fn empty(self) -> Val {
        match self {
            Self::Sum => Val::Int(0),
            Self::Avg | Self::Min | Self::Max => Val::Null,
        }
    }
}
impl Sink {
    /// Returns a clone of the reducer spec when this sink is `Sink::Reducer`,
    /// and `None` for every other sink kind.
    pub(crate) fn reducer_spec(&self) -> Option<ReducerSpec> {
        if let Sink::Reducer(spec) = self {
            Some(spec.clone())
        } else {
            None
        }
    }
}
/// Terminal operation of a pipeline: what is done with the rows that survive
/// the stages. The quoted labels are the ones `sink_name` reports.
#[derive(Debug, Clone)]
pub enum Sink {
/// `"collect"`; queries with no terminal call (e.g. `$.a.b.c`) lower to this.
Collect,
/// `"count"`, `"sum"`, `"min"`, `"max"` or `"avg"`, depending on the spec's op.
Reducer(ReducerSpec),
/// `"any"`, `"all"`, `"find_index"`, `"indices_where"` or `"find_one"`.
Predicate(PredicateSinkSpec),
/// `"includes"`, `"index"` or `"indices_of"`.
Membership(MembershipSinkSpec),
/// `"max_by"` when the spec's `want_max` is set, otherwise `"min_by"`.
ArgExtreme(ArgExtremeSinkSpec),
/// `"first"` / `"last"` for those methods, `"terminal"` for any other method.
Terminal(BuiltinMethod),
/// `"first_n"` when `from_end` is false, `"last_n"` when it is true.
SelectMany {
n: usize,
from_end: bool,
},
/// `"nth"`, carrying the requested index.
Nth(usize),
/// `"approx_count_distinct"`.
ApproxCountDistinct,
}
/// A fully planned pipeline: a [`PipelineBody`] bound to a [`Source`], plus the
/// planning metadata derived in [`PipelineBody::with_source`].
#[derive(Debug, Clone)]
pub struct Pipeline {
/// Input the pipeline reads from.
pub source: Source,
/// Stages applied in order between source and sink.
pub stages: Vec<Stage>,
/// Optional source expression per stage; the tests expect this to have the
/// same length as `stages`.
pub stage_exprs: Vec<Option<Arc<Expr>>>,
/// Terminal operation.
pub sink: Sink,
/// Classified kernel per stage (see `classify_stage_kernels`).
pub stage_kernels: Vec<BodyKernel>,
/// Kernels obtained from the sink via `body_kernels()`.
pub sink_kernels: Vec<BodyKernel>,
/// Result of `source_demand()` on the body.
pub source_demand: SinkDemand,
/// Result of `Pipeline::segment_payload_demand` over stages, sink and kernels.
#[allow(dead_code)]
pub payload_demand: PayloadDemand,
/// Result of `Pipeline::late_projection_for`; `None` when it yields nothing.
pub late_projection: Option<LateProjection>,
/// `source.capabilities()` captured at construction time.
#[allow(dead_code)]
pub(crate) source_capabilities: SourceCapabilities,
/// Whether the source capabilities support the payload lanes implied by
/// `payload_demand` (`supports_payload_lanes`).
#[allow(dead_code)]
pub(crate) source_payload_lanes_supported: bool,
/// Whether the source capabilities support selected materialization for the
/// pull demand (`supports_selected_materialization`).
#[allow(dead_code)]
pub(crate) source_selected_materialization_supported: bool,
/// Access mode chosen by `SourceCapabilities::choose_access` for the pull demand.
pub(crate) source_access: SourceAccessMode,
/// Result of `Pipeline::fallback_boundary_for(stages, exec_path)`.
#[allow(dead_code)]
pub fallback_boundary: FallbackBoundary,
/// Physical execution path chosen by `select_exec_path(stages, sink)`.
pub exec_path: PhysicalExecPath,
}
/// The source-independent part of a pipeline: stages, sink and their kernels.
///
/// Bind it to a [`Source`] with [`PipelineBody::with_source`] to obtain a
/// [`Pipeline`]; [`Pipeline::into_source_body`] splits one back apart.
#[derive(Debug, Clone)]
pub struct PipelineBody {
/// Stages applied in order.
pub stages: Vec<Stage>,
/// Optional source expression per stage.
pub stage_exprs: Vec<Option<Arc<Expr>>>,
/// Terminal operation.
pub sink: Sink,
/// Classified kernel per stage (see `classify_stage_kernels`).
pub stage_kernels: Vec<BodyKernel>,
/// Kernels obtained from the sink via `body_kernels()`.
pub sink_kernels: Vec<BodyKernel>,
}
impl PipelineBody {
pub(crate) fn planned(
stages: Vec<Stage>,
stage_exprs: Vec<Option<Arc<Expr>>>,
sink: Sink,
) -> Self {
let kernels = classify_stage_kernels(&stages, &stage_exprs);
let plan_result = plan_with_exprs(stages, stage_exprs, &kernels, sink);
let stage_kernels = classify_stage_kernels(&plan_result.stages, &plan_result.stage_exprs);
let sink_kernels = plan_result.sink.body_kernels();
Self {
stages: plan_result.stages,
stage_exprs: plan_result.stage_exprs,
sink: plan_result.sink,
stage_kernels,
sink_kernels,
}
}
pub(crate) fn can_run_with_view(&self) -> bool {
capability::view_capabilities(self).is_some()
}
#[inline]
pub fn with_source(self, source: Source) -> Pipeline {
let exec_path = select_exec_path(&self.stages, &self.sink);
let source_demand = self.source_demand();
let payload_demand = Pipeline::segment_payload_demand(
&self.stages,
&self.stage_kernels,
&self.sink,
&self.sink_kernels,
);
let late_projection = Pipeline::late_projection_for(&self.stages, &self.stage_kernels);
let source_capabilities = source.capabilities();
let source_access = source_capabilities.choose_access(source_demand.chain.pull);
let source_payload_lanes_supported = source_capabilities
.supports_payload_lanes(&payload_demand.scan_need, &payload_demand.result_need);
let source_selected_materialization_supported =
source_capabilities.supports_selected_materialization(source_demand.chain.pull);
let fallback_boundary = Pipeline::fallback_boundary_for(&self.stages, exec_path);
Pipeline {
source,
exec_path,
source_demand,
payload_demand,
late_projection,
source_capabilities,
source_payload_lanes_supported,
source_selected_materialization_supported,
source_access,
fallback_boundary,
stages: self.stages,
stage_exprs: self.stage_exprs,
sink: self.sink,
stage_kernels: self.stage_kernels,
sink_kernels: self.sink_kernels,
}
}
}
/// Produces one [`BodyKernel`] per stage.
///
/// For each stage, the expression at the same index in `exprs` (if present and
/// not `None`) is classified with `BodyKernel::classify_expr`. That result is
/// used unless it is `BodyKernel::Generic`; in that case, or when there is no
/// expression, the stage's own `body_kernel()` is used instead.
fn classify_stage_kernels(stages: &[Stage], exprs: &[Option<Arc<Expr>>]) -> Vec<BodyKernel> {
    let mut kernels = Vec::with_capacity(stages.len());
    for (idx, stage) in stages.iter().enumerate() {
        let from_expr = match exprs.get(idx) {
            Some(Some(expr)) => Some(BodyKernel::classify_expr(expr)),
            _ => None,
        };
        let kernel = match from_expr {
            Some(specific) if !matches!(specific, BodyKernel::Generic) => specific,
            _ => stage.body_kernel(),
        };
        kernels.push(kernel);
    }
    kernels
}
impl Pipeline {
#[inline]
pub fn into_source_body(self) -> (Source, PipelineBody) {
let body = PipelineBody {
stages: self.stages,
stage_exprs: self.stage_exprs,
sink: self.sink,
stage_kernels: self.stage_kernels,
sink_kernels: self.sink_kernels,
};
(self.source, body)
}
pub fn canonical(&self) -> (Vec<Stage>, Vec<BodyKernel>, Sink) {
(
self.stages.clone(),
self.stage_kernels.clone(),
self.sink.clone(),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::parse::ast::{Arg, BinOp, Expr, Step};
use crate::parse::parser;
/// Parses `q` and lowers it to a pipeline; `None` when parsing fails or when
/// `Pipeline::lower` declines the expression.
fn lower_query(q: &str) -> Option<Pipeline> {
    match parser::parse(q) {
        Ok(expr) => Pipeline::lower(&expr),
        Err(_) => None,
    }
}
/// Returns the expression recorded for the first stage.
///
/// Panics when `stage_exprs` and `stages` differ in length, when there is no
/// first stage, or when the first stage has no recorded expression.
fn only_stage_expr(p: &Pipeline) -> &Expr {
    assert_eq!(p.stage_exprs.len(), p.stages.len());
    let first = &p.stage_exprs[0];
    match first {
        Some(expr) => expr.as_ref(),
        None => panic!("expected optimized stage expression"),
    }
}
/// Asserts that `expr` has the shape `(price * qty) > 100`.
fn assert_price_qty_gt_100(expr: &Expr) {
    let Expr::BinOp(lhs, BinOp::Gt, rhs) = expr else {
        panic!("expected `(price * qty) > 100`, got {expr:#?}");
    };
    assert_price_qty_mul(lhs);
    assert!(matches!(rhs.as_ref(), Expr::Int(100)), "{expr:#?}");
}
/// Asserts that `expr` has the shape `price * qty` (identifiers in that order).
fn assert_price_qty_mul(expr: &Expr) {
    let Expr::BinOp(lhs, BinOp::Mul, rhs) = expr else {
        panic!("expected `price * qty`, got {expr:#?}");
    };
    assert!(
        matches!(lhs.as_ref(), Expr::Ident(name) if name == "price"),
        "{expr:#?}"
    );
    assert!(
        matches!(rhs.as_ref(), Expr::Ident(name) if name == "qty"),
        "{expr:#?}"
    );
}
/// Renders a field demand as a list of strings for easy comparison in tests:
/// nothing for `None`, `["*"]` for `Whole`, and one dot-joined key path per
/// entry for `Fields`.
fn demand_paths(need: &crate::plan::demand::FieldDemand) -> Vec<String> {
    match need {
        crate::plan::demand::FieldDemand::Fields(fields) => {
            let mut rendered = Vec::new();
            for path in fields.paths().iter() {
                let segments = path
                    .keys()
                    .iter()
                    .map(|key| key.as_ref())
                    .collect::<Vec<_>>();
                rendered.push(segments.join("."));
            }
            rendered
        }
        crate::plan::demand::FieldDemand::Whole => vec!["*".to_string()],
        crate::plan::demand::FieldDemand::None => Vec::new(),
    }
}
/// Asserts that running `query` through the lowered pipeline and through the VM
/// produces the same JSON result for `doc`. Uses the same query text for both
/// sides; see `assert_pipeline_matches_vm_query` for differing texts.
fn assert_pipeline_matches_vm(query: &str, doc: serde_json::Value) {
assert_pipeline_matches_vm_query(query, query, doc);
}
/// Lowers and runs `pipeline_query` against `doc`, runs `vm_query` against the
/// same document on a fresh VM, and asserts that both JSON results are equal.
///
/// Panics (failing the test) when lowering, pipeline execution, or VM
/// execution fails.
fn assert_pipeline_matches_vm_query(
    pipeline_query: &str,
    vm_query: &str,
    doc: serde_json::Value,
) {
    let lowered = lower_query(pipeline_query).expect("query should lower to pipeline");
    let root = Val::from(&doc);
    let pipeline_out = lowered
        .run(&root)
        .expect("pipeline execution should succeed");
    let actual: serde_json::Value = pipeline_out.into();

    let mut vm = crate::vm::VM::new();
    let vm_out = vm.run_str(vm_query, &doc);
    let expected = vm_out.expect("VM execution should succeed");

    assert_eq!(
        actual, expected,
        "pipeline diverged from VM for {pipeline_query}"
    );
}
#[test]
fn lower_field_chain_only() {
let p = lower_query("$.a.b.c").unwrap();
assert!(matches!(p.source, Source::FieldChain { .. }));
assert!(p.stages.is_empty());
assert!(matches!(p.sink, Sink::Collect));
}
#[test]
fn row_source_keeps_objvec_as_streaming_provider() {
let keys: std::sync::Arc<[std::sync::Arc<str>]> =
vec![std::sync::Arc::<str>::from("id")].into();
let data = std::sync::Arc::new(crate::data::value::ObjVecData {
keys,
cells: vec![Val::Int(1), Val::Int(2)],
typed_cols: None,
});
let recv = Val::ObjVec(std::sync::Arc::clone(&data));
let source = row_source::ValRowSource::from_receiver(&recv);
assert!(source.is_objvec_streaming());
let mut iter = source.iter();
assert!(matches!(iter, row_source::ValRowsIter::ObjVec { .. }));
assert_eq!(iter.next().unwrap().get_field("id"), Val::Int(1));
assert_eq!(iter.next().unwrap().get_field("id"), Val::Int(2));
assert!(iter.next().is_none());
}
#[test]
fn tape_row_source_walks_field_chain_array_lazily() {
let tape = crate::data::tape::TapeData::parse(
br#"{"books":[{"id":1},{"id":2}],"skip":[3]}"#.to_vec(),
)
.unwrap();
let keys = vec![std::sync::Arc::<str>::from("books")];
let source = row_source::TapeRowSource::from_field_chain(&tape, &keys);
assert!(source.is_array_provider());
use crate::data::view::ValueView;
let mut iter = source.iter_views();
assert!(matches!(iter, row_source::TapeRowsIter::Array { .. }));
assert_eq!(
iter.next().unwrap().materialize().get_field("id"),
Val::Int(1)
);
assert_eq!(
iter.next().unwrap().materialize().get_field("id"),
Val::Int(2)
);
assert!(iter.next().is_none());
}
#[test]
fn receiver_pipeline_start_uses_builtin_metadata() {
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"filter".into(),
vec![Arg::Pos(Expr::Bool(true))]
)));
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"sum".into(),
Vec::new()
)));
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"first".into(),
Vec::new()
)));
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"count_by".into(),
vec![Arg::Pos(Expr::Ident("kind".into()))]
)));
assert!(!Pipeline::is_receiver_pipeline_start(&Step::Method(
"from_json".into(),
Vec::new()
)));
}
#[test]
fn lower_take_skip_sum() {
let p = lower_query("$.xs.skip(2).take(5).sum()").unwrap();
assert_eq!(p.stages.len(), 2);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Skip,
value: 2
}
));
assert!(matches!(
p.stages[1],
Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 5
}
));
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum))
);
}
#[test]
fn lower_pure_builtin_uses_generic_builtin_stage() {
let p = lower_query("$.names.upper().starts_with(\"A\")").unwrap();
assert_eq!(p.stages.len(), 2);
assert!(matches!(
p.stages[0],
Stage::Builtin(PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::Upper,
..
})
));
assert!(matches!(
p.stages[1],
Stage::Builtin(PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::StartsWith,
..
})
));
assert!(matches!(p.sink, Sink::Collect));
}
#[test]
fn lower_pipeline_builtin_uses_builtin_owned_allowlist() {
let p = lower_query("$.items.schema()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::Builtin(PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::Schema,
..
})
));
}
#[test]
fn lower_whole_receiver_builtin_not_as_per_element_stage() {
assert!(lower_query("$.items.join(\",\")").is_none());
}
#[test]
fn lower_returns_none_for_unsupported_shape() {
assert!(lower_query("$.xs.equi_join($.ys, lhs, rhs)").is_none());
assert!(lower_query("@.x.filter(y > 0)").is_none());
}
/// Compiles the predicate `@.total > 100` and dumps its opcodes to stderr for
/// manual inspection (visible with `cargo test -- --nocapture`).
///
/// The dump alone can never fail, so the test also checks that compilation
/// produced at least one opcode; an empty program would mean the predicate was
/// silently dropped.
#[test]
fn debug_filter_pred_shape() {
    let expr = crate::parse::parser::parse("@.total > 100").unwrap();
    let prog = crate::compile::compiler::Compiler::compile(&expr, "");
    eprintln!("PRED OPS = {:#?}", prog.ops);
    assert!(
        prog.ops.iter().next().is_some(),
        "compiled predicate should contain at least one opcode"
    );
}
#[test]
fn method_chain_scalar_filter_lowers_from_builtin_view_metadata() {
let p = lower_query("$.people.filter(name.len() == 3).take(1).map(name)").unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_static_arg_view_builtin() {
let p = lower_query(r#"$.people.filter(name.starts_with("a")).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::StartsWith
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_another_static_arg_view_builtin() {
let p = lower_query(r#"$.people.filter(name.ends_with("a")).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::EndsWith
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_string_predicate_view_builtin() {
let p = lower_query(r#"$.people.filter(name.matches("ad")).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::Matches
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_string_index_view_builtin() {
let p =
lower_query(r#"$.people.filter(name.index_of("d") >= 1).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::IndexOf
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_no_arg_string_view_builtin() {
let p = lower_query(r#"$.people.filter(code.is_numeric()).take(1).map(code)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::IsNumeric
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "code")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_no_arg_numeric_string_view_builtin() {
let p = lower_query(r#"$.people.filter(code.byte_len() == 3).take(1).map(code)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::ByteLen
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "code")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_numeric_view_builtin() {
let p = lower_query(r#"$.people.filter(score.abs() > 10).take(1).map(score)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::Abs
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "score")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_float_numeric_view_builtin() {
let p = lower_query(r#"$.people.filter(score.round() == 10).take(1).map(score)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::Round
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "score")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn run_count_on_simple_array() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"total": 50}, {"total": 150}, {"total": 200}
]}))
.into();
let p = lower_query("$.orders.filter(total > 100).count()").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(2));
}
#[test]
fn run_sum_on_simple_array() {
use serde_json::json;
let doc: Val = (&json!({"xs":[1, 2, 3, 4, 5]})).into();
let p = lower_query("$.xs.sum()").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(15));
}
#[test]
fn run_filter_map_sum() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"id": 1, "total": 50},
{"id": 2, "total": 150},
{"id": 3, "total": 200}
]}))
.into();
let p = lower_query("$.orders.filter(total > 100).map(total).sum()").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(350));
}
#[test]
fn compiled_map_plan_preserves_preclassified_kernels() {
use serde_json::json;
let p = lower_query("$.rows.map(items.map(qty * price).sum())").unwrap();
assert_eq!(p.exec_path, PhysicalExecPath::Composed);
let Stage::CompiledMap(plan) = &p.stages[0] else {
panic!("expected compiled nested map stage");
};
assert_eq!(plan.stages.len(), plan.stage_kernels.len());
assert_eq!(plan.stages.len(), plan.stage_exprs.len());
assert!(matches!(plan.source, Source::FieldChain { .. }));
assert!(plan
.stage_kernels
.iter()
.chain(plan.sink_kernels.iter())
.any(|kernel| matches!(kernel, BodyKernel::Binary { .. })));
let doc: Val = (&json!({
"rows": [
{"items": [{"qty": 2, "price": 10}, {"qty": 3, "price": 5}]},
{"items": [{"qty": 1, "price": 7}]}
]
}))
.into();
let out = p.run(&doc).unwrap();
assert!(crate::util::vals_deep_eq(
&out,
&Val::arr(vec![Val::Int(35), Val::Int(7)])
));
}
#[test]
fn compiled_map_receiver_plan_runs_from_current_row() {
use serde_json::json;
let p = lower_query("$.groups.map(@.map(qty * price).sum())").unwrap();
let Stage::CompiledMap(plan) = &p.stages[0] else {
panic!("expected compiled nested map stage");
};
assert!(matches!(plan.source, Source::Receiver(_)));
let doc: Val = (&json!({
"groups": [
[{"qty": 2, "price": 10}, {"qty": 3, "price": 5}],
[{"qty": 1, "price": 7}]
]
}))
.into();
let out = p.run(&doc).unwrap();
assert!(crate::util::vals_deep_eq(
&out,
&Val::arr(vec![Val::Int(35), Val::Int(7)])
));
}
#[test]
fn run_terminal_count_predicate() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"total": 50}, {"total": 150}, {"total": 200}
]}))
.into();
let p = lower_query("$.orders.count(total > 100)").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(2));
}
#[test]
fn run_terminal_numeric_projection() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"total": 50}, {"total": 150}, {"total": 200}
]}))
.into();
let p = lower_query("$.orders.sum(total)").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(400));
}
#[test]
fn rewrite_skip_skip_merges() {
let p = lower_query("$.xs.skip(2).skip(3).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Skip,
value: 5
}
));
}
#[test]
fn rewrite_take_take_merges_min() {
let p = lower_query("$.xs.take(10).take(3).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 3
}
));
}
#[test]
fn rewrite_filter_filter_merges_via_andop() {
let p = lower_query("$.orders.filter(total > 100).filter(qty > 0).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
match &p.stages[0] {
Stage::Filter(prog, _) => {
assert!(prog
.ops
.iter()
.any(|o| matches!(o, crate::vm::Opcode::AndOp(_))));
}
_ => panic!("expected merged Filter"),
}
assert!(
matches!(&p.stage_kernels[0], BodyKernel::And(predicates) if predicates.len() == 2)
);
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn predicate_fusion_collapses_long_filter_runs_after_planning() {
let p = lower_query(
"$.orders.filter(total > 100).filter(qty > 0).filter(active == true).filter(region == \"eu\").count()",
)
.unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(&p.stages[0], Stage::Filter(_, _)));
assert!(
matches!(&p.stage_kernels[0], BodyKernel::And(predicates) if predicates.len() == 4)
);
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn rewrite_map_then_count_drops_map() {
let p = lower_query("$.orders.map(total).count()").unwrap();
assert_eq!(p.stages.len(), 0);
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_drops_value_only_work_for_count() {
let p = lower_query("$.orders.map(total).upper().count()").unwrap();
assert!(p.stages.is_empty());
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn terminal_count_predicate_lowers_to_reducer_predicate() {
let p = lower_query("$.orders.count(total > 10)").unwrap();
assert!(p.stages.is_empty());
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Count && spec.predicate.is_some())
);
}
#[test]
fn terminal_numeric_projection_lowers_to_reducer_projection() {
let p = lower_query("$.orders.sum(total)").unwrap();
assert!(p.stages.is_empty());
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum) && spec.projection.is_some())
);
}
#[test]
fn keyed_reducer_pipelines_match_vm() {
use serde_json::json;
let doc = json!({
"orders": [
{"status": "pending", "id": 1},
{"status": "shipped", "id": 2},
{"status": "pending", "id": 3}
]
});
assert_pipeline_matches_vm("$.orders.group_by(status)", doc.clone());
assert_pipeline_matches_vm("$.orders.count_by(status)", doc.clone());
assert_pipeline_matches_vm("$.orders.index_by(id)", doc);
}
#[test]
fn keyed_reducer_materialized_fallback_matches_vm() {
use serde_json::json;
let doc = json!({
"orders": [
{"status": "pending", "id": "a"},
{"status": "shipped", "id": "b"},
{"status": "pending", "id": "c"}
]
});
assert_pipeline_matches_vm("$.orders.count_by(status.upper())", doc.clone());
assert_pipeline_matches_vm("$.orders.index_by(id.upper())", doc);
}
#[test]
fn keyed_reducers_preserve_upstream_projection_rows() {
use serde_json::json;
let doc = json!({
"orders": [
{"status": "pending", "id": 1, "price": 10},
{"status": "shipped", "id": 2, "price": 20},
{"status": "pending", "id": 3, "price": 30}
]
});
assert_pipeline_matches_vm(
"$.orders.map({status: status, label: id}).group_by(status)",
doc.clone(),
);
assert_pipeline_matches_vm("$.orders.map({id: id, label: status}).index_by(id)", doc);
}
#[test]
fn demand_optimizer_pulls_filter_through_map_for_count() {
let p = lower_query("$.orders.map(total).filter(@ > 10).count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_pulls_filter_through_computed_map_for_count() {
let p = lower_query("$.orders.map(price * qty).filter(@ > 100).count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert_price_qty_gt_100(only_stage_expr(&p));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_pulls_filter_through_method_chain_map_for_count() {
let p =
lower_query("$.users.map(name.trim().upper()).filter(@ == \"ADA\").count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_simplifies_object_projection_after_substitution() {
let p = lower_query("$.orders.map({v: price * qty, id: id}).filter(@.v > 100).count()")
.unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert_price_qty_gt_100(only_stage_expr(&p));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_simplifies_array_projection_after_substitution() {
let p = lower_query("$.orders.map([price * qty, id]).filter(@[0] > 100).count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert_price_qty_gt_100(only_stage_expr(&p));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_pulls_filter_through_map_but_keeps_map_for_collect() {
let p = lower_query("$.orders.map(total).filter(@ > 10)").unwrap();
assert_eq!(p.stages.len(), 2);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert!(matches!(p.stages[1], Stage::Map(_, _)));
assert!(matches!(p.sink, Sink::Collect));
}
#[test]
fn demand_optimizer_removes_order_only_work_for_numeric_sink() {
let p = lower_query("$.orders.sort().reverse().map(total).sum()").unwrap();
assert!(p.stages.is_empty());
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum) && spec.projection.is_some())
);
}
#[test]
fn demand_optimizer_keeps_membership_work_for_numeric_sink() {
let p = lower_query("$.orders.take(2).map(total).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 2
}
));
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum) && spec.projection.is_some())
);
}
#[test]
fn payload_demand_delays_map_for_last_selection() {
use serde_json::json;
let p = lower_query("$.books.map(isbn).last()").unwrap();
let demand = p.payload_demand();
assert_eq!(demand_paths(&demand.scan_need), Vec::<String>::new());
assert_eq!(demand_paths(&demand.result_need), vec!["isbn"]);
assert_eq!(
p.source_demand().chain.pull,
crate::plan::demand::PullDemand::LastInput(1)
);
assert!(matches!(
p.source_access,
SourceAccessMode::IndexedFromEnd(0)
));
assert!(!p.source_capabilities.tape_view);
assert!(matches!(
p.late_projection,
Some(LateProjection { prefix_len: 0, .. })
));
assert_eq!(p.fallback_boundary, FallbackBoundary::None);
assert!(
matches!(p.late_projection.as_ref().map(|projection| &projection.kernel), Some(BodyKernel::FieldRead(field)) if field.as_ref() == "isbn")
);
assert_pipeline_matches_vm(
"$.books.map(isbn).last()",
json!({
"books": [
{"isbn": "first"},
{"isbn": "last"}
]
}),
);
}
/// `first()` and `nth(2)` after `map(isbn)` should address the source row by
/// index and defer the `isbn` projection until after selection.
#[test]
fn payload_demand_delays_map_for_first_and_nth_selection() {
    use crate::plan::demand::PullDemand;

    // first(): index 0 from the front.
    let first = lower_query("$.books.map(isbn).first()").unwrap();
    let first_payload = first.payload_demand();
    assert_eq!(demand_paths(&first_payload.scan_need), Vec::<String>::new());
    assert_eq!(demand_paths(&first_payload.result_need), vec!["isbn"]);
    assert_eq!(first.source_demand().chain.pull, PullDemand::FirstInput(1));
    assert!(matches!(first.source_access, SourceAccessMode::Indexed(0)));
    assert!(matches!(
        first.late_projection,
        Some(LateProjection { prefix_len: 0, .. })
    ));

    // nth(2): index 2 from the front.
    let nth = lower_query("$.books.map(isbn).nth(2)").unwrap();
    let nth_payload = nth.payload_demand();
    assert_eq!(demand_paths(&nth_payload.scan_need), Vec::<String>::new());
    assert_eq!(demand_paths(&nth_payload.result_need), vec!["isbn"]);
    assert_eq!(nth.source_demand().chain.pull, PullDemand::NthInput(2));
    assert!(matches!(nth.source_access, SourceAccessMode::Indexed(2)));
    assert!(matches!(
        nth.late_projection,
        Some(LateProjection { prefix_len: 0, .. })
    ));
}
/// Runs `first()`, `take(2)` and `nth(1)` after `map(isbn)` and checks each
/// against the VM on the same three-book document.
#[test]
fn late_projection_positional_sinks_match_vm() {
    use serde_json::json;

    let books = json!({
        "books": [
            {"isbn": "a"},
            {"isbn": "b"},
            {"isbn": "c"}
        ]
    });
    let queries = [
        "$.books.map(isbn).first()",
        "$.books.map(isbn).take(2)",
        "$.books.map(isbn).nth(1)",
    ];
    for query in queries {
        assert_pipeline_matches_vm(query, books.clone());
    }
}
/// Checks the source access mode chosen for `first()`, `nth(2)` and `last()`
/// after a `map(isbn)` stage.
#[test]
fn positional_sinks_choose_direct_source_access() {
    let first = lower_query("$.books.map(isbn).first()").unwrap();
    assert!(matches!(first.source_access, SourceAccessMode::Indexed(0)));

    let nth = lower_query("$.books.map(isbn).nth(2)").unwrap();
    assert!(matches!(nth.source_access, SourceAccessMode::Indexed(2)));

    let last = lower_query("$.books.map(isbn).last()").unwrap();
    assert!(matches!(
        last.source_access,
        SourceAccessMode::IndexedFromEnd(0)
    ));
}
/// `compact().last()` keeps a `LastInput(1)` pull and yields the last
/// non-null element.
#[test]
fn compact_participates_in_filter_like_demand() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let pipeline = lower_query("$.xs.compact().last()").unwrap();
    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::LastInput(1));

    let doc: Val = (&json!({"xs": [null, 1, null, 2]})).into();
    let result = pipeline.run(&doc).unwrap();
    assert_eq!(result, Val::Int(2));
}
/// `compact().first()` pulls forward until one output is produced and
/// returns the first non-null element.
#[test]
fn compact_first_scans_until_non_null_output() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let pipeline = lower_query("$.xs.compact().first()").unwrap();
    assert_eq!(
        pipeline.source_demand().chain.pull,
        PullDemand::UntilOutput(1)
    );
    assert!(matches!(pipeline.source_access, SourceAccessMode::Forward));

    let doc: Val = (&json!({"xs": [null, null, 3, 4]})).into();
    let result = pipeline.run(&doc).unwrap();
    assert_eq!(result, Val::Int(3));
}
/// `remove(2).last()` keeps a `LastInput(1)` pull and returns the last
/// element that is not the removed value.
#[test]
fn remove_value_participates_in_filter_like_demand() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let pipeline = lower_query("$.xs.remove(2).last()").unwrap();
    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::LastInput(1));

    let doc: Val = (&json!({"xs": [1, 2, 3, 2]})).into();
    let result = pipeline.run(&doc).unwrap();
    assert_eq!(result, Val::Int(3));
}
/// With `filter(price > 20).map(isbn).last()` the scan needs only `price`,
/// the result needs only `isbn`, and the late projection starts after the
/// one-stage filter prefix.
#[test]
fn payload_demand_splits_filter_scan_from_late_projection() {
    use crate::plan::demand::PullDemand;

    let pipeline = lower_query("$.books.filter(price > 20).map(isbn).last()").unwrap();

    let payload = pipeline.payload_demand();
    assert_eq!(demand_paths(&payload.scan_need), vec!["price"]);
    assert_eq!(demand_paths(&payload.result_need), vec!["isbn"]);

    assert!(matches!(
        pipeline.late_projection,
        Some(LateProjection { prefix_len: 1, .. })
    ));
    assert_eq!(pipeline.fallback_boundary, FallbackBoundary::None);
    let late_kernel = pipeline
        .late_projection
        .as_ref()
        .map(|projection| &projection.kernel);
    assert!(
        matches!(late_kernel, Some(BodyKernel::FieldRead(field)) if field.as_ref() == "isbn")
    );

    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::LastInput(1));
}
/// A lone `has_key("isbn")` builtin stage should be accepted as a late
/// projection with an empty prefix and a builtin-call kernel.
#[test]
fn late_projection_accepts_object_key_builtin_stages() {
    let has_key_call = crate::builtins::BuiltinCall::new(
        BuiltinMethod::HasKey,
        crate::builtins::BuiltinArgs::Str(Arc::from("isbn")),
    );
    let stages = vec![Stage::Builtin(has_key_call)];

    let late = Pipeline::late_projection_for(&stages, &[]).unwrap();

    assert_eq!(late.prefix_len, 0);
    let is_has_key_kernel = matches!(
        &late.kernel,
        BodyKernel::BuiltinCall { call, .. } if call.method == BuiltinMethod::HasKey
    );
    assert!(is_has_key_kernel);
}
/// A literal `get_path("user.name")` inside `map` should lower to a builtin
/// call whose arguments are the `Path` variant.
#[test]
fn literal_path_helpers_store_preparsed_paths() {
    let pipeline = lower_query(r#"$.books.map(@.get_path("user.name")).last()"#).unwrap();
    let late_kernel = pipeline
        .late_projection
        .as_ref()
        .map(|projection| &projection.kernel);
    assert!(matches!(
        late_kernel,
        Some(BodyKernel::BuiltinCall { call, .. })
            if matches!(call.args, crate::builtins::BuiltinArgs::Path(_))
    ));
}
/// `map(user).map(name).last()` should collapse into one late field-chain
/// projection `user.name` with nothing needed during the scan.
#[test]
fn late_projection_composes_chained_maps() {
    let pipeline = lower_query("$.books.map(user).map(name).last()").unwrap();

    let payload = pipeline.payload_demand();
    assert_eq!(demand_paths(&payload.scan_need), Vec::<String>::new());
    assert_eq!(demand_paths(&payload.result_need), vec!["user.name"]);

    assert!(pipeline.source_capabilities.field_key_read);
    assert!(pipeline.source_capabilities.selected_row_materialization);
    assert!(pipeline.source_payload_lanes_supported);
    assert!(pipeline.source_selected_materialization_supported);

    assert!(matches!(
        pipeline.late_projection,
        Some(LateProjection { prefix_len: 0, .. })
    ));
    // On failure, print the projection that was actually planned.
    assert!(
        matches!(
            pipeline.late_projection.as_ref().map(|projection| &projection.kernel),
            Some(BodyKernel::FieldChain(keys)) if keys.iter().map(|key| key.as_ref()).collect::<Vec<_>>() == ["user", "name"]
        ),
        "{:?}",
        pipeline.late_projection
    );
}
/// A filter placed after `map(user)` should report its scan path with the
/// `user.` prefix, while the trailing `map(name)` becomes the result need.
#[test]
fn payload_demand_prefixes_scan_and_result_lanes_through_map() {
    let pipeline =
        lower_query("$.books.map(user).filter(@.active).map(name).last()").unwrap();

    let payload = pipeline.payload_demand();
    assert_eq!(demand_paths(&payload.scan_need), vec!["user.active"]);
    assert_eq!(demand_paths(&payload.result_need), vec!["name"]);

    assert!(pipeline.source_payload_lanes_supported);
    assert!(pipeline.source_selected_materialization_supported);
    assert!(matches!(
        pipeline.late_projection,
        Some(LateProjection { prefix_len: 1, .. })
    ));
}
/// Selected-row materialization is expected for the two bounded queries
/// (`first()`, `take(2)`) and not for an unbounded `map(isbn)` collect.
#[test]
fn selected_materialization_planning_tracks_pull_demand() {
    let bounded_first = lower_query("$.books.map(isbn).first()").unwrap();
    assert!(bounded_first.source_selected_materialization_supported);

    let bounded_take = lower_query("$.books.filter(price > 20).map(isbn).take(2)").unwrap();
    assert!(bounded_take.source_selected_materialization_supported);

    let unbounded = lower_query("$.books.map(isbn)").unwrap();
    assert!(!unbounded.source_selected_materialization_supported);
}
/// An object projection after `map(@.user)` should list its field paths
/// (`name`, `address.city`) as the result need and nothing as scan need.
#[test]
fn payload_demand_prefixes_object_projection_fields() {
    let pipeline =
        lower_query("$.books.map(@.user).map({name, city: address.city}).last()").unwrap();

    let payload = pipeline.payload_demand();
    assert_eq!(demand_paths(&payload.scan_need), Vec::<String>::new());
    let result_paths = demand_paths(&payload.result_need);
    assert_eq!(result_paths, vec!["name", "address.city"]);

    assert!(pipeline.source_payload_lanes_supported);
}
/// The late-projection guard accepts start indices 0 and 1 but not 2 for a
/// filter+map chain, and rejects index 0 when `unique()` is a legacy stage.
#[test]
fn late_projection_guard_respects_prefix_and_barriers() {
    let filtered = lower_query("$.books.filter(price > 20).map(isbn).last()").unwrap();
    assert!(filtered.can_apply_late_projection_from(0));
    assert!(filtered.can_apply_late_projection_from(1));
    assert!(!filtered.can_apply_late_projection_from(2));

    let with_barrier = lower_query("$.books.unique().map(isbn).last()").unwrap();
    assert!(matches!(
        with_barrier.fallback_boundary,
        FallbackBoundary::LegacyStage { index: 0 }
    ));
    assert!(!with_barrier.can_apply_late_projection_from(0));
}
/// Checks `supports_late_projection` for several sink / pull-demand pairs:
/// three supported combinations followed by two unsupported ones.
#[test]
fn sink_late_projection_support_is_demand_aware() {
    use crate::plan::demand::PullDemand;

    // Supported combinations.
    let collect_first = Sink::Collect.supports_late_projection(PullDemand::FirstInput(2));
    assert!(collect_first);
    let nth_indexed = Sink::Nth(3).supports_late_projection(PullDemand::NthInput(3));
    assert!(nth_indexed);
    let terminal_last =
        Sink::Terminal(BuiltinMethod::Last).supports_late_projection(PullDemand::LastInput(1));
    assert!(terminal_last);

    // Unsupported combinations.
    let collect_last = Sink::Collect.supports_late_projection(PullDemand::LastInput(2));
    assert!(!collect_last);
    let count_all =
        Sink::Reducer(ReducerSpec::count()).supports_late_projection(PullDemand::All);
    assert!(!count_all);
}
/// Each object-lambda stage followed by `last()` should still report a
/// `LastInput(1)` pull; the failing query is named in the assertion message.
#[test]
fn object_lambda_stages_preserve_positional_demand() {
    use crate::plan::demand::PullDemand;

    let queries = [
        "$.books.transform_values(@).last()",
        "$.books.transform_keys(@).last()",
        "$.books.filter_values(@ != null).last()",
        "$.books.filter_keys(@ != \"debug\").last()",
    ];
    for query in queries {
        let pipeline = lower_query(query).unwrap();
        let pull = pipeline.source_demand().chain.pull;
        assert_eq!(pull, PullDemand::LastInput(1), "{query}");
    }
}
/// For sort + filter + map, the scan need lists the filter and sort fields,
/// the result need is `isbn`, and the late projection follows a two-stage
/// prefix.
#[test]
fn payload_demand_tracks_sort_filter_project_lanes() {
    let pipeline =
        lower_query("$.books.sort(-score).filter(price > 20).map(isbn).last()").unwrap();

    let payload = pipeline.payload_demand();
    assert_eq!(demand_paths(&payload.scan_need), vec!["price", "score"]);
    assert_eq!(demand_paths(&payload.result_need), vec!["isbn"]);

    assert!(matches!(
        pipeline.late_projection,
        Some(LateProjection { prefix_len: 2, .. })
    ));
    assert_eq!(pipeline.fallback_boundary, FallbackBoundary::None);
    let late_kernel = pipeline
        .late_projection
        .as_ref()
        .map(|projection| &projection.kernel);
    assert!(
        matches!(late_kernel, Some(BodyKernel::FieldRead(field)) if field.as_ref() == "isbn")
    );
}
/// Keyed reducers: `count_by` scans only its key field, while `group_by` and
/// `index_by` report `*` as scan need; none of them has a result need.
#[test]
fn payload_demand_tracks_keyed_reducer_lanes() {
    let cases = [
        ("$.orders.count_by(status)", "status"),
        ("$.orders.group_by(status)", "*"),
        ("$.orders.index_by(id)", "*"),
    ];
    for (query, expected_scan) in cases {
        let pipeline = lower_query(query).unwrap();
        let payload = pipeline.payload_demand();
        assert_eq!(demand_paths(&payload.scan_need), vec![expected_scan]);
        assert_eq!(demand_paths(&payload.result_need), Vec::<String>::new());
    }
}
/// `flat_map` at stage index 0 should be recorded as the legacy fallback
/// boundary.
#[test]
fn fallback_boundary_marks_legacy_barrier_stage() {
    let pipeline = lower_query("$.books.flat_map(tags).last()").unwrap();
    let expected = FallbackBoundary::LegacyStage { index: 0 };
    assert_eq!(pipeline.fallback_boundary, expected);
}
/// A computed `map(price * qty)` followed by a filter and `count()` must
/// agree with the VM.
#[test]
fn demand_optimizer_computed_map_filter_count_matches_vm() {
    use serde_json::json;

    let query = "$.orders.map(price * qty).filter(@ > 100).count()";
    let orders = json!({
        "orders": [
            {"price": 10, "qty": 3},
            {"price": 25, "qty": 5},
            {"price": 8, "qty": 20},
            {"price": 50, "qty": 1}
        ]
    });
    assert_pipeline_matches_vm(query, orders);
}
/// An object projection with a computed field, filtered on that field and
/// counted, must agree with the VM.
#[test]
fn demand_optimizer_object_projection_filter_count_matches_vm() {
    use serde_json::json;

    let query = "$.orders.map({v: price * qty, id: id}).filter(@.v > 100).count()";
    let orders = json!({
        "orders": [
            {"id": "a", "price": 10, "qty": 3},
            {"id": "b", "price": 25, "qty": 5},
            {"id": "c", "price": 8, "qty": 20},
            {"id": "d", "price": 50, "qty": 1}
        ]
    });
    assert_pipeline_matches_vm(query, orders);
}
/// An array projection with a computed first element, filtered on `@[0]` and
/// counted, must agree with the VM.
#[test]
fn demand_optimizer_array_projection_filter_count_matches_vm() {
    use serde_json::json;

    let query = "$.orders.map([price * qty, id]).filter(@[0] > 100).count()";
    let orders = json!({
        "orders": [
            {"id": "a", "price": 10, "qty": 3},
            {"id": "b", "price": 25, "qty": 5},
            {"id": "c", "price": 8, "qty": 20},
            {"id": "d", "price": 50, "qty": 1}
        ]
    });
    assert_pipeline_matches_vm(query, orders);
}
/// `sort().reverse()` ahead of `map(total).sum()` must produce the same sum
/// as the VM.
#[test]
fn demand_optimizer_order_removal_numeric_sink_matches_vm() {
    use serde_json::json;

    let query = "$.orders.sort().reverse().map(total).sum()";
    let orders = json!({
        "orders": [
            {"id": 1, "total": 7},
            {"id": 2, "total": 40},
            {"id": 3, "total": -2},
            {"id": 4, "total": 9}
        ]
    });
    assert_pipeline_matches_vm(query, orders);
}
/// The pipeline query using `take(2)` is compared with the VM query spelled
/// with `first(2)`; both feed `map(total).sum()`.
#[test]
fn demand_optimizer_projected_numeric_sink_with_take_matches_vm() {
    use serde_json::json;

    let pipeline_query = "$.orders.take(2).map(total).sum()";
    let vm_query = "$.orders.first(2).map(total).sum()";
    let orders = json!({
        "orders": [
            {"id": 1, "total": 7},
            {"id": 2, "total": 40},
            {"id": 3, "total": -2},
            {"id": 4, "total": 9}
        ]
    });
    assert_pipeline_matches_vm_query(pipeline_query, vm_query, orders);
}
/// `sort().take(3)` returns the three smallest values in ascending order.
#[test]
fn run_topn_smallest_three() {
    use serde_json::json;

    let input = Val::from(&json!({"xs":[5, 2, 8, 1, 4, 7, 3]}));
    let pipeline = lower_query("$.xs.sort().take(3)").unwrap();
    let smallest: serde_json::Value = pipeline.run(&input).unwrap().into();
    assert_eq!(smallest, json!([1, 2, 3]));
}
/// `sort(-score)` and `sort_by(-score)` should both lower to a keyed,
/// descending sort stage whose kernel reads the `score` field.
#[test]
fn sort_key_spellings_lower_to_canonical_sort_spec() {
    let spellings = [
        "$.rows.sort(-score).take(10)",
        "$.rows.sort_by(-score).take(10)",
    ];
    for query in spellings {
        let p = lower_query(query).unwrap();
        assert!(
            matches!(&p.stages[0], Stage::Sort(spec) if spec.key.is_some() && spec.descending)
        );
        assert!(
            matches!(p.stage_kernels[0], BodyKernel::FieldRead(ref k) if k.as_ref() == "score")
        );
    }
}
/// A descending sort followed by `take(2)` should pick the `SortTopK(2)`
/// strategy and agree with the VM's `first(2)` spelling.
#[test]
fn descending_sort_take_uses_bounded_topk_strategy_and_matches_vm() {
    use serde_json::json;

    let pipeline_query = "$.rows.sort_by(-score).take(2)";
    let vm_query = "$.rows.sort(-score).first(2)";

    let pipeline = lower_query(pipeline_query).unwrap();
    let strategies = compute_strategies(&pipeline.stages, &pipeline.sink);
    assert!(matches!(strategies[0], StageStrategy::SortTopK(2)));

    let rows = json!({
        "rows": [
            {"id": 1, "score": 10},
            {"id": 2, "score": 30},
            {"id": 3, "score": 20}
        ]
    });
    assert_pipeline_matches_vm_query(pipeline_query, vm_query, rows);
}
/// With a trailing `map(isbn)`, the sort stage should still be planned as
/// `SortTopK(2)` and the output must match the VM.
#[test]
fn descending_sort_take_map_projects_only_after_topk_and_matches_vm() {
    use serde_json::json;

    let pipeline_query = "$.rows.sort_by(-score).take(2).map(isbn)";
    let vm_query = "$.rows.sort(-score).first(2).map(isbn)";

    let pipeline = lower_query(pipeline_query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortTopK(2)));

    let rows = json!({
        "rows": [
            {"isbn": "low", "score": 10},
            {"isbn": "top", "score": 30},
            {"isbn": "mid", "score": 20}
        ]
    });
    assert_pipeline_matches_vm_query(pipeline_query, vm_query, rows);
}
/// An ascending sort followed by `map(isbn).last(2)` should be planned as
/// `SortBottomK(2)` and match the VM on the same query text.
#[test]
fn ascending_sort_last_map_projects_only_after_bottomk_and_matches_vm() {
    use serde_json::json;

    let pipeline_query = "$.rows.sort(score).map(isbn).last(2)";
    let vm_query = "$.rows.sort(score).map(isbn).last(2)";

    let pipeline = lower_query(pipeline_query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortBottomK(2)));

    let rows = json!({
        "rows": [
            {"isbn": "low", "score": 10},
            {"isbn": "top", "score": 30},
            {"isbn": "mid", "score": 20}
        ]
    });
    assert_pipeline_matches_vm_query(pipeline_query, vm_query, rows);
}
/// `take_while` between sort and `take(2)` keeps the `SortTopK(2)` strategy
/// whichever key the sort uses, whereas `filter` in the same position is
/// planned as `SortUntilOutput(2)`.
#[test]
fn sort_take_while_take_uses_prefix_demand_without_key_correlation() {
    let topk_queries = [
        "$.rows.sort_by(-price).take_while(price > 10).take(2)",
        "$.rows.sort_by(-score).take_while(price > 10).take(2)",
        "$.rows.sort().take_while(price > 10).take(2)",
    ];
    for query in topk_queries {
        let p = lower_query(query).unwrap();
        let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
        assert!(matches!(strategies[0], StageStrategy::SortTopK(2)));
    }

    let filtered = lower_query("$.rows.sort_by(-score).filter(price > 10).take(2)").unwrap();
    let strategies =
        compute_strategies_with_kernels(&filtered.stages, &filtered.stage_kernels, &filtered.sink);
    assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(2)));
}
/// Sort + filter + `take(2)` should be planned as `SortUntilOutput(2)` and
/// agree with the VM's `first(2)` spelling.
#[test]
fn sort_filter_take_uses_lazy_ordered_until_output_and_matches_vm() {
    use serde_json::json;

    let pipeline_query = "$.rows.sort_by(-price).filter(test > 10).take(2)";
    let vm_query = "$.rows.sort(-price).filter(test > 10).first(2)";

    let pipeline = lower_query(pipeline_query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(2)));

    let rows = json!({
        "rows": [
            {"id": 1, "price": 100, "test": 0},
            {"id": 2, "price": 90, "test": 20},
            {"id": 3, "price": 80, "test": 0},
            {"id": 4, "price": 70, "test": 30}
        ]
    });
    assert_pipeline_matches_vm_query(pipeline_query, vm_query, rows);
}
/// Sort + filter + map + `last()` should be planned as `SortUntilOutput(1)`
/// and match the VM when both the top and the tail rows fail the filter.
#[test]
fn sort_filter_map_last_uses_lazy_tail_scan_and_matches_vm() {
    use serde_json::json;

    let query = "$.rows.sort(-score).filter(price > 20).map(isbn).last()";
    let pipeline = lower_query(query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(1)));

    let rows = json!({
        "rows": [
            {"isbn": "top-fails", "score": 100, "price": 10},
            {"isbn": "answer", "score": 90, "price": 30},
            {"isbn": "tail-fails", "score": 80, "price": 5}
        ]
    });
    assert_pipeline_matches_vm(query, rows);
}
/// The `last(2)` variant of sort + filter + map should be planned as
/// `SortUntilOutput(2)` and match the VM.
#[test]
fn sort_filter_map_last_many_uses_lazy_until_output_and_matches_vm() {
    use serde_json::json;

    let query = "$.rows.sort(-score).filter(price > 20).map(isbn).last(2)";
    let pipeline = lower_query(query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(2)));

    let rows = json!({
        "rows": [
            {"isbn": "top-fails", "score": 100, "price": 10},
            {"isbn": "high-pass", "score": 90, "price": 30},
            {"isbn": "mid-fails", "score": 80, "price": 5},
            {"isbn": "low-pass", "score": 70, "price": 40},
            {"isbn": "tail-pass", "score": 60, "price": 50}
        ]
    });
    assert_pipeline_matches_vm(query, rows);
}
/// With `drop_while` in the chain the source pull must be `All`, the late
/// projection follows a three-stage prefix, and the result matches the VM.
#[test]
fn sort_drop_while_filter_map_last_preserves_prefix_boundary() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let query =
        "$.rows.sort(-score).drop_while(name.contains(\"_test\")).filter(price > 20).map(isbn).last()";
    let pipeline = lower_query(query).unwrap();

    let source = pipeline.source_demand();
    assert_eq!(source.chain.pull, PullDemand::All);
    assert!(matches!(
        pipeline.late_projection,
        Some(LateProjection { prefix_len: 3, .. })
    ));

    let rows = json!({
        "rows": [
            {"isbn": "dropped", "name": "top_test", "score": 100, "price": 99},
            {"isbn": "kept-fails-filter", "name": "live", "score": 90, "price": 10},
            {"isbn": "middle-pass", "name": "middle", "score": 80, "price": 30},
            {"isbn": "answer", "name": "tail_test", "score": 70, "price": 50}
        ]
    });
    assert_pipeline_matches_vm(query, rows);
}
/// Of the four `take_while` / `drop_while` × `first()` / `last()` queries,
/// only `take_while(..).first()` gets a bounded pull; the others pull `All`.
#[test]
fn prefix_while_terminal_sinks_keep_safe_source_demand() {
    use crate::plan::demand::PullDemand;

    let cases = [
        (
            "$.rows.take_while(price > 20).first()",
            PullDemand::FirstInput(1),
        ),
        ("$.rows.take_while(price > 20).last()", PullDemand::All),
        ("$.rows.drop_while(price < 20).first()", PullDemand::All),
        ("$.rows.drop_while(price < 20).last()", PullDemand::All),
    ];
    for (query, expected_pull) in cases {
        let pipeline = lower_query(query).unwrap();
        assert_eq!(pipeline.source_demand().chain.pull, expected_pull);
    }
}
/// A filter on the sort key itself is still planned as `SortUntilOutput(1)`
/// and the `last()` result matches the VM.
#[test]
fn sort_prefix_filter_last_does_not_shrink_before_filtering() {
    use serde_json::json;

    let query = "$.rows.sort(-score).filter(score > 80).map(isbn).last()";
    let pipeline = lower_query(query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(1)));

    let rows = json!({
        "rows": [
            {"isbn": "top", "score": 100},
            {"isbn": "answer", "score": 90},
            {"isbn": "tail-fails", "score": 70}
        ]
    });
    assert_pipeline_matches_vm(query, rows);
}
/// Without a filter, sort + map + `last()` is planned as `SortBottomK(1)`
/// and matches the VM.
#[test]
fn sort_map_last_can_still_use_bounded_bottomk() {
    use serde_json::json;

    let query = "$.rows.sort(-score).map(isbn).last()";
    let pipeline = lower_query(query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortBottomK(1)));

    let rows = json!({
        "rows": [
            {"isbn": "top", "score": 100},
            {"isbn": "answer", "score": 70},
            {"isbn": "mid", "score": 90}
        ]
    });
    assert_pipeline_matches_vm(query, rows);
}
/// A scalar element stage (`has_key`) after the sort still allows
/// `SortBottomK(1)`; the lowest-scored row has no `isbn`, so the result is
/// `false`.
#[test]
fn sort_scalar_element_last_can_use_bounded_bottomk() {
    use serde_json::json;

    let query = "$.rows.sort(-score).has_key(\"isbn\").last()";
    let pipeline = lower_query(query).unwrap();
    let strategies = compute_strategies_with_kernels(
        &pipeline.stages,
        &pipeline.stage_kernels,
        &pipeline.sink,
    );
    assert!(matches!(strategies[0], StageStrategy::SortBottomK(1)));

    let input = Val::from(&json!({
        "rows": [
            {"isbn": "top", "score": 100},
            {"score": 70},
            {"isbn": "mid", "score": 90}
        ]
    }));
    let result: serde_json::Value = pipeline.run(&input).unwrap().into();
    assert_eq!(result, json!(false));
}
/// Cross-checks a batch of bounded-demand query shapes against the VM on one
/// shared document, then checks three `last()` results against literal
/// expected values.
#[test]
fn adversarial_bounded_demand_chains_match_vm() {
    use serde_json::json;

    let doc = json!({
        "rows": [
            {"isbn": "top-fails-price", "score": 100, "price": 10, "tags": ["a", "b"]},
            {"isbn": "high-pass", "score": 90, "price": 30, "tags": []},
            {"isbn": "middle-pass", "score": 80, "price": 40, "tags": ["c"]},
            {"isbn": "low-fails-price", "score": 70, "price": 5, "tags": ["d", "e"]},
            {"isbn": "tail-pass", "score": 60, "price": 50, "tags": ["f"]}
        ],
        "dups": ["a", "b", "a", "c", "b"]
    });

    // Most cases run the same text through the pipeline and the VM; the two
    // explicit pairs use `take(n)` for the pipeline and `first(n)` for the VM.
    let same = |query: &'static str| (query, query);
    let cases = [
        same("$.rows.filter(price > 20).map(isbn).last()"),
        same("$.rows.filter(price > 20).map(isbn).first(2)"),
        same("$.rows.filter(price > 20).map(isbn).last(2)"),
        same("$.rows.map(price).filter(@ > 20).last()"),
        (
            "$.rows.take(2).filter(price > 20).last()",
            "$.rows.first(2).filter(price > 20).last()",
        ),
        same("$.rows.take_while(price > 20).last()"),
        same("$.rows.drop_while(price < 20).last()"),
        same(
            "$.rows.sort(-score).drop_while(name.contains(\"_test\")).filter(price > 20).map(isbn).last()",
        ),
        same("$.rows.sort(-score).filter(price > 20).map(isbn).last()"),
        same("$.rows.sort(score).filter(price > 20).map(isbn).last()"),
        same("$.rows.sort(-score).filter(score > 80).map(isbn).last()"),
        same("$.rows.sort(-score).filter(score < 80).map(isbn).last()"),
        same("$.rows.sort(-score).filter(price > 20).map(isbn).nth(1)"),
        (
            "$.rows.sort(-score).take(3).filter(price > 20).map(isbn).last()",
            "$.rows.sort(-score).first(3).filter(price > 20).map(isbn).last()",
        ),
        same("$.rows.reverse().filter(price > 20).map(isbn).last()"),
        same("$.dups.unique().last()"),
    ];
    for (pipeline_query, vm_query) in cases {
        assert_pipeline_matches_vm_query(pipeline_query, vm_query, doc.clone());
    }

    // Direct runs checked against literal expectations.
    let run_json = |query: &str| -> serde_json::Value {
        lower_query(query)
            .unwrap()
            .run(&Val::from(&doc))
            .unwrap()
            .into()
    };
    assert_eq!(
        run_json("$.rows.skip(1).last()"),
        json!({"isbn": "tail-pass", "score": 60, "price": 50, "tags": ["f"]})
    );
    assert_eq!(run_json("$.rows.flat_map(tags).last()"), json!("f"));
    assert_eq!(
        run_json("$.rows.sort(-score).flat_map(tags).last()"),
        json!("f")
    );
}
/// `filter(..).first()` returns the first row, which already satisfies
/// `score > 900`, and the run completes without error.
#[test]
fn first_sink_stops_after_first_passing_filter_row() {
    use serde_json::json;

    let input = Val::from(&json!({
        "data": [
            {"score": 901, "bad": 0},
            {"score": 1, "bad": "not a number"}
        ]
    }));
    let pipeline = lower_query("$.data.filter(score > 900 or 1 / 0 > 0).first()").unwrap();
    let found: serde_json::Value = pipeline.run(&input).unwrap().into();
    assert_eq!(found, json!({"score": 901, "bad": 0}));
}
/// `take(2)` ahead of the filter caps the pull at `FirstInput(2)`, so the
/// matching third row is never reached and the result is null.
#[test]
fn take_before_filter_caps_upstream_inputs() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({
        "data": [
            {"score": 1},
            {"score": 2},
            {"score": 901}
        ]
    }));
    let pipeline = lower_query("$.data.take(2).filter(score > 900).first()").unwrap();

    let source = pipeline.source_demand();
    assert_eq!(source.chain.pull, PullDemand::FirstInput(2));

    let result = pipeline.run(&input).unwrap();
    assert_eq!(result, Val::Null);
}
/// `filter(..).take(2)` reports an `UntilOutput(2)` pull and returns the two
/// leading rows that pass `score > 900`.
#[test]
fn filter_take_collect_stops_after_required_outputs() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({
        "data": [
            {"score": 901, "bad": 0},
            {"score": 902, "bad": 0},
            {"score": 1, "bad": "not a number"}
        ]
    }));
    let pipeline = lower_query("$.data.filter(score > 900 or 1 / 0 > 0).take(2)").unwrap();

    let source = pipeline.source_demand();
    assert_eq!(source.chain.pull, PullDemand::UntilOutput(2));

    let collected: serde_json::Value = pipeline.run(&input).unwrap().into();
    let expected = json!([{"score": 901, "bad": 0}, {"score": 902, "bad": 0}]);
    assert_eq!(collected, expected);
}
/// `filter(..).last()` reports a `LastInput(1)` pull and returns the final
/// matching row.
#[test]
fn last_sink_requests_reverse_input_when_available() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({
        "data": [
            {"score": 901},
            {"score": 902}
        ]
    }));
    let pipeline = lower_query("$.data.filter(score > 900).last()").unwrap();

    let source = pipeline.source_demand();
    assert_eq!(source.chain.pull, PullDemand::LastInput(1));

    let found: serde_json::Value = pipeline.run(&input).unwrap().into();
    assert_eq!(found, json!({"score": 902}));
}
/// `reverse()` flips the positional pull direction: `first`/`take` pull from
/// the end of the source, `last` pulls from the front.
#[test]
fn reverse_propagates_positional_demand() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({"xs": [10, 20, 30, 40]}));

    let rev_first = lower_query("$.xs.reverse().first()").unwrap();
    assert_eq!(rev_first.source_demand().chain.pull, PullDemand::LastInput(1));
    assert_eq!(rev_first.run(&input).unwrap(), Val::Int(40));

    let rev_last = lower_query("$.xs.reverse().last()").unwrap();
    assert_eq!(rev_last.source_demand().chain.pull, PullDemand::FirstInput(1));
    assert_eq!(rev_last.run(&input).unwrap(), Val::Int(10));

    let rev_take = lower_query("$.xs.reverse().take(2)").unwrap();
    assert_eq!(rev_take.source_demand().chain.pull, PullDemand::LastInput(2));
    let taken: serde_json::Value = rev_take.run(&input).unwrap().into();
    assert_eq!(taken, json!([40, 30]));

    // Zero-width take: the demand is still directional and the output empty.
    let rev_take_none = lower_query("$.xs.reverse().take(0)").unwrap();
    assert_eq!(
        rev_take_none.source_demand().chain.pull,
        PullDemand::LastInput(0)
    );
    let taken_none: serde_json::Value = rev_take_none.run(&input).unwrap().into();
    assert_eq!(taken_none, json!([]));
}
/// `map(score).nth(1)` reports an `NthInput(1)` pull and returns the second
/// row's score.
#[test]
fn nth_sink_requests_indexed_input_when_available() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({
        "data": [
            {"score": 901},
            {"score": 902},
            {"score": 903}
        ]
    }));
    let pipeline = lower_query("$.data.map(score).nth(1)").unwrap();

    let source = pipeline.source_demand();
    assert_eq!(source.chain.pull, PullDemand::NthInput(1));

    let picked: serde_json::Value = pipeline.run(&input).unwrap().into();
    assert_eq!(picked, json!(902));
}
/// `first(n)` / `last(n)` yield a scalar when `n == 1` and an array when
/// `n == 2`, with and without a preceding `map`.
#[test]
fn positional_many_terminal_sinks_match_runtime_semantics() {
    use serde_json::json;

    let root = Val::from(&json!({"xs": [1, 2, 3, 4]}));
    // Lower, run against `root`, and convert the result to JSON.
    let run_json = |query: &str| -> serde_json::Value {
        lower_query(query).unwrap().run(&root).unwrap().into()
    };

    assert_eq!(run_json("$.xs.first(1)"), json!(1));
    assert_eq!(run_json("$.xs.first(2)"), json!([1, 2]));
    assert_eq!(run_json("$.xs.last(1)"), json!(4));
    assert_eq!(run_json("$.xs.last(2)"), json!([3, 4]));
    assert_eq!(run_json("$.xs.map(@ + 1).first(2)"), json!([2, 3]));
    assert_eq!(run_json("$.xs.map(@ + 1).last(2)"), json!([4, 5]));
}
/// `first(n)` / `last(n)` after a map should select the indexed exec path;
/// the `n == 1` forms should also get direct indexed source access.
#[test]
fn positional_many_terminal_sinks_use_indexed_dispatch_for_indexed_chains() {
    let first_two = lower_query("$.xs.map(@ + 1).first(2)").unwrap();
    assert_eq!(first_two.exec_path, PhysicalExecPath::Indexed);

    let last_two = lower_query("$.xs.map(@ + 1).last(2)").unwrap();
    assert_eq!(last_two.exec_path, PhysicalExecPath::Indexed);

    let first_single = lower_query("$.xs.map(@ + 1).first(1)").unwrap();
    assert_eq!(first_single.exec_path, PhysicalExecPath::Indexed);
    assert!(matches!(
        first_single.source_access,
        SourceAccessMode::Indexed(0)
    ));

    let last_single = lower_query("$.xs.map(@ + 1).last(1)").unwrap();
    assert_eq!(last_single.exec_path, PhysicalExecPath::Indexed);
    assert!(matches!(
        last_single.source_access,
        SourceAccessMode::IndexedFromEnd(0)
    ));
}
/// `first(3)` / `last(3)` lower to `SelectMany` sinks whose direction shows
/// up in the source pull demand.
#[test]
fn positional_many_terminal_sinks_propagate_bounded_demand() {
    use crate::plan::demand::PullDemand;

    let from_front = lower_query("$.xs.first(3)").unwrap();
    assert!(matches!(
        from_front.sink,
        Sink::SelectMany {
            n: 3,
            from_end: false
        }
    ));
    assert_eq!(
        from_front.source_demand().chain.pull,
        PullDemand::FirstInput(3)
    );

    let from_back = lower_query("$.xs.last(3)").unwrap();
    assert!(matches!(
        from_back.sink,
        Sink::SelectMany {
            n: 3,
            from_end: true
        }
    ));
    assert_eq!(from_back.source_demand().chain.pull, PullDemand::LastInput(3));
}
/// `last()` after `take(2)` is bounded by the take, whereas `last()` after
/// `skip(2)` pulls `All`; both return the expected element.
#[test]
fn positional_terminal_sinks_respect_prefix_slices() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input: Val = (&json!({"xs": [1, 2, 3, 4]})).into();

    let after_take = lower_query("$.xs.take(2).last()").unwrap();
    assert_eq!(
        after_take.source_demand().chain.pull,
        PullDemand::FirstInput(2)
    );
    let picked: serde_json::Value = after_take.run(&input).unwrap().into();
    assert_eq!(picked, json!(2));

    let after_skip = lower_query("$.xs.skip(2).last()").unwrap();
    assert_eq!(after_skip.source_demand().chain.pull, PullDemand::All);
    let picked: serde_json::Value = after_skip.run(&input).unwrap().into();
    assert_eq!(picked, json!(4));
}
/// After `take(0)`, both `first()` and `last()` report `FirstInput(0)` and
/// evaluate to null.
#[test]
fn positional_terminal_sinks_respect_zero_width_slices() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input: Val = (&json!({"xs": [1, 2, 3, 4]})).into();
    let queries = ["$.xs.take(0).first()", "$.xs.take(0).last()"];
    for query in queries {
        let pipeline = lower_query(query).unwrap();
        assert_eq!(
            pipeline.source_demand().chain.pull,
            PullDemand::FirstInput(0)
        );
        let result: serde_json::Value = pipeline.run(&input).unwrap().into();
        assert_eq!(result, json!(null));
    }
}
/// `SelectMany` demand follows `from_end`: `FirstInput(n)` from the front,
/// `LastInput(n)` from the back, with the `order` flag set for the latter.
#[test]
fn select_many_sink_demand_is_directional() {
    use crate::plan::demand::PullDemand;

    let front_sink = Sink::SelectMany {
        n: 4,
        from_end: false,
    };
    let front = front_sink.demand();
    assert_eq!(front.chain.pull, PullDemand::FirstInput(4));

    let back_sink = Sink::SelectMany {
        n: 2,
        from_end: true,
    };
    let back = back_sink.demand();
    assert_eq!(back.chain.pull, PullDemand::LastInput(2));
    assert!(back.chain.order);
}
/// `Sink::Nth(3)` demand: indexed pull, whole-value need, `order` flag
/// unset, and `Position::First` as its positional hint.
#[test]
fn nth_sink_demand_is_indexed_and_order_free() {
    use crate::plan::demand::{PullDemand, ValueNeed};

    let nth_demand = Sink::Nth(3).demand();
    assert_eq!(nth_demand.chain.pull, PullDemand::NthInput(3));
    assert_eq!(nth_demand.chain.value, ValueNeed::Whole);
    assert!(!nth_demand.chain.order);
    assert_eq!(nth_demand.positional, Some(Position::First));
}
/// With a filter ahead of `nth(1)` the pull is `All`, and the index counts
/// filtered rows: the second matching score is returned.
#[test]
fn filter_nth_sink_keeps_filtered_semantics() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({
        "data": [
            {"score": 1},
            {"score": 901},
            {"score": 902},
            {"score": 903}
        ]
    }));
    let pipeline = lower_query("$.data.filter(score > 900).map(score).nth(1)").unwrap();

    let source = pipeline.source_demand();
    assert_eq!(source.chain.pull, PullDemand::All);

    let picked: serde_json::Value = pipeline.run(&input).unwrap().into();
    assert_eq!(picked, json!(902));
}
/// `compact()` gives `UntilOutput(1)` for `first()` and `LastInput(1)` for
/// `last()`, each returning the nearest non-null element.
#[test]
fn compact_first_and_last_are_filter_like() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({"xs": [null, 10, null, 20]}));

    let head = lower_query("$.xs.compact().first()").unwrap();
    assert_eq!(head.source_demand().chain.pull, PullDemand::UntilOutput(1));
    assert_eq!(head.run(&input).unwrap(), Val::Int(10));

    let tail = lower_query("$.xs.compact().last()").unwrap();
    assert_eq!(tail.source_demand().chain.pull, PullDemand::LastInput(1));
    assert_eq!(tail.run(&input).unwrap(), Val::Int(20));
}
/// `remove(2)` drops every `2` when collected, and gives `UntilOutput(1)` /
/// `LastInput(1)` pulls for `first()` / `last()`.
#[test]
fn remove_first_and_last_are_filter_like() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({"xs": [2, 1, 2, 3, 2]}));

    let all = lower_query("$.xs.remove(2)").unwrap();
    let remaining: serde_json::Value = all.run(&input).unwrap().into();
    assert_eq!(remaining, json!([1, 3]));

    let head = lower_query("$.xs.remove(2).first()").unwrap();
    assert_eq!(head.source_demand().chain.pull, PullDemand::UntilOutput(1));
    assert_eq!(head.run(&input).unwrap(), Val::Int(1));

    let tail = lower_query("$.xs.remove(2).last()").unwrap();
    assert_eq!(tail.source_demand().chain.pull, PullDemand::LastInput(1));
    assert_eq!(tail.run(&input).unwrap(), Val::Int(3));
}
/// `find_first(pred)` lowers to a `First` terminal sink with an
/// `UntilOutput(1)` pull and returns the first matching row.
#[test]
fn find_first_lowers_to_filter_first_demand() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let input = Val::from(&json!({
        "xs": [
            {"score": 1},
            {"score": 9},
            {"score": 11}
        ]
    }));
    let pipeline = lower_query("$.xs.find_first(score > 5)").unwrap();

    assert!(matches!(
        pipeline.sink,
        Sink::Terminal(BuiltinMethod::First)
    ));
    assert_eq!(
        pipeline.source_demand().chain.pull,
        PullDemand::UntilOutput(1)
    );

    let found: serde_json::Value = pipeline.run(&input).unwrap().into();
    assert_eq!(found, json!({"score": 9}));
}
/// `filter(true)` leaves no stages behind; only the count reducer remains.
#[test]
fn rewrite_filter_const_true_dropped() {
    let pipeline = lower_query("$.xs.filter(true).count()").unwrap();
    assert_eq!(pipeline.stages.len(), 0);
    let is_count_sink =
        matches!(&pipeline.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Count);
    assert!(is_count_sink);
}
/// `skip(1).take(2).sum()` over `[10, 20, 30, 40, 50]` sums 20 and 30.
#[test]
fn run_take_skip() {
    use serde_json::json;

    let input = Val::from(&json!({"xs":[10, 20, 30, 40, 50]}));
    let pipeline = lower_query("$.xs.skip(1).take(2).sum()").unwrap();
    let total = pipeline.run(&input).unwrap();
    assert_eq!(total, Val::Int(50));
}
/// Path slices with non-negative bounds lower to a bounded `FirstInput`
/// demand; a negative start (`[-2:]`) is not lowered at all.
#[test]
fn path_slice_lowers_to_bounded_positional_stages() {
    use serde_json::json;
    let root = Val::from(&json!({"xs": [10, 20, 30, 40, 50]}));

    // `[0:3]` then `last()`: only the first three inputs are needed.
    let bounded = lower_query("$.xs[0:3].last()").unwrap();
    let want_three = crate::plan::demand::PullDemand::FirstInput(3);
    assert_eq!(bounded.source_demand().chain.pull, want_three);
    let bounded_last = bounded.run(&root).unwrap();
    assert_eq!(bounded_last, Val::Int(30));

    // `[2:]` then `take(2)`: two skipped plus two taken inputs.
    let open_ended = lower_query("$.xs[2:].take(2)").unwrap();
    let want_four = crate::plan::demand::PullDemand::FirstInput(4);
    assert_eq!(open_ended.source_demand().chain.pull, want_four);
    let taken: serde_json::Value = open_ended.run(&root).unwrap().into();
    assert_eq!(taken, json!([30, 40]));

    // Negative slice start has no lowering.
    let negative_start = lower_query("$.xs[-2:].first()");
    assert!(negative_start.is_none());
}
/// `slice(0, 2)` applied per element (here to strings) must not change the
/// positional demand of a following `last()`: the pull stays `LastInput(1)`
/// and the last row `"def"` is sliced to `"de"`.
#[test]
fn scalar_slice_preserves_pipeline_demand() {
    use serde_json::json;
    let root = Val::from(&json!({"rows": ["abc", "def"]}));
    let plan = lower_query("$.rows.slice(0, 2).last()").unwrap();

    let want_pull = crate::plan::demand::PullDemand::LastInput(1);
    assert_eq!(plan.source_demand().chain.pull, want_pull);

    let sliced = plan.run(&root).unwrap();
    let sliced: serde_json::Value = sliced.into();
    assert_eq!(sliced, json!("de"));
}
/// Each listed per-element stage followed by `last()` must still lower to a
/// `LastInput(1)` pull demand; the failing query is reported in the message.
#[test]
fn scalar_and_path_element_stages_preserve_positional_demand() {
    let want_pull = crate::plan::demand::PullDemand::LastInput(1);
    let queries = [
        "$.rows.has_key(\"isbn\").last()",
        "$.rows.has_path(\"isbn\").last()",
        "$.rows.get_path(\"isbn\").last()",
        "$.rows.upper().last()",
        "$.rows.byte_len().last()",
    ];
    for query in queries {
        let plan = lower_query(query).unwrap();
        assert_eq!(plan.source_demand().chain.pull, want_pull, "{query}");
    }
}
/// `chunk(3)` / `window(3)` followed by `take(2)` need only a bounded prefix
/// of the source (6 and 4 inputs respectively), while `window(3).last()`
/// must pull everything. Each case also checks the evaluated output.
#[test]
fn chunk_and_window_bounded_demand_match_vm() {
    use serde_json::json;
    let root = Val::from(&json!({"xs": [1, 2, 3, 4, 5, 6, 7, 8]}));
    // Lower `query`, compare its pull demand, then compare its JSON output.
    let verify = |query: &str,
                  pull: crate::plan::demand::PullDemand,
                  expected: serde_json::Value| {
        let plan = lower_query(query).unwrap();
        assert_eq!(plan.source_demand().chain.pull, pull);
        let produced: serde_json::Value = plan.run(&root).unwrap().into();
        assert_eq!(produced, expected);
    };
    verify(
        "$.xs.chunk(3).take(2)",
        crate::plan::demand::PullDemand::FirstInput(6),
        json!([[1, 2, 3], [4, 5, 6]]),
    );
    verify(
        "$.xs.window(3).take(2)",
        crate::plan::demand::PullDemand::FirstInput(4),
        json!([[1, 2, 3], [2, 3, 4]]),
    );
    verify(
        "$.xs.window(3).last()",
        crate::plan::demand::PullDemand::All,
        json!([6, 7, 8]),
    );
}
/// Runs each predicate-style terminal through `assert_pipeline_matches_vm`
/// (helper defined elsewhere in this file), then checks separately that
/// `find_one(score == 9)` returns the row whose `isbn` is `"b"`.
#[test]
fn predicate_terminal_sinks_match_vm() {
    use serde_json::json;
    let fixture = json!({
        "xs": [
            {"score": 1, "isbn": "a"},
            {"score": 9, "isbn": "b"},
            {"score": 11, "isbn": "c"}
        ]
    });
    let queries = [
        "$.xs.any(score > 10)",
        "$.xs.exists(score > 10)",
        "$.xs.all(score > 0)",
        "$.xs.find_index(score > 10)",
        "$.xs.indices_where(score > 5)",
        "$.xs.map(isbn).find_index(@ == \"c\")",
    ];
    for query in queries {
        assert_pipeline_matches_vm(query, fixture.clone());
    }

    // `find_one` is checked directly against the pipeline output.
    let find_one = lower_query("$.xs.find_one(score == 9)").unwrap();
    let root = Val::from(&fixture);
    let matched = find_one.run(&root).unwrap();
    let matched: serde_json::Value = matched.into();
    assert_eq!(matched["isbn"], json!("b"));
}
/// Pins the source demand reported for each predicate terminal sink:
/// the sink op, the pull demand, the sink-result demand, whether a scalar
/// short-circuit is reported, and which part of the value is needed.
#[test]
fn predicate_terminal_sinks_keep_conservative_source_demand() {
    // Expected values shared by several of the sinks below.
    let pull_all = crate::plan::demand::PullDemand::All;
    let predicate_only = crate::plan::demand::ValueNeed::Predicate;
    let until_match = crate::plan::demand::SinkResultDemand::UntilMatch;

    // any: pulls everything, stops on the first match.
    let any_plan = lower_query("$.xs.any(@ > 2)").unwrap();
    assert!(matches!(&any_plan.sink, Sink::Predicate(spec) if spec.op == PredicateSinkOp::Any));
    assert_eq!(any_plan.source_demand().chain.pull, pull_all);
    assert_eq!(any_plan.source_demand().sink_result, until_match);
    assert!(any_plan.source_demand().has_scalar_short_circuit());
    assert_eq!(any_plan.source_demand().chain.value, predicate_only);

    // all: pulls everything, stops on the first failure.
    let all_plan = lower_query("$.xs.all(@ > 2)").unwrap();
    assert!(matches!(&all_plan.sink, Sink::Predicate(spec) if spec.op == PredicateSinkOp::All));
    assert_eq!(all_plan.source_demand().chain.pull, pull_all);
    assert_eq!(
        all_plan.source_demand().sink_result,
        crate::plan::demand::SinkResultDemand::UntilFailure
    );
    assert!(all_plan.source_demand().has_scalar_short_circuit());
    assert_eq!(all_plan.source_demand().chain.value, predicate_only);

    // find_index: same demand profile as `any`.
    let find_index_plan = lower_query("$.xs.find_index(@ > 2)").unwrap();
    assert!(matches!(
        &find_index_plan.sink,
        Sink::Predicate(spec) if spec.op == PredicateSinkOp::FindIndex
    ));
    assert_eq!(find_index_plan.source_demand().chain.pull, pull_all);
    assert_eq!(find_index_plan.source_demand().sink_result, until_match);
    assert!(find_index_plan.source_demand().has_scalar_short_circuit());
    assert_eq!(find_index_plan.source_demand().chain.value, predicate_only);

    // indices_where: no sink-result demand and no short-circuit.
    let indices_where_plan = lower_query("$.xs.indices_where(@ > 2)").unwrap();
    assert!(matches!(
        &indices_where_plan.sink,
        Sink::Predicate(spec) if spec.op == PredicateSinkOp::IndicesWhere
    ));
    assert_eq!(
        indices_where_plan.source_demand().sink_result,
        crate::plan::demand::SinkResultDemand::None
    );
    assert!(!indices_where_plan.source_demand().has_scalar_short_circuit());
    assert_eq!(indices_where_plan.source_demand().chain.value, predicate_only);

    // find_one: returns the matching element, so the whole value is needed.
    let find_one_plan = lower_query("$.xs.find_one(@ > 2)").unwrap();
    assert!(matches!(
        &find_one_plan.sink,
        Sink::Predicate(spec) if spec.op == PredicateSinkOp::FindOne
    ));
    assert_eq!(find_one_plan.source_demand().chain.pull, pull_all);
    assert_eq!(
        find_one_plan.source_demand().chain.value,
        crate::plan::demand::ValueNeed::Whole
    );
}
/// `find_one` must return an error both when no row matches
/// (`score > 100`) and when more than one row matches (`score > 5`).
#[test]
fn find_one_terminal_sink_errors_on_zero_or_multiple_matches() {
    use serde_json::json;
    let fixture = json!({
        "xs": [
            {"score": 1},
            {"score": 9},
            {"score": 11}
        ]
    });
    let root = Val::from(&fixture);
    // First query matches nothing, second matches two rows.
    let failing_queries = [
        "$.xs.find_one(score > 100)",
        "$.xs.find_one(score > 5)",
    ];
    for query in failing_queries {
        let plan = lower_query(query).unwrap();
        assert!(plan.run(&root).is_err());
    }
}
/// Feeds every membership-style terminal (literal and `$.`-path needles, over
/// arrays, a string and an object) to `assert_pipeline_matches_vm`, a helper
/// defined elsewhere in this file.
#[test]
fn membership_terminal_sinks_match_vm() {
    use serde_json::json;
    let fixture = json!({
        "xs": ["a", "urgent", "x", "urgent"],
        "needle": "urgent",
        "s": "hello",
        "substring": "ell",
        "obj": {"urgent": true}
    });
    let queries = [
        "$.xs.includes(\"urgent\")",
        "$.xs.contains(\"urgent\")",
        "$.xs.index(\"urgent\")",
        "$.xs.indices_of(\"urgent\")",
        "$.xs.includes($.needle)",
        "$.xs.index($.needle)",
        "$.xs.indices_of($.needle)",
        "$.xs.map(@).includes(\"x\")",
        "$.xs.map(@).includes($.needle)",
        "$.s.includes(\"ell\")",
        "$.s.includes($.substring)",
        "$.obj.includes(\"urgent\")",
    ];
    queries
        .iter()
        .copied()
        .for_each(|query| assert_pipeline_matches_vm(query, fixture.clone()));
}
/// Pins the sink op and source demand for `includes`, `index` and
/// `indices_of`, and checks that a `$.`-path needle lowers to a
/// `MembershipSinkTarget::Program` target.
#[test]
fn membership_terminal_sinks_keep_conservative_source_demand() {
    let until_match = crate::plan::demand::SinkResultDemand::UntilMatch;

    // includes: pulls everything, needs whole values, stops on a match.
    let includes_plan = lower_query("$.xs.includes(\"urgent\")").unwrap();
    assert!(matches!(
        &includes_plan.sink,
        Sink::Membership(spec) if spec.op == MembershipSinkOp::Includes
    ));
    assert_eq!(
        includes_plan.source_demand().chain.pull,
        crate::plan::demand::PullDemand::All
    );
    assert_eq!(
        includes_plan.source_demand().chain.value,
        crate::plan::demand::ValueNeed::Whole
    );
    assert_eq!(includes_plan.source_demand().sink_result, until_match);

    // index: stops on a match and reports a scalar short-circuit.
    let index_plan = lower_query("$.xs.index(\"urgent\")").unwrap();
    assert!(matches!(
        &index_plan.sink,
        Sink::Membership(spec) if spec.op == MembershipSinkOp::Index
    ));
    assert_eq!(index_plan.source_demand().sink_result, until_match);
    assert!(index_plan.source_demand().has_scalar_short_circuit());

    // indices_of: no sink-result demand and no short-circuit.
    let indices_plan = lower_query("$.xs.indices_of(\"urgent\")").unwrap();
    assert!(matches!(
        &indices_plan.sink,
        Sink::Membership(spec) if spec.op == MembershipSinkOp::IndicesOf
    ));
    assert_eq!(
        indices_plan.source_demand().sink_result,
        crate::plan::demand::SinkResultDemand::None
    );
    assert!(!indices_plan.source_demand().has_scalar_short_circuit());

    // A path needle is carried as a program target rather than a literal.
    let dynamic_plan = lower_query("$.xs.includes($.needle)").unwrap();
    assert!(matches!(
        &dynamic_plan.sink,
        Sink::Membership(spec) if matches!(spec.target, MembershipSinkTarget::Program(_))
    ));
}
/// Feeds `max_by` / `min_by` queries (field keys, computed keys, and one
/// behind a `map(@)` stage) to `assert_pipeline_matches_vm`, a helper defined
/// elsewhere in this file.
#[test]
fn arg_extreme_terminal_sinks_match_vm() {
    use serde_json::json;
    let fixture = json!({
        "xs": [
            {"title": "A", "score": 1, "price": 20},
            {"title": "B", "score": 9, "price": 10},
            {"title": "C", "score": 9, "price": 30}
        ],
        "names": ["a", "abc", "ab"]
    });
    let queries = [
        "$.xs.max_by(score)",
        "$.xs.min_by(price)",
        "$.names.max_by(@.len())",
        "$.names.min_by(@.len())",
        "$.xs.map(@).max_by(score)",
    ];
    queries
        .iter()
        .copied()
        .for_each(|query| assert_pipeline_matches_vm(query, fixture.clone()));
}
/// `max_by` / `min_by` lower to an `ArgExtreme` sink whose `want_max` flag
/// reflects the direction; `max_by` pulls all input, needs whole values and
/// has the `order` flag set on its demand chain.
#[test]
fn arg_extreme_terminal_sinks_keep_conservative_source_demand() {
    let max_plan = lower_query("$.xs.max_by(score)").unwrap();
    assert!(matches!(&max_plan.sink, Sink::ArgExtreme(spec) if spec.want_max));

    let want_pull = crate::plan::demand::PullDemand::All;
    assert_eq!(max_plan.source_demand().chain.pull, want_pull);
    let want_value = crate::plan::demand::ValueNeed::Whole;
    assert_eq!(max_plan.source_demand().chain.value, want_value);
    assert!(max_plan.source_demand().chain.order);

    // `min_by` only differs in the direction flag.
    let min_plan = lower_query("$.xs.min_by(score)").unwrap();
    assert!(matches!(&min_plan.sink, Sink::ArgExtreme(spec) if !spec.want_max));
}
}