use std::sync::Arc;
use crate::builtins::{
BuiltinCancellation, BuiltinMethod, BuiltinNumericReducer, BuiltinViewStage,
};
use crate::data::context::{Env, EvalError};
use crate::data::value::Val;
use crate::parse::ast::Expr;
mod capability;
mod collector;
mod columnar;
mod common;
mod composed;
mod exec;
mod indexed_exec;
mod ir;
mod kernels;
pub(crate) mod logical_lower;
mod lower;
pub(crate) mod materialized_exec;
mod operator;
mod plan;
mod reducer;
mod row_source;
mod sink_accumulator;
mod symbolic;
mod val_stage_flow;
pub(crate) use capability::{
view_capabilities, view_prefix_capabilities, SourceAccessMode, SourceCapabilities,
ViewInputMode, ViewMaterialization, ViewMembershipTarget, ViewOutputMode, ViewSinkCapability,
ViewStageCapability,
};
pub(crate) use collector::{TerminalCollector, TerminalMapCollector};
pub(crate) use common::{
apply_item_in_env, bounded_sort_by_key, bounded_sort_by_key_cmp, cmp_val_total, is_truthy,
num_finalise, num_fold, ordered_by_key_cmp, walk_field_chain, BoundedKeySorter,
OrderedKeySorter,
};
#[cfg(test)]
pub use ir::Strategy;
pub use ir::{PhysicalExecPath, Plan, Position, StageStrategy};
pub use kernels::{eval_cmp_op, eval_kernel, BodyKernel};
pub(crate) use kernels::{eval_view_kernel, CollectLayout, ObjectKernel, ViewKernelValue};
pub use operator::{
ArgExtremeSinkSpec, MembershipSinkOp, MembershipSinkSpec, MembershipSinkTarget,
PredicateSinkOp, PredicateSinkSpec, ReducerOp, ReducerSpec,
};
#[cfg(test)]
pub use plan::compute_strategies;
#[cfg(test)]
pub use plan::plan;
#[cfg(test)]
pub use plan::select_strategy;
pub use plan::{
compute_strategies_with_kernels, plan_with_exprs, plan_with_kernels, select_exec_path,
};
pub(crate) use reducer::ReducerAccumulator;
pub(crate) use sink_accumulator::SinkAccumulator;
/// Control-flow signal produced while pushing a single row through the pipeline stages.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant
/// names; confirm against the executors (e.g. `val_stage_flow`) that produce/consume them.
pub(crate) enum StageFlow<T> {
/// The row survived the stage; carries the (possibly transformed) value onward.
Continue(T),
/// Presumably: drop this row but keep pulling rows from the source.
SkipRow,
/// Presumably: stop pulling further rows altogether (e.g. a limit was reached).
Stop,
/// Presumably: the terminal collector has already consumed the value.
TerminalCollected,
}
/// Runs `body` over the value reached by walking `keys` inside a parsed simd-json tape.
///
/// Only compiled with the `simd-json` feature; this is a thin forwarder to
/// `materialized_exec::run_tape_field_chain`.
///
/// NOTE(review): the outer `Option` presumably signals "fast path not applicable,
/// caller should fall back" when `None` — confirm in `materialized_exec`.
#[cfg(feature = "simd-json")]
pub(crate) fn run_tape_field_chain(
body: &PipelineBody,
tape: &crate::data::tape::TapeData,
keys: &[Arc<str>],
base_env: &Env,
) -> Option<Result<Val, EvalError>> {
materialized_exec::run_tape_field_chain(body, tape, keys, base_env)
}
/// Data-provider hook used by the pipeline.
pub trait PipelineData {
/// Attempts to convert an array of values into columnar `ObjVecData`.
///
/// NOTE(review): `None` presumably means the array is not promotable (e.g. rows are
/// not uniform objects) — confirm against implementors.
fn promote_objvec(&self, arr: &Arc<Vec<Val>>) -> Option<Arc<crate::data::value::ObjVecData>>;
}
use std::sync::atomic::{AtomicU8, Ordering};
static TRACE_INIT: AtomicU8 = AtomicU8::new(0);

/// Reports whether pipeline tracing is on, i.e. whether `JETRO_PIPELINE_TRACE` is set.
///
/// The environment is consulted only until the answer has been cached in `TRACE_INIT`:
/// `0` = not checked yet, `1` = off, `2` = on. Later changes to the environment are
/// therefore not observed. Concurrent first calls may each read the variable; they
/// store the same result (assuming the environment is not mutated meanwhile), so the
/// race is benign.
#[inline]
pub(crate) fn trace_enabled() -> bool {
    match TRACE_INIT.load(Ordering::Relaxed) {
        0 => {
            let enabled = std::env::var_os("JETRO_PIPELINE_TRACE").is_some();
            let state = if enabled { 2 } else { 1 };
            TRACE_INIT.store(state, Ordering::Relaxed);
            enabled
        }
        cached => cached == 2,
    }
}
/// Short, fixed label for a sink (presumably for trace/diagnostic output; the
/// callers are outside this view).
///
/// Reducer, membership and predicate sinks are labelled by their operator;
/// `ArgExtreme` and `SelectMany` are labelled by their direction flag; only
/// `first`/`last` terminals get dedicated labels.
fn sink_name(s: &Sink) -> &'static str {
    match s {
        Sink::Collect => "collect",
        Sink::Reducer(spec) => match spec.op {
            ReducerOp::Count => "count",
            ReducerOp::Numeric(num) => match num {
                NumOp::Sum => "sum",
                NumOp::Min => "min",
                NumOp::Max => "max",
                NumOp::Avg => "avg",
            },
        },
        Sink::Membership(spec) => match spec.op {
            MembershipSinkOp::Includes => "includes",
            MembershipSinkOp::Index => "index",
            MembershipSinkOp::IndicesOf => "indices_of",
        },
        Sink::Predicate(spec) => match spec.op {
            PredicateSinkOp::Any => "any",
            PredicateSinkOp::All => "all",
            PredicateSinkOp::FindIndex => "find_index",
            PredicateSinkOp::IndicesWhere => "indices_where",
            PredicateSinkOp::FindOne => "find_one",
        },
        Sink::ArgExtreme(spec) => {
            if spec.want_max {
                "max_by"
            } else {
                "min_by"
            }
        }
        Sink::Terminal(method) => match method {
            BuiltinMethod::First => "first",
            BuiltinMethod::Last => "last",
            _ => "terminal",
        },
        Sink::SelectMany { from_end, .. } => {
            if *from_end {
                "last_n"
            } else {
                "first_n"
            }
        }
        Sink::Nth(_) => "nth",
        Sink::ApproxCountDistinct => "approx_count_distinct",
    }
}
/// Short, fixed label for a pipeline source (presumably for diagnostic output).
fn source_name(s: &Source) -> &'static str {
    match *s {
        Source::FieldChain { .. } => "field_chain",
        Source::Receiver(..) => "receiver",
    }
}
/// Coarse label for the top-level shape of an expression.
///
/// Only a handful of expression kinds get their own label; everything else
/// collapses to `"other"`. A bare `Expr::Root` is reported as `"root_only"`.
fn expr_label(e: &Expr) -> &'static str {
    match *e {
        Expr::Root => "root_only",
        Expr::Chain(..) => "chain",
        Expr::Pipeline { .. } => "pipeline",
        Expr::BinOp(..) => "binop",
        Expr::IfElse { .. } => "if_else",
        Expr::Let { .. } => "let",
        Expr::Lambda { .. } => "lambda",
        Expr::Object(..) => "object",
        Expr::Array(..) => "array",
        Expr::ListComp { .. } => "list_comp",
        Expr::DictComp { .. } => "dict_comp",
        Expr::Patch { .. } => "patch",
        _ => "other",
    }
}
/// Where a pipeline pulls its input rows from.
#[derive(Debug, Clone)]
pub enum Source {
/// An already-evaluated value to iterate over.
Receiver(Val),
/// A path of field names walked from the document root (e.g. `$.a.b.c` lowers to
/// `FieldChain` with keys `a`, `b`, `c`).
FieldChain { keys: Arc<[Arc<str>]> },
}
/// Builtin-call descriptor carried by `Stage::Builtin`; an alias of `crate::builtins::BuiltinCall`.
pub type PipelineBuiltinCall = crate::builtins::BuiltinCall;
/// Canonical description of a sort stage.
///
/// Both `sort(-score)` and `sort_by(-score)` lower to a keyed, descending `SortSpec`.
#[derive(Debug, Clone)]
pub struct SortSpec {
/// Compiled key program; `None` means no key (see `SortSpec::identity`).
pub key: Option<Arc<crate::vm::Program>>,
/// Sort in descending order when `true`.
pub descending: bool,
}
impl SortSpec {
pub fn identity() -> Self {
Self {
key: None,
descending: false,
}
}
pub fn keyed(key: Arc<crate::vm::Program>, descending: bool) -> Self {
Self {
key: Some(key),
descending,
}
}
}
/// One per-row transformation step in a lowered pipeline.
///
/// Where possible the variant notes below are backed by the lowering tests in this file;
/// notes marked "presumably" are inferences from names and should be confirmed.
#[derive(Debug, Clone)]
pub enum Stage {
/// Predicate filter. Consecutive filters are merged into one program joined with `AndOp`.
Filter(Arc<crate::vm::Program>, BuiltinViewStage),
/// Per-row projection program.
Map(Arc<crate::vm::Program>, BuiltinViewStage),
/// Per-row program whose results are presumably flattened into the output stream.
FlatMap(Arc<crate::vm::Program>, BuiltinViewStage),
/// Reverses row order; the role of the `BuiltinCancellation` payload is not visible here.
Reverse(BuiltinCancellation),
/// De-duplication, optionally keyed by a program (presumably `None` = by the row itself).
UniqueBy(Option<Arc<crate::vm::Program>>),
/// Sort described by a canonical `SortSpec`.
Sort(SortSpec),
/// Generic builtin call, e.g. `upper()`, `starts_with("A")` and `schema()` lower to this.
Builtin(PipelineBuiltinCall),
/// Builtin with one `usize` argument, e.g. `skip(n)` / `take(n)`; adjacent skips are
/// summed and adjacent takes keep the minimum.
UsizeBuiltin {
method: BuiltinMethod,
value: usize,
},
/// Builtin with a single string argument.
StringBuiltin {
method: BuiltinMethod,
value: Arc<str>,
},
/// Builtin with two string arguments.
StringPairBuiltin {
method: BuiltinMethod,
first: Arc<str>,
second: Arc<str>,
},
/// Builtin with an integer start and optional end (presumably range/slice-like).
IntRangeBuiltin {
method: BuiltinMethod,
start: i64,
end: Option<i64>,
},
/// Map whose body has already been planned into a nested `Plan`.
CompiledMap(Arc<Plan>),
/// Builtin parameterised by a compiled expression body
/// (NOTE(review): presumably `take_while`/`drop_while`-style methods — confirm in `lower`).
ExprBuiltin {
method: BuiltinMethod,
body: Arc<crate::vm::Program>,
},
/// Presumably de-duplication over already-sorted input, optionally keyed by a program.
SortedDedup(Option<Arc<crate::vm::Program>>),
}
/// Numeric reduction performed by a `ReducerOp::Numeric` sink.
///
/// Converted from `BuiltinNumericReducer` and back to a `BuiltinMethod` via the `impl` below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NumOp {
Sum,
Min,
Max,
Avg,
}
impl NumOp {
    /// Translates the builtin layer's numeric reducer into the pipeline's `NumOp`.
    pub(crate) fn from_builtin_reducer(reducer: BuiltinNumericReducer) -> Self {
        match reducer {
            BuiltinNumericReducer::Sum => Self::Sum,
            BuiltinNumericReducer::Min => Self::Min,
            BuiltinNumericReducer::Max => Self::Max,
            BuiltinNumericReducer::Avg => Self::Avg,
        }
    }

    /// The builtin method that corresponds to this reduction.
    pub(crate) fn method(self) -> BuiltinMethod {
        match self {
            Self::Avg => BuiltinMethod::Avg,
            Self::Max => BuiltinMethod::Max,
            Self::Min => BuiltinMethod::Min,
            Self::Sum => BuiltinMethod::Sum,
        }
    }

    /// Result of reducing zero inputs: `Int(0)` for a sum, `Null` for min, max and average.
    fn empty(self) -> Val {
        match self {
            Self::Sum => Val::Int(0),
            Self::Min | Self::Max | Self::Avg => Val::Null,
        }
    }
}
impl Sink {
    /// Returns a clone of the reducer spec when this is a `Sink::Reducer`, otherwise `None`.
    pub(crate) fn reducer_spec(&self) -> Option<ReducerSpec> {
        if let Sink::Reducer(spec) = self {
            Some(spec.clone())
        } else {
            None
        }
    }
}
/// Terminal consumer at the end of a pipeline.
#[derive(Debug, Clone)]
pub enum Sink {
/// Collect every surviving row (the default sink when no terminal method is present).
Collect,
/// Counting or numeric reduction (`count`, `sum`, `min`, `max`, `avg`); the spec may carry
/// a predicate (e.g. `count(total > 10)`) or projection (e.g. `sum(total)`).
Reducer(ReducerSpec),
/// Predicate-driven terminal: `any`, `all`, `find_index`, `indices_where`, `find_one`.
Predicate(PredicateSinkSpec),
/// Membership lookup: `includes`, `index`, `indices_of`.
Membership(MembershipSinkSpec),
/// Arg-extreme search; `want_max` selects `max_by` over `min_by`.
ArgExtreme(ArgExtremeSinkSpec),
/// Other terminal builtin, e.g. `first` / `last`.
Terminal(BuiltinMethod),
/// Select `n` rows from the front (`from_end == false`, "first_n") or back ("last_n").
SelectMany {
n: usize,
from_end: bool,
},
/// Select the row at the given position (`nth`).
Nth(usize),
/// Approximate distinct count.
ApproxCountDistinct,
}
/// A fully lowered pipeline: a row source, a sequence of stages, and a terminal sink.
#[derive(Debug, Clone)]
pub struct Pipeline {
/// Where input rows come from.
pub source: Source,
/// Per-row transformation steps, in execution order.
pub stages: Vec<Stage>,
/// Optional (possibly optimised) source expression for each stage; expected to be
/// the same length as `stages`.
pub stage_exprs: Vec<Option<Arc<Expr>>>,
/// Terminal consumer.
pub sink: Sink,
/// Compiled body kernels; they appear to be indexed in parallel with `stages` — confirm in `lower`.
pub stage_kernels: Vec<BodyKernel>,
/// Compiled kernels used by the sink (e.g. for predicates/projections — confirm).
pub sink_kernels: Vec<BodyKernel>,
/// Physical execution strategy, derived from `stages` and `sink` by `select_exec_path`.
pub exec_path: PhysicalExecPath,
}
/// The source-independent part of a `Pipeline` (everything except `source` and `exec_path`).
///
/// Attach a source with `PipelineBody::with_source` to obtain a `Pipeline`.
#[derive(Debug, Clone)]
pub struct PipelineBody {
pub stages: Vec<Stage>,
pub stage_exprs: Vec<Option<Arc<Expr>>>,
pub sink: Sink,
pub stage_kernels: Vec<BodyKernel>,
pub sink_kernels: Vec<BodyKernel>,
}
impl PipelineBody {
#[inline]
pub fn with_source(self, source: Source) -> Pipeline {
let exec_path = select_exec_path(&self.stages, &self.sink);
Pipeline {
source,
exec_path,
stages: self.stages,
stage_exprs: self.stage_exprs,
sink: self.sink,
stage_kernels: self.stage_kernels,
sink_kernels: self.sink_kernels,
}
}
}
impl Pipeline {
#[inline]
pub fn into_source_body(self) -> (Source, PipelineBody) {
let body = PipelineBody {
stages: self.stages,
stage_exprs: self.stage_exprs,
sink: self.sink,
stage_kernels: self.stage_kernels,
sink_kernels: self.sink_kernels,
};
(self.source, body)
}
pub fn canonical(&self) -> (Vec<Stage>, Vec<BodyKernel>, Sink) {
(
self.stages.clone(),
self.stage_kernels.clone(),
self.sink.clone(),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::parse::ast::{Arg, BinOp, Expr, Step};
use crate::parse::parser;
fn lower_query(q: &str) -> Option<Pipeline> {
let expr = parser::parse(q).ok()?;
Pipeline::lower(&expr)
}
fn only_stage_expr(p: &Pipeline) -> &Expr {
assert_eq!(p.stage_exprs.len(), p.stages.len());
p.stage_exprs[0]
.as_ref()
.expect("expected optimized stage expression")
.as_ref()
}
fn assert_price_qty_gt_100(expr: &Expr) {
match expr {
Expr::BinOp(lhs, BinOp::Gt, rhs) => {
assert_price_qty_mul(lhs);
assert!(matches!(rhs.as_ref(), Expr::Int(100)), "{expr:#?}");
}
_ => panic!("expected `(price * qty) > 100`, got {expr:#?}"),
}
}
fn assert_price_qty_mul(expr: &Expr) {
match expr {
Expr::BinOp(lhs, BinOp::Mul, rhs) => {
assert!(
matches!(lhs.as_ref(), Expr::Ident(name) if name == "price"),
"{expr:#?}"
);
assert!(
matches!(rhs.as_ref(), Expr::Ident(name) if name == "qty"),
"{expr:#?}"
);
}
_ => panic!("expected `price * qty`, got {expr:#?}"),
}
}
fn assert_pipeline_matches_vm(query: &str, doc: serde_json::Value) {
assert_pipeline_matches_vm_query(query, query, doc);
}
fn assert_pipeline_matches_vm_query(
pipeline_query: &str,
vm_query: &str,
doc: serde_json::Value,
) {
let pipeline = lower_query(pipeline_query).expect("query should lower to pipeline");
let root = Val::from(&doc);
let actual: serde_json::Value = pipeline
.run(&root)
.expect("pipeline execution should succeed")
.into();
let mut vm = crate::vm::VM::new();
let expected = vm
.run_str(vm_query, &doc)
.expect("VM execution should succeed");
assert_eq!(
actual, expected,
"pipeline diverged from VM for {pipeline_query}"
);
}
#[test]
fn lower_field_chain_only() {
let p = lower_query("$.a.b.c").unwrap();
assert!(matches!(p.source, Source::FieldChain { .. }));
assert!(p.stages.is_empty());
assert!(matches!(p.sink, Sink::Collect));
}
#[test]
fn row_source_keeps_objvec_as_streaming_provider() {
let keys: std::sync::Arc<[std::sync::Arc<str>]> =
vec![std::sync::Arc::<str>::from("id")].into();
let data = std::sync::Arc::new(crate::data::value::ObjVecData {
keys,
cells: vec![Val::Int(1), Val::Int(2)],
typed_cols: None,
});
let recv = Val::ObjVec(std::sync::Arc::clone(&data));
let source = row_source::ValRowSource::from_receiver(&recv);
assert!(source.is_objvec_streaming());
let mut iter = source.iter();
assert!(matches!(iter, row_source::ValRowsIter::ObjVec { .. }));
assert_eq!(iter.next().unwrap().get_field("id"), Val::Int(1));
assert_eq!(iter.next().unwrap().get_field("id"), Val::Int(2));
assert!(iter.next().is_none());
}
#[cfg(feature = "simd-json")]
#[test]
fn tape_row_source_walks_field_chain_array_lazily() {
let tape = crate::data::tape::TapeData::parse(
br#"{"books":[{"id":1},{"id":2}],"skip":[3]}"#.to_vec(),
)
.unwrap();
let keys = vec![std::sync::Arc::<str>::from("books")];
let source = row_source::TapeRowSource::from_field_chain(&tape, &keys);
assert!(source.is_array_provider());
use crate::data::view::ValueView;
let mut iter = source.iter_views();
assert!(matches!(iter, row_source::TapeRowsIter::Array { .. }));
assert_eq!(
iter.next().unwrap().materialize().get_field("id"),
Val::Int(1)
);
assert_eq!(
iter.next().unwrap().materialize().get_field("id"),
Val::Int(2)
);
assert!(iter.next().is_none());
}
#[test]
fn receiver_pipeline_start_uses_builtin_metadata() {
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"filter".into(),
vec![Arg::Pos(Expr::Bool(true))]
)));
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"sum".into(),
Vec::new()
)));
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"first".into(),
Vec::new()
)));
assert!(Pipeline::is_receiver_pipeline_start(&Step::Method(
"count_by".into(),
vec![Arg::Pos(Expr::Ident("kind".into()))]
)));
assert!(!Pipeline::is_receiver_pipeline_start(&Step::Method(
"from_json".into(),
Vec::new()
)));
}
#[test]
fn lower_take_skip_sum() {
let p = lower_query("$.xs.skip(2).take(5).sum()").unwrap();
assert_eq!(p.stages.len(), 2);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Skip,
value: 2
}
));
assert!(matches!(
p.stages[1],
Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 5
}
));
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum))
);
}
#[test]
fn lower_pure_builtin_uses_generic_builtin_stage() {
let p = lower_query("$.names.upper().starts_with(\"A\")").unwrap();
assert_eq!(p.stages.len(), 2);
assert!(matches!(
p.stages[0],
Stage::Builtin(PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::Upper,
..
})
));
assert!(matches!(
p.stages[1],
Stage::Builtin(PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::StartsWith,
..
})
));
assert!(matches!(p.sink, Sink::Collect));
}
#[test]
fn lower_pipeline_builtin_uses_builtin_owned_allowlist() {
let p = lower_query("$.items.schema()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::Builtin(PipelineBuiltinCall {
method: crate::builtins::BuiltinMethod::Schema,
..
})
));
}
#[test]
fn lower_whole_receiver_builtin_not_as_per_element_stage() {
assert!(lower_query("$.items.join(\",\")").is_none());
}
#[test]
fn lower_returns_none_for_unsupported_shape() {
assert!(lower_query("$.xs.equi_join($.ys, lhs, rhs)").is_none());
assert!(lower_query("@.x.filter(y > 0)").is_none());
}
#[test]
fn debug_filter_pred_shape() {
let expr = crate::parse::parser::parse("@.total > 100").unwrap();
let prog = crate::compile::compiler::Compiler::compile(&expr, "");
eprintln!("PRED OPS = {:#?}", prog.ops);
}
#[test]
fn method_chain_scalar_filter_lowers_from_builtin_view_metadata() {
let p = lower_query("$.people.filter(name.len() == 3).take(1).map(name)").unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_static_arg_view_builtin() {
let p = lower_query(r#"$.people.filter(name.starts_with("a")).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::StartsWith
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_another_static_arg_view_builtin() {
let p = lower_query(r#"$.people.filter(name.ends_with("a")).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::EndsWith
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_string_predicate_view_builtin() {
let p = lower_query(r#"$.people.filter(name.matches("ad")).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::Matches
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_string_index_view_builtin() {
let p =
lower_query(r#"$.people.filter(name.index_of("d") >= 1).take(1).map(name)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::IndexOf
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "name")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_no_arg_string_view_builtin() {
let p = lower_query(r#"$.people.filter(code.is_numeric()).take(1).map(code)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::IsNumeric
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "code")
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_no_arg_numeric_string_view_builtin() {
let p = lower_query(r#"$.people.filter(code.byte_len() == 3).take(1).map(code)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::ByteLen
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "code")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_numeric_view_builtin() {
let p = lower_query(r#"$.people.filter(score.abs() > 10).take(1).map(score)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::Abs
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "score")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn method_chain_scalar_filter_lowers_float_numeric_view_builtin() {
let p = lower_query(r#"$.people.filter(score.round() == 10).take(1).map(score)"#).unwrap();
assert!(matches!(
&p.stage_kernels[0],
BodyKernel::CmpLit { lhs, .. }
if matches!(
lhs.as_ref(),
BodyKernel::BuiltinCall { receiver, call }
if call.spec().view_scalar
&& call.method == BuiltinMethod::Round
&& matches!(receiver.as_ref(), BodyKernel::FieldRead(k) if k.as_ref() == "score")
)
));
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn run_count_on_simple_array() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"total": 50}, {"total": 150}, {"total": 200}
]}))
.into();
let p = lower_query("$.orders.filter(total > 100).count()").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(2));
}
#[test]
fn run_sum_on_simple_array() {
use serde_json::json;
let doc: Val = (&json!({"xs":[1, 2, 3, 4, 5]})).into();
let p = lower_query("$.xs.sum()").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(15));
}
#[test]
fn run_filter_map_sum() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"id": 1, "total": 50},
{"id": 2, "total": 150},
{"id": 3, "total": 200}
]}))
.into();
let p = lower_query("$.orders.filter(total > 100).map(total).sum()").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(350));
}
#[test]
fn run_terminal_count_predicate() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"total": 50}, {"total": 150}, {"total": 200}
]}))
.into();
let p = lower_query("$.orders.count(total > 100)").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(2));
}
#[test]
fn run_terminal_numeric_projection() {
use serde_json::json;
let doc: Val = (&json!({"orders":[
{"total": 50}, {"total": 150}, {"total": 200}
]}))
.into();
let p = lower_query("$.orders.sum(total)").unwrap();
let out = p.run(&doc).unwrap();
assert_eq!(out, Val::Int(400));
}
#[test]
fn rewrite_skip_skip_merges() {
let p = lower_query("$.xs.skip(2).skip(3).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Skip,
value: 5
}
));
}
#[test]
fn rewrite_take_take_merges_min() {
let p = lower_query("$.xs.take(10).take(3).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 3
}
));
}
#[test]
fn rewrite_filter_filter_merges_via_andop() {
let p = lower_query("$.orders.filter(total > 100).filter(qty > 0).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
match &p.stages[0] {
Stage::Filter(prog, _) => {
assert!(prog
.ops
.iter()
.any(|o| matches!(o, crate::vm::Opcode::AndOp(_))));
}
_ => panic!("expected merged Filter"),
}
assert!(
matches!(&p.stage_kernels[0], BodyKernel::And(predicates) if predicates.len() == 2)
);
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn predicate_fusion_collapses_long_filter_runs_after_planning() {
let p = lower_query(
"$.orders.filter(total > 100).filter(qty > 0).filter(active == true).filter(region == \"eu\").count()",
)
.unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(&p.stages[0], Stage::Filter(_, _)));
assert!(
matches!(&p.stage_kernels[0], BodyKernel::And(predicates) if predicates.len() == 4)
);
assert!(p.stage_kernels[0].is_view_native());
}
#[test]
fn rewrite_map_then_count_drops_map() {
let p = lower_query("$.orders.map(total).count()").unwrap();
assert_eq!(p.stages.len(), 0);
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_drops_value_only_work_for_count() {
let p = lower_query("$.orders.map(total).upper().count()").unwrap();
assert!(p.stages.is_empty());
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn terminal_count_predicate_lowers_to_reducer_predicate() {
let p = lower_query("$.orders.count(total > 10)").unwrap();
assert!(p.stages.is_empty());
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Count && spec.predicate.is_some())
);
}
#[test]
fn terminal_numeric_projection_lowers_to_reducer_projection() {
let p = lower_query("$.orders.sum(total)").unwrap();
assert!(p.stages.is_empty());
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum) && spec.projection.is_some())
);
}
#[test]
fn demand_optimizer_pulls_filter_through_map_for_count() {
let p = lower_query("$.orders.map(total).filter(@ > 10).count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_pulls_filter_through_computed_map_for_count() {
let p = lower_query("$.orders.map(price * qty).filter(@ > 100).count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert_price_qty_gt_100(only_stage_expr(&p));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_pulls_filter_through_method_chain_map_for_count() {
let p =
lower_query("$.users.map(name.trim().upper()).filter(@ == \"ADA\").count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_simplifies_object_projection_after_substitution() {
let p = lower_query("$.orders.map({v: price * qty, id: id}).filter(@.v > 100).count()")
.unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert_price_qty_gt_100(only_stage_expr(&p));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_simplifies_array_projection_after_substitution() {
let p = lower_query("$.orders.map([price * qty, id]).filter(@[0] > 100).count()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert_price_qty_gt_100(only_stage_expr(&p));
assert!(matches!(p.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count));
}
#[test]
fn demand_optimizer_pulls_filter_through_map_but_keeps_map_for_collect() {
let p = lower_query("$.orders.map(total).filter(@ > 10)").unwrap();
assert_eq!(p.stages.len(), 2);
assert!(matches!(p.stages[0], Stage::Filter(_, _)));
assert!(matches!(p.stages[1], Stage::Map(_, _)));
assert!(matches!(p.sink, Sink::Collect));
}
#[test]
fn demand_optimizer_removes_order_only_work_for_numeric_sink() {
let p = lower_query("$.orders.sort().reverse().map(total).sum()").unwrap();
assert!(p.stages.is_empty());
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum) && spec.projection.is_some())
);
}
#[test]
fn demand_optimizer_keeps_membership_work_for_numeric_sink() {
let p = lower_query("$.orders.take(2).map(total).sum()").unwrap();
assert_eq!(p.stages.len(), 1);
assert!(matches!(
p.stages[0],
Stage::UsizeBuiltin {
method: BuiltinMethod::Take,
value: 2
}
));
assert!(
matches!(&p.sink, Sink::Reducer(spec) if spec.op == ReducerOp::Numeric(NumOp::Sum) && spec.projection.is_some())
);
}
#[test]
fn demand_optimizer_computed_map_filter_count_matches_vm() {
use serde_json::json;
assert_pipeline_matches_vm(
"$.orders.map(price * qty).filter(@ > 100).count()",
json!({
"orders": [
{"price": 10, "qty": 3},
{"price": 25, "qty": 5},
{"price": 8, "qty": 20},
{"price": 50, "qty": 1}
]
}),
);
}
#[test]
fn demand_optimizer_object_projection_filter_count_matches_vm() {
use serde_json::json;
assert_pipeline_matches_vm(
"$.orders.map({v: price * qty, id: id}).filter(@.v > 100).count()",
json!({
"orders": [
{"id": "a", "price": 10, "qty": 3},
{"id": "b", "price": 25, "qty": 5},
{"id": "c", "price": 8, "qty": 20},
{"id": "d", "price": 50, "qty": 1}
]
}),
);
}
#[test]
fn demand_optimizer_array_projection_filter_count_matches_vm() {
use serde_json::json;
assert_pipeline_matches_vm(
"$.orders.map([price * qty, id]).filter(@[0] > 100).count()",
json!({
"orders": [
{"id": "a", "price": 10, "qty": 3},
{"id": "b", "price": 25, "qty": 5},
{"id": "c", "price": 8, "qty": 20},
{"id": "d", "price": 50, "qty": 1}
]
}),
);
}
#[test]
fn demand_optimizer_order_removal_numeric_sink_matches_vm() {
use serde_json::json;
assert_pipeline_matches_vm(
"$.orders.sort().reverse().map(total).sum()",
json!({
"orders": [
{"id": 1, "total": 7},
{"id": 2, "total": 40},
{"id": 3, "total": -2},
{"id": 4, "total": 9}
]
}),
);
}
#[test]
fn demand_optimizer_projected_numeric_sink_with_take_matches_vm() {
use serde_json::json;
assert_pipeline_matches_vm_query(
"$.orders.take(2).map(total).sum()",
"$.orders.first(2).map(total).sum()",
json!({
"orders": [
{"id": 1, "total": 7},
{"id": 2, "total": 40},
{"id": 3, "total": -2},
{"id": 4, "total": 9}
]
}),
);
}
#[test]
fn run_topn_smallest_three() {
use serde_json::json;
let doc: Val = (&json!({"xs":[5, 2, 8, 1, 4, 7, 3]})).into();
let p = lower_query("$.xs.sort().take(3)").unwrap();
let out = p.run(&doc).unwrap();
let out_json: serde_json::Value = out.into();
assert_eq!(out_json, json!([1, 2, 3]));
}
#[test]
fn sort_key_spellings_lower_to_canonical_sort_spec() {
let p = lower_query("$.rows.sort(-score).take(10)").unwrap();
assert!(matches!(&p.stages[0], Stage::Sort(spec) if spec.key.is_some() && spec.descending));
assert!(
matches!(p.stage_kernels[0], BodyKernel::FieldRead(ref k) if k.as_ref() == "score")
);
let p = lower_query("$.rows.sort_by(-score).take(10)").unwrap();
assert!(matches!(&p.stages[0], Stage::Sort(spec) if spec.key.is_some() && spec.descending));
assert!(
matches!(p.stage_kernels[0], BodyKernel::FieldRead(ref k) if k.as_ref() == "score")
);
}
#[test]
fn descending_sort_take_uses_bounded_topk_strategy_and_matches_vm() {
use serde_json::json;
let p = lower_query("$.rows.sort_by(-score).take(2)").unwrap();
let strategies = compute_strategies(&p.stages, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortTopK(2)));
assert_pipeline_matches_vm_query(
"$.rows.sort_by(-score).take(2)",
"$.rows.sort(-score).first(2)",
json!({
"rows": [
{"id": 1, "score": 10},
{"id": 2, "score": 30},
{"id": 3, "score": 20}
]
}),
);
}
#[test]
fn sort_take_while_take_uses_prefix_demand_without_key_correlation() {
let p = lower_query("$.rows.sort_by(-price).take_while(price > 10).take(2)").unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortTopK(2)));
let p = lower_query("$.rows.sort_by(-score).take_while(price > 10).take(2)").unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortTopK(2)));
let p = lower_query("$.rows.sort().take_while(price > 10).take(2)").unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortTopK(2)));
let p = lower_query("$.rows.sort_by(-score).filter(price > 10).take(2)").unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(2)));
}
#[test]
fn sort_filter_take_uses_lazy_ordered_until_output_and_matches_vm() {
use serde_json::json;
let p = lower_query("$.rows.sort_by(-price).filter(test > 10).take(2)").unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(2)));
assert_pipeline_matches_vm_query(
"$.rows.sort_by(-price).filter(test > 10).take(2)",
"$.rows.sort(-price).filter(test > 10).first(2)",
json!({
"rows": [
{"id": 1, "price": 100, "test": 0},
{"id": 2, "price": 90, "test": 20},
{"id": 3, "price": 80, "test": 0},
{"id": 4, "price": 70, "test": 30}
]
}),
);
}
#[test]
fn sort_filter_map_last_uses_lazy_tail_scan_and_matches_vm() {
use serde_json::json;
let query = "$.rows.sort(-score).filter(price > 20).map(isbn).last()";
let p = lower_query(query).unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(1)));
assert_pipeline_matches_vm(
query,
json!({
"rows": [
{"isbn": "top-fails", "score": 100, "price": 10},
{"isbn": "answer", "score": 90, "price": 30},
{"isbn": "tail-fails", "score": 80, "price": 5}
]
}),
);
}
#[test]
fn sort_prefix_filter_last_does_not_shrink_before_filtering() {
use serde_json::json;
let query = "$.rows.sort(-score).filter(score > 80).map(isbn).last()";
let p = lower_query(query).unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortUntilOutput(1)));
assert_pipeline_matches_vm(
query,
json!({
"rows": [
{"isbn": "top", "score": 100},
{"isbn": "answer", "score": 90},
{"isbn": "tail-fails", "score": 70}
]
}),
);
}
#[test]
fn sort_map_last_can_still_use_bounded_bottomk() {
use serde_json::json;
let query = "$.rows.sort(-score).map(isbn).last()";
let p = lower_query(query).unwrap();
let strategies = compute_strategies_with_kernels(&p.stages, &p.stage_kernels, &p.sink);
assert!(matches!(strategies[0], StageStrategy::SortBottomK(1)));
assert_pipeline_matches_vm(
query,
json!({
"rows": [
{"isbn": "top", "score": 100},
{"isbn": "answer", "score": 70},
{"isbn": "mid", "score": 90}
]
}),
);
}
#[test]
fn adversarial_bounded_demand_chains_match_vm() {
    use serde_json::json;

    // (pipeline spelling, VM spelling). The VM reference spells `take(n)` as
    // `first(n)`; every other query is written identically for both engines.
    const CASES: &[(&str, &str)] = &[
        (
            "$.rows.filter(price > 20).map(isbn).last()",
            "$.rows.filter(price > 20).map(isbn).last()",
        ),
        (
            "$.rows.map(price).filter(@ > 20).last()",
            "$.rows.map(price).filter(@ > 20).last()",
        ),
        (
            "$.rows.take(2).filter(price > 20).last()",
            "$.rows.first(2).filter(price > 20).last()",
        ),
        (
            "$.rows.take_while(price > 20).last()",
            "$.rows.take_while(price > 20).last()",
        ),
        (
            "$.rows.drop_while(price < 20).last()",
            "$.rows.drop_while(price < 20).last()",
        ),
        (
            "$.rows.sort(-score).filter(price > 20).map(isbn).last()",
            "$.rows.sort(-score).filter(price > 20).map(isbn).last()",
        ),
        (
            "$.rows.sort(score).filter(price > 20).map(isbn).last()",
            "$.rows.sort(score).filter(price > 20).map(isbn).last()",
        ),
        (
            "$.rows.sort(-score).filter(score > 80).map(isbn).last()",
            "$.rows.sort(-score).filter(score > 80).map(isbn).last()",
        ),
        (
            "$.rows.sort(-score).filter(score < 80).map(isbn).last()",
            "$.rows.sort(-score).filter(score < 80).map(isbn).last()",
        ),
        (
            "$.rows.sort(-score).filter(price > 20).map(isbn).nth(1)",
            "$.rows.sort(-score).filter(price > 20).map(isbn).nth(1)",
        ),
        (
            "$.rows.sort(-score).take(3).filter(price > 20).map(isbn).last()",
            "$.rows.sort(-score).first(3).filter(price > 20).map(isbn).last()",
        ),
        (
            "$.rows.reverse().filter(price > 20).map(isbn).last()",
            "$.rows.reverse().filter(price > 20).map(isbn).last()",
        ),
        ("$.dups.unique().last()", "$.dups.unique().last()"),
    ];

    // The top-scoring row and one low-scoring row both fail `price > 20`, so a
    // shortcut that stops early or reads from the wrong end gives a different answer.
    let doc = json!({
        "rows": [
            {"isbn": "top-fails-price", "score": 100, "price": 10, "tags": ["a", "b"]},
            {"isbn": "high-pass", "score": 90, "price": 30, "tags": []},
            {"isbn": "middle-pass", "score": 80, "price": 40, "tags": ["c"]},
            {"isbn": "low-fails-price", "score": 70, "price": 5, "tags": ["d", "e"]},
            {"isbn": "tail-pass", "score": 60, "price": 50, "tags": ["f"]}
        ],
        "dups": ["a", "b", "a", "c", "b"]
    });

    for &(pipeline_query, vm_query) in CASES {
        assert_pipeline_matches_vm_query(pipeline_query, vm_query, doc.clone());
    }

    // Shapes that are checked directly against a hand-written expected value.
    let root = Val::from(&doc);
    let direct_cases = [
        (
            "$.rows.skip(1).last()",
            json!({"isbn": "tail-pass", "score": 60, "price": 50, "tags": ["f"]}),
        ),
        ("$.rows.flat_map(tags).last()", json!("f")),
        ("$.rows.sort(-score).flat_map(tags).last()", json!("f")),
    ];
    for (query, expected) in direct_cases {
        let actual: serde_json::Value = lower_query(query).unwrap().run(&root).unwrap().into();
        assert_eq!(actual, expected, "query: {}", query);
    }
}
#[test]
fn first_sink_stops_after_first_passing_filter_row() {
    use serde_json::json;

    // Only the first row passes `score > 900`. For the second row the `or`
    // falls through to `1 / 0 > 0`, which is presumably an evaluation error.
    // `run` can therefore only succeed if `first()` stops after row one.
    let raw = json!({
        "data": [
            {"score": 901, "bad": 0},
            {"score": 1, "bad": "not a number"}
        ]
    });
    let doc = Val::from(&raw);

    let pipeline = lower_query("$.data.filter(score > 900 or 1 / 0 > 0).first()")
        .expect("query should lower to a pipeline");
    let result: serde_json::Value = pipeline
        .run(&doc)
        .expect("first() should not evaluate rows after the first match")
        .into();

    assert_eq!(result, json!({"score": 901, "bad": 0}));
}
#[test]
fn take_before_filter_caps_upstream_inputs() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    // The only row that passes the filter sits at index 2, beyond the `take(2)` cap.
    let doc = Val::from(&json!({
        "data": [{"score": 1}, {"score": 2}, {"score": 901}]
    }));
    let pipeline = lower_query("$.data.take(2).filter(score > 900).first()").unwrap();

    // `take(2)` precedes the filter, so the source is bounded to two inputs.
    let pull = pipeline.source_demand().chain.pull;
    assert_eq!(pull, PullDemand::FirstInput(2));

    // Neither of the two admitted rows passes, so `first()` yields null.
    assert_eq!(pipeline.run(&doc).unwrap(), Val::Null);
}
#[test]
fn filter_take_collect_stops_after_required_outputs() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    // Two passing rows satisfy `take(2)`. The third row would fall through to
    // the `1 / 0` arm, so a successful run shows collection stopped in time.
    let doc = Val::from(&json!({
        "data": [
            {"score": 901, "bad": 0},
            {"score": 902, "bad": 0},
            {"score": 1, "bad": "not a number"}
        ]
    }));
    let pipeline = lower_query("$.data.filter(score > 900 or 1 / 0 > 0).take(2)").unwrap();

    // With a filter ahead of `take(2)` the bound is on outputs, not inputs.
    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::UntilOutput(2));

    let collected: serde_json::Value = pipeline.run(&doc).unwrap().into();
    let expected = json!([
        {"score": 901, "bad": 0},
        {"score": 902, "bad": 0}
    ]);
    assert_eq!(collected, expected);
}
#[test]
fn last_sink_requests_reverse_input_when_available() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let source = json!({"data": [{"score": 901}, {"score": 902}]});
    let lowered = lower_query("$.data.filter(score > 900).last()").unwrap();

    // `last()` behind a filter asks for the source from its end (`LastInput(1)`).
    let demand = lowered.source_demand();
    assert_eq!(demand.chain.pull, PullDemand::LastInput(1));

    let last: serde_json::Value = lowered.run(&Val::from(&source)).unwrap().into();
    assert_eq!(last, json!({"score": 902}));
}
#[test]
fn reverse_propagates_positional_demand() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let doc = Val::from(&json!({"xs": [10, 20, 30, 40]}));

    // `reverse()` flips which end of the source a positional sink reads from.
    let single_cases = [
        ("$.xs.reverse().first()", PullDemand::LastInput(1), Val::Int(40)),
        ("$.xs.reverse().last()", PullDemand::FirstInput(1), Val::Int(10)),
    ];
    for (query, expected_pull, expected_value) in single_cases {
        let pipeline = lower_query(query).unwrap();
        assert_eq!(pipeline.source_demand().chain.pull, expected_pull);
        assert_eq!(pipeline.run(&doc).unwrap(), expected_value);
    }

    // A prefix of the reversed sequence is a suffix of the source.
    let reversed_prefix = lower_query("$.xs.reverse().take(2)").unwrap();
    assert_eq!(
        reversed_prefix.source_demand().chain.pull,
        PullDemand::LastInput(2)
    );
    let prefix_json: serde_json::Value = reversed_prefix.run(&doc).unwrap().into();
    assert_eq!(prefix_json, json!([40, 30]));
}
#[test]
fn nth_sink_requests_indexed_input_when_available() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let scores = json!({"data": [{"score": 901}, {"score": 902}, {"score": 903}]});
    let pipeline = lower_query("$.data.map(score).nth(1)").unwrap();

    // With only a `map` ahead of it, `nth(1)` addresses source index 1 directly.
    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::NthInput(1));

    let picked: serde_json::Value = pipeline.run(&Val::from(&scores)).unwrap().into();
    assert_eq!(picked, json!(902));
}
#[test]
fn positional_many_terminal_sinks_match_runtime_semantics() {
    use serde_json::json;

    let root = Val::from(&json!({"xs": [1, 2, 3, 4]}));

    // A count of 1 yields a bare element; larger counts yield an array.
    let cases = [
        ("$.xs.first(1)", json!(1)),
        ("$.xs.first(2)", json!([1, 2])),
        ("$.xs.last(1)", json!(4)),
        ("$.xs.last(2)", json!([3, 4])),
        ("$.xs.map(@ + 1).first(2)", json!([2, 3])),
        ("$.xs.map(@ + 1).last(2)", json!([4, 5])),
    ];
    for (query, expected) in cases {
        let actual: serde_json::Value = lower_query(query)
            .unwrap()
            .run(&root)
            .unwrap()
            .into();
        assert_eq!(actual, expected, "query: {}", query);
    }
}
#[test]
fn positional_many_terminal_sinks_use_indexed_dispatch_for_indexed_chains() {
    // Both ends of a map-only chain should be served by the indexed executor.
    for query in ["$.xs.map(@ + 1).first(2)", "$.xs.map(@ + 1).last(2)"] {
        let pipeline = lower_query(query).unwrap();
        assert_eq!(
            pipeline.exec_path,
            PhysicalExecPath::Indexed,
            "query: {}",
            query
        );
    }
}
#[test]
fn positional_many_terminal_sinks_propagate_bounded_demand() {
    use crate::plan::demand::PullDemand;

    // `first(3)` selects from the front and bounds the source prefix.
    let head = lower_query("$.xs.first(3)").unwrap();
    let head_is_select_many = matches!(head.sink, Sink::SelectMany { n: 3, from_end: false });
    assert!(head_is_select_many);
    assert_eq!(head.source_demand().chain.pull, PullDemand::FirstInput(3));

    // `last(3)` selects from the back and bounds the source suffix.
    let tail = lower_query("$.xs.last(3)").unwrap();
    let tail_is_select_many = matches!(tail.sink, Sink::SelectMany { n: 3, from_end: true });
    assert!(tail_is_select_many);
    assert_eq!(tail.source_demand().chain.pull, PullDemand::LastInput(3));
}
#[test]
fn filter_nth_sink_keeps_filtered_semantics() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    // Index 1 is counted among the filtered rows. The leading {"score": 1} is
    // dropped, so the answer is 902 rather than 901.
    let doc = Val::from(&json!({
        "data": [{"score": 1}, {"score": 901}, {"score": 902}, {"score": 903}]
    }));
    let pipeline = lower_query("$.data.filter(score > 900).map(score).nth(1)").unwrap();

    // Behind a filter, the n-th output has no fixed source index, so the whole
    // source is requested.
    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::All);

    let value: serde_json::Value = pipeline.run(&doc).unwrap().into();
    assert_eq!(value, json!(902));
}
#[test]
fn compact_first_and_last_are_filter_like() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let doc = Val::from(&json!({"xs": [null, 10, null, 20]}));

    // `compact()` discards the nulls here, so its positional sinks get
    // filter-style demand.
    let cases = [
        ("$.xs.compact().first()", PullDemand::UntilOutput(1), Val::Int(10)),
        ("$.xs.compact().last()", PullDemand::LastInput(1), Val::Int(20)),
    ];
    for (query, expected_pull, expected_value) in cases {
        let pipeline = lower_query(query).unwrap();
        assert_eq!(pipeline.source_demand().chain.pull, expected_pull);
        assert_eq!(pipeline.run(&doc).unwrap(), expected_value);
    }
}
#[test]
fn remove_first_and_last_are_filter_like() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let doc = Val::from(&json!({"xs": [2, 1, 2, 3, 2]}));

    // Collecting shows that every `2` is removed, not just the first one.
    let remaining: serde_json::Value = lower_query("$.xs.remove(2)")
        .unwrap()
        .run(&doc)
        .unwrap()
        .into();
    assert_eq!(remaining, json!([1, 3]));

    // Positional sinks after `remove` get the same demand as after a filter.
    let positional = [
        ("$.xs.remove(2).first()", PullDemand::UntilOutput(1), Val::Int(1)),
        ("$.xs.remove(2).last()", PullDemand::LastInput(1), Val::Int(3)),
    ];
    for (query, expected_pull, expected_value) in positional {
        let pipeline = lower_query(query).unwrap();
        assert_eq!(pipeline.source_demand().chain.pull, expected_pull);
        assert_eq!(pipeline.run(&doc).unwrap(), expected_value);
    }
}
#[test]
fn find_first_lowers_to_filter_first_demand() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let doc = Val::from(&json!({
        "xs": [{"score": 1}, {"score": 9}, {"score": 11}]
    }));
    let pipeline = lower_query("$.xs.find_first(score > 5)").unwrap();

    // Expected to lower like `filter(score > 5).first()`: a plain `First`
    // terminal sink that stops after one output.
    assert!(matches!(pipeline.sink, Sink::Terminal(BuiltinMethod::First)));
    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::UntilOutput(1));

    let found: serde_json::Value = pipeline.run(&doc).unwrap().into();
    assert_eq!(found, json!({"score": 9}));
}
#[test]
fn rewrite_filter_const_true_dropped() {
    let pipeline = lower_query("$.xs.filter(true).count()").unwrap();

    // A constant-true filter keeps every row, so lowering should leave no stage.
    assert_eq!(pipeline.stages.len(), 0);

    let sink_is_count =
        matches!(pipeline.sink, Sink::Reducer(ref spec) if spec.op == ReducerOp::Count);
    assert!(sink_is_count);
}
#[test]
fn run_take_skip() {
    use serde_json::json;

    // skip(1) -> [20, 30, 40, 50]; take(2) -> [20, 30]; sum -> 50.
    let doc = Val::from(&json!({"xs": [10, 20, 30, 40, 50]}));
    let total = lower_query("$.xs.skip(1).take(2).sum()")
        .unwrap()
        .run(&doc)
        .unwrap();
    assert_eq!(total, Val::Int(50));
}
#[test]
fn path_slice_lowers_to_bounded_positional_stages() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let doc = Val::from(&json!({"xs": [10, 20, 30, 40, 50]}));

    // `[0:3]` caps the source at three items; `last()` then reads the third.
    let bounded_prefix = lower_query("$.xs[0:3].last()").unwrap();
    assert_eq!(
        bounded_prefix.source_demand().chain.pull,
        PullDemand::FirstInput(3)
    );
    assert_eq!(bounded_prefix.run(&doc).unwrap(), Val::Int(30));

    // `[2:]` then `take(2)` needs only the first 2 + 2 source items.
    let offset_window = lower_query("$.xs[2:].take(2)").unwrap();
    assert_eq!(
        offset_window.source_demand().chain.pull,
        PullDemand::FirstInput(4)
    );
    let window: serde_json::Value = offset_window.run(&doc).unwrap().into();
    assert_eq!(window, json!([30, 40]));

    // A negative start bound is not lowered into a pipeline.
    assert!(lower_query("$.xs[-2:].first()").is_none());
}
#[test]
fn scalar_slice_preserves_pipeline_demand() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    // `.slice(0, 2)` is applied to each string element, not to the row
    // sequence, so `last()` still asks only for the final source row.
    let pipeline = lower_query("$.rows.slice(0, 2).last()").unwrap();
    assert_eq!(pipeline.source_demand().chain.pull, PullDemand::LastInput(1));

    let doc = Val::from(&json!({"rows": ["abc", "def"]}));
    let sliced: serde_json::Value = pipeline.run(&doc).unwrap().into();
    assert_eq!(sliced, json!("de"));
}
#[test]
fn chunk_and_window_bounded_demand_match_vm() {
    use crate::plan::demand::PullDemand;
    use serde_json::json;

    let doc = Val::from(&json!({"xs": [1, 2, 3, 4, 5, 6, 7, 8]}));
    let cases = [
        // Two disjoint chunks of three consume six inputs.
        (
            "$.xs.chunk(3).take(2)",
            PullDemand::FirstInput(6),
            json!([[1, 2, 3], [4, 5, 6]]),
        ),
        // Two overlapping windows of three span four inputs.
        (
            "$.xs.window(3).take(2)",
            PullDemand::FirstInput(4),
            json!([[1, 2, 3], [2, 3, 4]]),
        ),
        // The final window currently requests the whole source.
        ("$.xs.window(3).last()", PullDemand::All, json!([6, 7, 8])),
    ];
    for (query, expected_pull, expected_out) in cases {
        let pipeline = lower_query(query).unwrap();
        assert_eq!(
            pipeline.source_demand().chain.pull,
            expected_pull,
            "query: {}",
            query
        );
        let out: serde_json::Value = pipeline.run(&doc).unwrap().into();
        assert_eq!(out, expected_out, "query: {}", query);
    }
}
#[test]
fn predicate_terminal_sinks_match_vm() {
    use serde_json::json;

    // Predicate-style sinks whose pipeline result must equal the VM's.
    const VM_PARITY_QUERIES: [&str; 6] = [
        "$.xs.any(score > 10)",
        "$.xs.exists(score > 10)",
        "$.xs.all(score > 0)",
        "$.xs.find_index(score > 10)",
        "$.xs.indices_where(score > 5)",
        "$.xs.map(isbn).find_index(@ == \"c\")",
    ];

    let doc = json!({
        "xs": [
            {"score": 1, "isbn": "a"},
            {"score": 9, "isbn": "b"},
            {"score": 11, "isbn": "c"}
        ]
    });

    for query in VM_PARITY_QUERIES {
        assert_pipeline_matches_vm(query, doc.clone());
    }

    // `find_one` returns the single matching row; only its `isbn` is checked.
    let root = Val::from(&doc);
    let matched: serde_json::Value = lower_query("$.xs.find_one(score == 9)")
        .unwrap()
        .run(&root)
        .unwrap()
        .into();
    assert_eq!(matched["isbn"], json!("b"));
}
#[test]
fn predicate_terminal_sinks_keep_conservative_source_demand() {
    use crate::plan::demand::{PullDemand, ValueNeed};

    // `any`: whole source pulled, but only the predicate is needed per row.
    let any = lower_query("$.xs.any(@ > 2)").unwrap();
    assert!(matches!(
        any.sink,
        Sink::Predicate(ref spec) if spec.op == PredicateSinkOp::Any
    ));
    let any_demand = any.source_demand();
    assert_eq!(any_demand.chain.pull, PullDemand::All);
    assert_eq!(any_demand.chain.value, ValueNeed::Predicate);

    // `all`: whole source pulled.
    let all = lower_query("$.xs.all(@ > 2)").unwrap();
    assert!(matches!(
        all.sink,
        Sink::Predicate(ref spec) if spec.op == PredicateSinkOp::All
    ));
    assert_eq!(all.source_demand().chain.pull, PullDemand::All);

    // `find_index`: whole source pulled.
    let find_index = lower_query("$.xs.find_index(@ > 2)").unwrap();
    assert!(matches!(
        find_index.sink,
        Sink::Predicate(ref spec) if spec.op == PredicateSinkOp::FindIndex
    ));
    assert_eq!(find_index.source_demand().chain.pull, PullDemand::All);

    // `indices_where`: only the predicate is needed per row.
    let indices_where = lower_query("$.xs.indices_where(@ > 2)").unwrap();
    assert!(matches!(
        indices_where.sink,
        Sink::Predicate(ref spec) if spec.op == PredicateSinkOp::IndicesWhere
    ));
    assert_eq!(
        indices_where.source_demand().chain.value,
        ValueNeed::Predicate
    );

    // `find_one`: hands back the matching row itself, so whole values are needed.
    let find_one = lower_query("$.xs.find_one(@ > 2)").unwrap();
    assert!(matches!(
        find_one.sink,
        Sink::Predicate(ref spec) if spec.op == PredicateSinkOp::FindOne
    ));
    let find_one_demand = find_one.source_demand();
    assert_eq!(find_one_demand.chain.pull, PullDemand::All);
    assert_eq!(find_one_demand.chain.value, ValueNeed::Whole);
}
#[test]
fn find_one_terminal_sink_errors_on_zero_or_multiple_matches() {
    use serde_json::json;

    let root = Val::from(&json!({
        "xs": [{"score": 1}, {"score": 9}, {"score": 11}]
    }));

    // First query: no row scores above 100 (zero matches).
    // Second query: two rows score above 5 (multiple matches).
    for query in ["$.xs.find_one(score > 100)", "$.xs.find_one(score > 5)"] {
        let outcome = lower_query(query).unwrap().run(&root);
        assert!(outcome.is_err(), "expected an error for {}", query);
    }
}
#[test]
fn membership_terminal_sinks_match_vm() {
    use serde_json::json;

    let doc = json!({
        "xs": ["a", "urgent", "x", "urgent"],
        "needle": "urgent",
        "s": "hello",
        "substring": "ell",
        "obj": {"urgent": true}
    });

    // Array receivers, with both literal and path-valued needles.
    let sequence_queries = [
        "$.xs.includes(\"urgent\")",
        "$.xs.contains(\"urgent\")",
        "$.xs.index(\"urgent\")",
        "$.xs.indices_of(\"urgent\")",
        "$.xs.includes($.needle)",
        "$.xs.index($.needle)",
        "$.xs.indices_of($.needle)",
        "$.xs.map(@).includes(\"x\")",
        "$.xs.map(@).includes($.needle)",
    ];
    // String and object receivers.
    let scalar_queries = [
        "$.s.includes(\"ell\")",
        "$.s.includes($.substring)",
        "$.obj.includes(\"urgent\")",
    ];

    for query in sequence_queries.iter().chain(&scalar_queries).copied() {
        assert_pipeline_matches_vm(query, doc.clone());
    }
}
#[test]
fn membership_terminal_sinks_keep_conservative_source_demand() {
    use crate::plan::demand::{PullDemand, ValueNeed};

    // `includes` scans the whole source and compares whole elements.
    let includes = lower_query("$.xs.includes(\"urgent\")").unwrap();
    assert!(matches!(
        includes.sink,
        Sink::Membership(ref spec) if spec.op == MembershipSinkOp::Includes
    ));
    let includes_demand = includes.source_demand();
    assert_eq!(includes_demand.chain.pull, PullDemand::All);
    assert_eq!(includes_demand.chain.value, ValueNeed::Whole);

    let index = lower_query("$.xs.index(\"urgent\")").unwrap();
    assert!(matches!(
        index.sink,
        Sink::Membership(ref spec) if spec.op == MembershipSinkOp::Index
    ));

    let indices = lower_query("$.xs.indices_of(\"urgent\")").unwrap();
    assert!(matches!(
        indices.sink,
        Sink::Membership(ref spec) if spec.op == MembershipSinkOp::IndicesOf
    ));

    // A path-valued needle should lower to a `Program` target, not a literal one.
    let dynamic = lower_query("$.xs.includes($.needle)").unwrap();
    let has_program_target = match dynamic.sink {
        Sink::Membership(ref spec) => matches!(spec.target, MembershipSinkTarget::Program(_)),
        _ => false,
    };
    assert!(has_program_target);
}
#[test]
fn arg_extreme_terminal_sinks_match_vm() {
    use serde_json::json;

    // Rows "B" and "C" tie on score, so `max_by(score)` also checks that the
    // pipeline breaks ties the same way the VM does.
    let doc = json!({
        "xs": [
            {"title": "A", "score": 1, "price": 20},
            {"title": "B", "score": 9, "price": 10},
            {"title": "C", "score": 9, "price": 30}
        ],
        "names": ["a", "abc", "ab"]
    });

    let queries = [
        "$.xs.max_by(score)",
        "$.xs.min_by(price)",
        "$.names.max_by(@.len())",
        "$.names.min_by(@.len())",
        "$.xs.map(@).max_by(score)",
    ];
    queries.iter().copied().for_each(|query| {
        assert_pipeline_matches_vm(query, doc.clone());
    });
}
#[test]
fn arg_extreme_terminal_sinks_keep_conservative_source_demand() {
    use crate::plan::demand::{PullDemand, ValueNeed};

    let max_by = lower_query("$.xs.max_by(score)").unwrap();
    assert!(matches!(max_by.sink, Sink::ArgExtreme(ref spec) if spec.want_max));

    // `max_by` pulls every whole row, and its demand is flagged as order-sensitive.
    let demand = max_by.source_demand();
    assert_eq!(demand.chain.pull, PullDemand::All);
    assert_eq!(demand.chain.value, ValueNeed::Whole);
    assert!(demand.chain.order);

    let min_by = lower_query("$.xs.min_by(score)").unwrap();
    assert!(matches!(min_by.sink, Sink::ArgExtreme(ref spec) if !spec.want_max));
}
}