use chrono::{Datelike, NaiveDateTime};
use locustdb_derive::ASTBuilder;
use regex;
use regex::Regex;
use crate::QueryError;
use crate::engine::*;
use crate::ingest::raw_val::RawVal;
use crate::mem_store::*;
use crate::mem_store::column::DataSource;
use crate::mem_store::value::Val;
use std::collections::HashMap;
use std::i64;
use std::result::Result;
use std::sync::Arc;
use crate::syntax::expression::*;
#[derive(Debug, Clone, ASTBuilder)]
pub enum QueryPlan {
ColumnSection {
name: String,
section: usize,
#[nohash]
range: Option<(i64, i64)>,
#[output(t = "base=provided")]
column_section: TypedBufferRef,
},
Connect {
input: TypedBufferRef,
output: TypedBufferRef,
},
AssembleNullable {
data: TypedBufferRef,
present: BufferRef<u8>,
#[output(t = "base=data;null=_always")]
nullable: TypedBufferRef,
},
PropagateNullability {
nullable: TypedBufferRef,
data: TypedBufferRef,
#[output(t = "base=data;null=_always")]
nullable_data: TypedBufferRef,
},
CombineNullMaps {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output]
present: BufferRef<u8>,
},
GetNullMap {
nullable: TypedBufferRef,
#[output]
present: BufferRef<u8>,
},
MakeNullable {
data: TypedBufferRef,
#[internal]
present: BufferRef<u8>,
#[output(t = "base=data;null=_always")]
nullable: TypedBufferRef,
},
FuseNulls {
nullable: TypedBufferRef,
#[output(t = "base=nullable;null=_fused")]
fused: TypedBufferRef,
},
FuseIntNulls {
offset: i64,
nullable: TypedBufferRef,
#[output(t = "base=nullable;null=_never")]
fused: TypedBufferRef,
},
UnfuseNulls {
fused: TypedBufferRef,
#[internal(t = "base=fused;null=_never")]
data: TypedBufferRef,
#[internal]
present: BufferRef<u8>,
#[output(t = "base=fused;null=_always")]
unfused: TypedBufferRef,
},
UnfuseIntNulls {
offset: i64,
fused: TypedBufferRef,
#[internal(t = "base=fused;null=_never")]
data: TypedBufferRef,
#[internal]
present: BufferRef<u8>,
#[output(t = "base=fused;null=_always")]
unfused: TypedBufferRef,
},
IsNull {
plan: BufferRef<Nullable<Any>>,
#[output]
is_null: BufferRef<u8>,
},
IsNotNull {
plan: BufferRef<Nullable<Any>>,
#[output]
is_not_null: BufferRef<u8>,
},
DictLookup {
indices: TypedBufferRef,
offset_len: BufferRef<u64>,
backing_store: BufferRef<u8>,
#[output(t = "base=str;null=indices")]
decoded: TypedBufferRef,
},
InverseDictLookup {
offset_len: BufferRef<u64>,
backing_store: BufferRef<u8>,
constant: BufferRef<Scalar<&'static str>>,
#[output]
decoded: BufferRef<Scalar<i64>>,
},
Cast {
input: TypedBufferRef,
#[output(t = "base=provided;null=input")]
casted: TypedBufferRef,
},
LZ4Decode {
bytes: BufferRef<u8>,
decoded_len: usize,
#[output(t = "base=provided")]
decoded: TypedBufferRef,
},
UnpackStrings {
bytes: BufferRef<u8>,
#[output]
unpacked_strings: BufferRef<&'static str>,
},
UnhexpackStrings {
bytes: BufferRef<u8>,
uppercase: bool,
total_bytes: usize,
#[internal]
string_store: BufferRef<u8>,
#[output]
unpacked_strings: BufferRef<&'static str>,
},
DeltaDecode {
plan: TypedBufferRef,
#[output]
delta_decoded: BufferRef<i64>,
},
HashMapGrouping {
raw_grouping_key: TypedBufferRef,
max_cardinality: usize,
#[output(t = "base=raw_grouping_key")]
unique: TypedBufferRef,
#[output]
grouping_key: BufferRef<u32>,
#[output]
cardinality: BufferRef<Scalar<i64>>,
},
HashMapGroupingValRows {
raw_grouping_key: BufferRef<ValRows<'static>>,
columns: usize,
max_cardinality: usize,
#[output]
unique: BufferRef<ValRows<'static>>,
#[output]
grouping_key: BufferRef<u32>,
#[output]
cardinality: BufferRef<Scalar<i64>>,
},
Exists {
indices: TypedBufferRef,
max_index: BufferRef<Scalar<i64>>,
#[output]
exists: BufferRef<u8>,
},
NonzeroCompact {
plan: TypedBufferRef,
#[output(t = "base=plan")]
compacted: TypedBufferRef,
},
NonzeroIndices {
plan: TypedBufferRef,
#[output(t = "base=provided")]
nonzero_indices: TypedBufferRef,
},
Compact {
plan: TypedBufferRef,
select: TypedBufferRef,
#[output(t = "base=plan")]
compacted: TypedBufferRef,
},
BitPack {
lhs: BufferRef<i64>,
rhs: BufferRef<i64>,
shift: i64,
#[output]
bit_packed: BufferRef<i64>,
},
BitUnpack {
plan: BufferRef<i64>,
shift: u8,
width: u8,
#[output]
unpacked: BufferRef<i64>,
},
SlicePack {
plan: TypedBufferRef,
stride: usize,
offset: usize,
#[output(shared_byte_slices)]
packed: BufferRef<Any>,
},
SliceUnpack {
plan: BufferRef<Any>,
stride: usize,
offset: usize,
#[output(t = "base=provided")]
unpacked: TypedBufferRef,
},
ValRowsPack {
plan: BufferRef<Val<'static>>,
stride: usize,
offset: usize,
#[output(shared_val_rows)]
packed: BufferRef<ValRows<'static>>,
},
ValRowsUnpack {
plan: BufferRef<ValRows<'static>>,
stride: usize,
offset: usize,
#[output]
unpacked: BufferRef<Val<'static>>,
},
Aggregate {
plan: TypedBufferRef,
grouping_key: TypedBufferRef,
max_index: BufferRef<Scalar<i64>>,
aggregator: Aggregator,
#[output(t = "base=provided")]
aggregate: TypedBufferRef,
},
CheckedAggregate {
plan: TypedBufferRef,
grouping_key: TypedBufferRef,
max_index: BufferRef<Scalar<i64>>,
aggregator: Aggregator,
#[output(t = "base=provided")]
aggregate: TypedBufferRef,
},
LessThan {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=u8;null=lhs,rhs")]
less_than: TypedBufferRef,
},
LessThanEquals {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=u8;null=lhs,rhs")]
less_than_equals: TypedBufferRef,
},
Equals {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=u8;null=lhs,rhs")]
equals: TypedBufferRef,
},
NotEquals {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=u8;null=lhs,rhs")]
not_equals: TypedBufferRef,
},
Add {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
sum: TypedBufferRef,
},
CheckedAdd {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
sum: TypedBufferRef,
},
NullableCheckedAdd {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
#[output]
sum: BufferRef<Nullable<i64>>,
},
Subtract {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
difference: TypedBufferRef,
},
CheckedSubtract {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
difference: TypedBufferRef,
},
NullableCheckedSubtract {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
#[output]
difference: BufferRef<Nullable<i64>>,
},
Multiply {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
product: TypedBufferRef,
},
CheckedMultiply {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
product: TypedBufferRef,
},
NullableCheckedMultiply {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
#[output]
product: BufferRef<Nullable<i64>>,
},
Divide {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
division: TypedBufferRef,
},
CheckedDivide {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
division: TypedBufferRef,
},
NullableCheckedDivide {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
#[output]
division: BufferRef<Nullable<i64>>,
},
Modulo {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
modulo: TypedBufferRef,
},
CheckedModulo {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=i64;null=lhs,rhs")]
modulo: TypedBufferRef,
},
NullableCheckedModulo {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
#[output]
modulo: BufferRef<Nullable<i64>>,
},
And {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=u8;null=lhs,rhs")]
and: TypedBufferRef,
},
Or {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=u8;null=lhs,rhs")]
or: TypedBufferRef,
},
Not {
input: BufferRef<u8>,
#[output]
not: BufferRef<u8>,
},
ToYear {
timestamp: TypedBufferRef,
#[output(t = "base=i64;null=timestamp")]
year: TypedBufferRef,
},
Regex {
plan: BufferRef<&'static str>,
regex: String,
#[output]
matches: BufferRef<u8>,
},
Length {
string: BufferRef<&'static str>,
#[output]
length: BufferRef<i64>,
},
Indices {
plan: TypedBufferRef,
#[output]
indices: BufferRef<usize>,
},
SortBy {
ranking: TypedBufferRef,
indices: BufferRef<usize>,
desc: bool,
stable: bool,
#[output]
permutation: BufferRef<usize>,
},
TopN {
ranking: TypedBufferRef,
n: usize,
desc: bool,
#[internal(t = "base=ranking")]
tmp_keys: TypedBufferRef,
#[output]
top_n: BufferRef<usize>,
},
Select {
plan: TypedBufferRef,
indices: BufferRef<usize>,
#[output(t = "base=plan")]
selection: TypedBufferRef,
},
Filter {
plan: TypedBufferRef,
select: BufferRef<u8>,
#[output(t = "base=plan")]
filtered: TypedBufferRef,
},
NullableFilter {
plan: TypedBufferRef,
select: BufferRef<Nullable<u8>>,
#[output(t = "base=plan")]
filtered: TypedBufferRef,
},
ConstantVec {
index: usize,
#[output(t = "base=provided")]
constant_vec: TypedBufferRef,
},
NullVec {
len: usize,
#[output(t = "base=provided")]
nulls: TypedBufferRef,
},
ScalarI64 {
value: i64,
hide_value: bool,
#[output]
scalar_i64: BufferRef<Scalar<i64>>,
},
ScalarStr {
value: String,
#[internal]
pinned_string: BufferRef<Scalar<String>>,
#[output]
scalar_str: BufferRef<Scalar<&'static str>>,
},
ConstantExpand {
value: i64,
len: usize,
#[output(t = "base=provided")]
expanded: TypedBufferRef,
},
Merge {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
limit: usize,
desc: bool,
#[output]
merge_ops: BufferRef<u8>,
#[output(t = "base=lhs;null=lhs,rhs")]
merged: TypedBufferRef,
},
MergePartitioned {
partitioning: BufferRef<Premerge>,
lhs: TypedBufferRef,
rhs: TypedBufferRef,
limit: usize,
desc: bool,
#[output]
take_left: BufferRef<u8>,
#[output(t = "base=lhs")]
merged: TypedBufferRef,
},
MergeDeduplicate {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output]
merge_ops: BufferRef<MergeOp>,
#[output(t = "base=lhs")]
merged: TypedBufferRef,
},
MergeDeduplicatePartitioned {
partitioning: BufferRef<Premerge>,
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output]
merge_ops: BufferRef<MergeOp>,
#[output(t = "base=lhs")]
merged: TypedBufferRef,
},
Partition {
lhs: TypedBufferRef,
rhs: TypedBufferRef,
limit: usize,
desc: bool,
#[output]
partitioning: BufferRef<Premerge>,
},
Subpartition {
partitioning: BufferRef<Premerge>,
lhs: TypedBufferRef,
rhs: TypedBufferRef,
desc: bool,
#[output]
subpartitioning: BufferRef<Premerge>,
},
MergeKeep {
take_left: BufferRef<u8>,
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=lhs;null=lhs,rhs")]
merged: TypedBufferRef,
},
MergeDrop {
merge_ops: BufferRef<MergeOp>,
lhs: TypedBufferRef,
rhs: TypedBufferRef,
#[output(t = "base=lhs")]
merged: TypedBufferRef,
},
MergeAggregate {
merge_ops: BufferRef<MergeOp>,
lhs: BufferRef<i64>,
rhs: BufferRef<i64>,
aggregator: Aggregator,
#[output]
merged: BufferRef<i64>,
},
}
#[allow(clippy::type_complexity)]
pub fn prepare_hashmap_grouping(raw_grouping_key: TypedBufferRef,
columns: usize,
max_cardinality: usize,
planner: &mut QueryPlanner)
-> Result<(Option<TypedBufferRef>,
TypedBufferRef,
bool,
BufferRef<Scalar<i64>>), QueryError> {
let (unique_out, grouping_key_out, cardinality_out) =
if raw_grouping_key.tag == EncodingType::ValRows {
let (u, g, c) = planner.hash_map_grouping_val_rows(raw_grouping_key.val_rows()?, columns, max_cardinality);
(u.into(), g, c)
} else {
planner.hash_map_grouping(raw_grouping_key, max_cardinality)
};
Ok((Some(unique_out),
grouping_key_out.into(),
false,
cardinality_out))
}
pub fn prepare_aggregation(mut plan: TypedBufferRef,
plan_type: Type,
grouping_key: TypedBufferRef,
max_index: BufferRef<Scalar<i64>>,
aggregator: Aggregator,
planner: &mut QueryPlanner)
-> Result<(TypedBufferRef, Type), QueryError> {
Ok(match aggregator {
Aggregator::Count => {
let plan = if plan.tag == EncodingType::ScalarI64 { grouping_key } else { plan };
(planner.aggregate(plan, grouping_key, max_index, Aggregator::Count, EncodingType::U32),
Type::encoded(Codec::integer_cast(EncodingType::U32)))
}
Aggregator::Sum => {
if !plan_type.is_summation_preserving() {
plan = plan_type.codec.unwrap().decode(plan, planner);
}
(planner.checked_aggregate(plan, grouping_key, max_index, Aggregator::Sum, EncodingType::I64),
Type::unencoded(BasicType::Integer))
}
Aggregator::Max | Aggregator::Min => {
plan = plan_type.codec.unwrap().decode(plan, planner);
(planner.aggregate(plan, grouping_key, max_index, aggregator, EncodingType::I64),
Type::unencoded(BasicType::Integer))
}
})
}
pub fn order_preserving((plan, t): (TypedBufferRef, Type),
planner: &mut QueryPlanner) -> (TypedBufferRef, Type) {
if t.is_order_preserving() {
(plan, t)
} else {
let new_type = t.decoded();
(t.codec.unwrap().decode(plan, planner), new_type)
}
}
type Factory = Box<dyn Fn(&mut QueryPlanner, TypedBufferRef, TypedBufferRef) -> TypedBufferRef + Sync>;
struct Function2 {
pub factory: Factory,
pub type_rhs: BasicType,
pub type_lhs: BasicType,
pub type_out: Type,
pub encoding_invariance: bool,
}
impl Function2 {
pub fn integer_op(factory: Factory) -> Function2 {
Function2 {
factory,
type_lhs: BasicType::Integer,
type_rhs: BasicType::Integer,
type_out: Type::unencoded(BasicType::Integer).mutable(),
encoding_invariance: false,
}
}
pub fn comparison_op(factory: Factory,
t: BasicType) -> Function2 {
Function2 {
factory,
type_lhs: t,
type_rhs: t,
type_out: Type::unencoded(BasicType::Boolean).mutable(),
encoding_invariance: true,
}
}
}
lazy_static! {
static ref FUNCTION2_REGISTRY: HashMap<Func2Type, Vec<Function2>> = function2_registry();
}
fn function2_registry() -> HashMap<Func2Type, Vec<Function2>> {
vec![
(Func2Type::Add,
vec![Function2::integer_op(Box::new(|qp, lhs, rhs| qp.checked_add(lhs, rhs)))]),
(Func2Type::Subtract,
vec![Function2::integer_op(Box::new(|qp, lhs, rhs| qp.checked_subtract(lhs, rhs)))]),
(Func2Type::Multiply,
vec![Function2::integer_op(Box::new(|qp, lhs, rhs| qp.checked_multiply(lhs, rhs)))]),
(Func2Type::Divide,
vec![Function2::integer_op(Box::new(|qp, lhs, rhs| qp.checked_divide(lhs, rhs)))]),
(Func2Type::Modulo,
vec![Function2::integer_op(Box::new(|qp, lhs, rhs| qp.checked_modulo(lhs, rhs)))]),
(Func2Type::LT,
vec![Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than(lhs, rhs)),
BasicType::Integer),
Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than(lhs, rhs)),
BasicType::String)]),
(Func2Type::LTE,
vec![Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than_equals(lhs, rhs)),
BasicType::Integer),
Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than_equals(lhs, rhs)),
BasicType::String)]),
(Func2Type::GT,
vec![Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than(rhs, lhs)),
BasicType::Integer),
Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than(rhs, lhs)),
BasicType::String)]),
(Func2Type::GTE,
vec![Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than_equals(rhs, lhs)),
BasicType::Integer),
Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.less_than_equals(rhs, lhs)),
BasicType::String)]),
(Func2Type::Equals,
vec![Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.equals(lhs, rhs)),
BasicType::Integer),
Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.equals(lhs, rhs)),
BasicType::String)]),
(Func2Type::NotEquals,
vec![Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.not_equals(lhs, rhs)),
BasicType::Integer),
Function2::comparison_op(Box::new(|qp, lhs, rhs| qp.not_equals(lhs, rhs)),
BasicType::String)]),
].into_iter().collect()
}
impl QueryPlan {
#[allow(clippy::trivial_regex)]
pub fn compile_expr(
expr: &Expr,
filter: Filter,
columns: &HashMap<String, Arc<dyn DataSource>>,
column_len: usize,
planner: &mut QueryPlanner) -> Result<(TypedBufferRef, Type), QueryError> {
use self::Expr::*;
use self::Func2Type::*;
Ok(match *expr {
ColName(ref name) => match columns.get::<str>(name.as_ref()) {
Some(c) => {
let mut plan = planner.column_section(name, 0, c.range(), c.encoding_type());
let mut t = c.full_type();
if !c.codec().is_elementwise_decodable() {
let (codec, fixed_width) = c.codec().ensure_fixed_width(plan, planner);
t = Type::encoded(codec);
plan = fixed_width;
}
plan = match filter {
Filter::U8(filter) => planner.filter(plan, filter),
Filter::NullableU8(filter) => planner.nullable_filter(plan, filter),
Filter::Indices(indices) => planner.select(plan, indices),
Filter::None => plan,
};
(plan, t)
}
None => (planner.null_vec(column_len, EncodingType::Null), Type::new(BasicType::Null, None)),
}
Func2(Or, ref lhs, ref rhs) => {
let (plan_lhs, type_lhs) = QueryPlan::compile_expr(lhs, filter, columns, column_len, planner)?;
let (plan_rhs, type_rhs) = QueryPlan::compile_expr(rhs, filter, columns, column_len, planner)?;
if type_lhs.decoded != BasicType::Boolean || type_rhs.decoded != BasicType::Boolean {
bail!(QueryError::TypeError, "Found {} OR {}, expected bool OR bool")
}
(planner.or(plan_lhs, plan_rhs), Type::bit_vec())
}
Func2(And, ref lhs, ref rhs) => {
let (plan_lhs, type_lhs) = QueryPlan::compile_expr(lhs, filter, columns, column_len, planner)?;
let (plan_rhs, type_rhs) = QueryPlan::compile_expr(rhs, filter, columns, column_len, planner)?;
if type_lhs.decoded != BasicType::Boolean || type_rhs.decoded != BasicType::Boolean {
bail!(QueryError::TypeError, "Found {} AND {}, expected bool AND bool")
}
(planner.and(plan_lhs, plan_rhs), Type::bit_vec())
}
Func2(Like, ref expr, ref pattern) => {
match pattern {
box Const(RawVal::Str(pattern)) => {
let mut pattern = pattern.to_string();
pattern = regex::escape(&pattern);
pattern = Regex::new(r"([^\\])_").unwrap().replace_all(&pattern, "$1.").to_string();
pattern = Regex::new(r"\\_").unwrap().replace_all(&pattern, "_").to_string();
while pattern.contains("%%%%") {
pattern = pattern.replace("%%%%", "%%");
}
pattern = pattern.replace("%%%", "(%.*)|(.*%)");
pattern = Regex::new(r"([^%])%([^%])").unwrap().replace_all(&pattern, "$1.*$2").to_string();
pattern = Regex::new(r"^%([^%])").unwrap().replace_all(&pattern, ".*$1").to_string();
pattern = Regex::new(r"([^%])%$").unwrap().replace_all(&pattern, "$1.*").to_string();
pattern = Regex::new(r"%%").unwrap().replace_all(&pattern, "%").to_string();
pattern = format!("^{}$", pattern);
let (mut plan, t) = QueryPlan::compile_expr(expr, filter, columns, column_len, planner)?;
if t.decoded != BasicType::String {
bail!(QueryError::TypeError,
"Expected expression of type `String` as first argument to LIKE. Actual: {:?}", t)
}
if let Some(codec) = t.codec {
plan = codec.decode(plan, planner);
}
let type_out = Type::unencoded(BasicType::Boolean).mutable();
(planner.regex(plan.str()?, &pattern).into(), type_out)
}
_ => bail!(QueryError::TypeError,
"Expected string constant as second argument to `LIKE`, actual: {:?}", pattern),
}
}
Func2(NotLike, ref expr, ref pattern) => {
QueryPlan::compile_expr(
&Expr::Func1(
Func1Type::Not,
Box::new(Expr::Func2(Func2Type::Like, expr.clone(), pattern.clone()))
), filter, columns, column_len, planner)?
}
Func2(RegexMatch, ref expr, ref regex) => {
match regex {
box Const(RawVal::Str(regex)) => {
Regex::new(®ex).map_err(|e| QueryError::TypeError(
format!("`{}` is not a valid regex: {}", regex, e)))?;
let (mut plan, t) = QueryPlan::compile_expr(expr, filter, columns, column_len, planner)?;
if t.decoded != BasicType::String {
bail!(QueryError::TypeError, "Expected expression of type `String` as first argument to regex. Actual: {:?}", t)
}
if let Some(codec) = t.codec {
plan = codec.decode(plan, planner);
}
let type_out = Type::unencoded(BasicType::Boolean).mutable();
(planner.regex(plan.str()?, regex).into(), type_out)
}
_ => bail!(QueryError::TypeError, "Expected string constant as second argument to `regex`, actual: {:?}", regex),
}
}
Func2(function, ref lhs, ref rhs) => {
let (mut plan_lhs, type_lhs) = QueryPlan::compile_expr(lhs, filter, columns, column_len, planner)?;
let (mut plan_rhs, type_rhs) = QueryPlan::compile_expr(rhs, filter, columns, column_len, planner)?;
let declarations = match FUNCTION2_REGISTRY.get(&function) {
Some(patterns) => patterns,
None => bail!(QueryError::NotImplemented, "function {:?}", function),
};
let declaration = match declarations.iter().find(
|p| p.type_lhs == type_lhs.decoded.non_nullable() &&
p.type_rhs == type_rhs.decoded.non_nullable()) {
Some(declaration) => declaration,
None => bail!(
QueryError::TypeError,
"Function {:?} is not implemented for types {:?}, {:?}",
function, type_lhs, type_rhs),
};
if declaration.encoding_invariance && type_lhs.is_scalar && type_rhs.is_encoded() {
plan_lhs = if type_rhs.decoded == BasicType::Integer {
if let QueryPlan::ScalarI64 { value, .. } = *planner.resolve(&plan_lhs) {
planner.scalar_i64(type_rhs.codec.unwrap().encode_int(value), true).into()
} else {
panic!("whoops");
}
} else if type_rhs.decoded == BasicType::String {
type_rhs.codec.clone().unwrap().encode_str(plan_lhs.scalar_str()?, planner).into()
} else {
panic!("whoops");
};
} else if declaration.encoding_invariance && type_rhs.is_scalar && type_lhs.is_encoded() {
plan_rhs = if type_lhs.decoded == BasicType::Integer {
if let QueryPlan::ScalarI64 { value, .. } = *planner.resolve(&plan_rhs) {
planner.scalar_i64(type_lhs.codec.unwrap().encode_int(value), true).into()
} else {
panic!("whoops");
}
} else if type_lhs.decoded == BasicType::String {
type_lhs.codec.clone().unwrap().encode_str(plan_rhs.scalar_str()?, planner).into()
} else {
panic!("whoops");
};
} else {
if let Some(codec) = type_lhs.codec {
plan_lhs = codec.decode(plan_lhs, planner);
}
if let Some(codec) = type_rhs.codec {
plan_rhs = codec.decode(plan_rhs, planner);
}
}
let plan = (declaration.factory)(planner, plan_lhs, plan_rhs);
(plan, declaration.type_out.clone())
}
Func1(Func1Type::Negate, box Const(RawVal::Int(i))) => {
QueryPlan::compile_expr(&Const(RawVal::Int(-i)), filter, columns, column_len, planner)?
}
Func1(ftype, ref inner) => {
let (plan, t) = QueryPlan::compile_expr(inner, filter, columns, column_len, planner)?;
let plan = match ftype {
Func1Type::ToYear => {
let decoded = match t.codec.clone() {
Some(codec) => codec.decode(plan, planner),
None => plan,
};
if t.decoded != BasicType::Integer {
bail!(QueryError::TypeError, "Found to_year({:?}), expected to_year(integer)", &t)
}
planner.to_year(decoded)
}
Func1Type::Length => {
let decoded = match t.codec.clone() {
Some(codec) => codec.decode(plan, planner),
None => plan,
};
if t.decoded != BasicType::String {
bail!(QueryError::TypeError, "Found length({:?}), expected length(string)", &t)
}
planner.length(decoded.str()?).into()
}
Func1Type::Not => {
let decoded = match t.codec.clone() {
Some(codec) => codec.decode(plan, planner),
None => plan,
};
if t.decoded != BasicType::Boolean {
bail!(QueryError::TypeError, "Found NOT({:?}), expected NOT(boolean)", &t)
}
planner.not(decoded.u8()?).into()
}
Func1Type::IsNull => if plan.is_nullable() {
planner.is_null(plan.nullable_any()?).into()
} else {
planner.constant_expand((false as u8) as i64, column_len, EncodingType::U8)
}
Func1Type::IsNotNull => if plan.is_nullable() {
planner.is_not_null(plan.nullable_any()?).into()
} else {
planner.constant_expand((true as u8) as i64, column_len, EncodingType::U8)
}
Func1Type::Negate => {
bail!(QueryError::TypeError, "Unary minus not implemented for arbitrary expressions.")
}
};
(plan, t.decoded())
}
Const(RawVal::Int(i)) => (planner.scalar_i64(i, false).into(), Type::scalar(BasicType::Integer)),
Const(RawVal::Str(ref s)) => (planner.scalar_str(s).into(), Type::scalar(BasicType::String)),
ref x => bail!(QueryError::NotImplemented, "{:?}.compile_vec()", x),
})
}
}
fn encoding_range(plan: &TypedBufferRef, qp: &QueryPlanner) -> Option<(i64, i64)> {
use self::QueryPlan::*;
match *qp.resolve(plan) {
ColumnSection { range, .. } => range,
ToYear { timestamp, .. } => encoding_range(×tamp, qp).map(|(min, max)|
(i64::from(NaiveDateTime::from_timestamp(min, 0).year()),
i64::from(NaiveDateTime::from_timestamp(max, 0).year()))
),
Filter { ref plan, .. } => encoding_range(plan, qp),
Divide { ref lhs, ref rhs, .. } => if let ScalarI64 { value: c, .. } = qp.resolve(rhs) {
encoding_range(lhs, qp).map(|(min, max)|
if *c > 0 { (min / *c, max / *c) } else { (max / *c, min / *c) })
} else {
None
},
CheckedDivide { ref lhs, ref rhs, .. } => if let ScalarI64 { value: c, .. } = qp.resolve(rhs) {
encoding_range(lhs, qp).map(|(min, max)|
if *c > 0 { (min / *c, max / *c) } else { (max / *c, min / *c) })
} else {
None
},
Add { ref lhs, ref rhs, .. } => if let ScalarI64 { value: c, .. } = qp.resolve(rhs) {
encoding_range(lhs, qp).map(|(min, max)| (min + *c, max + *c))
} else {
None
},
Cast { ref input, .. } => encoding_range(input, qp),
LZ4Decode { bytes, .. } => encoding_range(&bytes.into(), qp),
DeltaDecode { ref plan, .. } => encoding_range(plan, qp),
AssembleNullable { ref data, .. } => encoding_range(data, qp),
UnpackStrings { .. } | UnhexpackStrings { .. } | Length { .. } => None,
ref plan => {
error!("encoding_range not implement for {:?}", plan);
None
}
}
}
#[allow(clippy::type_complexity)]
pub fn compile_grouping_key(
exprs: &[Expr],
filter: Filter,
columns: &HashMap<String, Arc<dyn DataSource>>,
partition_len: usize,
planner: &mut QueryPlanner)
-> Result<((TypedBufferRef, bool), i64, Vec<(TypedBufferRef, Type)>, TypedBufferRef), QueryError> {
if exprs.is_empty() {
let mut plan = planner.constant_expand(0, partition_len, EncodingType::U8);
plan = match filter {
Filter::U8(filter) => planner.filter(plan, filter),
Filter::NullableU8(filter) => planner.nullable_filter(plan, filter),
Filter::Indices(indices) => planner.select(plan, indices),
Filter::None => plan,
};
Ok((
(plan, true),
1,
vec![],
planner.buffer_provider.named_buffer("empty_group_by", EncodingType::Null)
))
} else if exprs.len() == 1 {
QueryPlan::compile_expr(&exprs[0], filter, columns, partition_len, planner)
.map(|(mut gk_plan, gk_type)| {
let original_plan = gk_plan;
let encoding_range = encoding_range(&gk_plan, planner);
debug!("Encoding range of {:?} for {:?}", &encoding_range, &gk_plan);
let (max_cardinality, offset) = match encoding_range {
Some((min, max)) => if min <= 0 && gk_plan.is_nullable() {
(max - min + 1, Some(-min + 1))
} else if gk_plan.is_nullable() {
(max, Some(0))
} else if min < 0 {
(max - min, Some(-min))
} else {
(max, None)
}
None => (1 << 62, None),
};
if gk_plan.is_nullable() {
gk_plan = match offset {
Some(offset) => planner.fuse_int_nulls(offset, gk_plan),
None => planner.fuse_nulls(gk_plan),
}
} else if let Some(offset) = offset {
let offset = planner.scalar_i64(offset, true);
gk_plan = planner.add(gk_plan, offset.into());
}
let encoded_group_by_placeholder = planner.buffer_provider.named_buffer(
"encoded_group_by_placeholder", gk_plan.tag);
let mut decoded_group_by = encoded_group_by_placeholder;
if original_plan.is_nullable() {
decoded_group_by = match offset {
Some(offset) => planner.unfuse_int_nulls(offset, decoded_group_by),
None => planner.unfuse_nulls(decoded_group_by),
}
} else if let Some(offset) = offset {
let offset = planner.scalar_i64(-offset, true);
let sum = planner.add(decoded_group_by, offset.into());
decoded_group_by = planner.cast(sum, gk_type.encoding_type());
}
if let Some(codec) = gk_type.codec.clone() {
decoded_group_by = codec.decode(decoded_group_by, planner)
}
((gk_plan, gk_type.is_order_preserving()),
max_cardinality,
vec![(decoded_group_by, gk_type.decoded())],
encoded_group_by_placeholder)
})
} else if let Some(result) = try_bitpacking(exprs, filter, columns, partition_len, planner)? {
Ok(result)
} else {
info!("Failed to bitpack grouping key");
let mut pack = Vec::new();
let mut decode_plans = Vec::new();
let encoded_group_by_placeholder = planner.buffer_provider.named_buffer(
"encoded_group_by_placeholder", EncodingType::ValRows);
let mut order_preserving = true;
for (i, expr) in exprs.iter().enumerate() {
let (query_plan, plan_type) = QueryPlan::compile_expr(expr, filter, columns, partition_len, planner)?;
order_preserving = order_preserving && plan_type.is_order_preserving();
let vals = planner.cast(query_plan, EncodingType::Val).val()?;
pack.push(planner.val_rows_pack(vals, exprs.len(), i));
let vals = planner.val_rows_unpack(
encoded_group_by_placeholder.val_rows()?,
exprs.len(),
i).into();
let mut decode_plan = planner.cast(vals, query_plan.tag);
if let Some(codec) = plan_type.codec.clone() {
decode_plan = codec.decode(decode_plan, planner);
}
decode_plans.push(
(decode_plan,
plan_type.decoded()));
}
Ok(((pack[0].clone().into(), order_preserving),
i64::MAX,
decode_plans,
encoded_group_by_placeholder))
}
}
#[allow(clippy::type_complexity)]
fn try_bitpacking(
exprs: &[Expr],
filter: Filter,
columns: &HashMap<String, Arc<dyn DataSource>>,
partition_len: usize,
planner: &mut QueryPlanner)
-> Result<Option<((TypedBufferRef, bool), i64, Vec<(TypedBufferRef, Type)>, TypedBufferRef)>, QueryError> {
planner.checkpoint();
let mut total_width = 0;
let mut largest_key = 0;
let mut plan: Option<BufferRef<i64>> = None;
let mut decode_plans = Vec::with_capacity(exprs.len());
let mut order_preserving = true;
let encoded_group_by_placeholder = planner.buffer_provider.buffer_i64("encoded_group_by_placeholder");
for expr in exprs.iter().rev() {
let (query_plan, plan_type) = QueryPlan::compile_expr(expr, filter, columns, partition_len, planner)?;
let encoding_range = encoding_range(&query_plan, planner);
debug!("Encoding range of {:?} for {:?}", &encoding_range, &query_plan);
if let Some((min, max)) = encoding_range {
fn bits(max: i64) -> i64 {
((max + 1) as f64).log2().ceil() as i64
}
let max = if query_plan.is_nullable() && min <= 0 { max + 1 } else { max };
let subtract_offset = bits(max) - bits(max - min) > 1 || min < 0 || query_plan.is_nullable();
let adjusted_max = if query_plan.is_nullable() { max - min + 1 } else if subtract_offset { max - min } else { max };
order_preserving = order_preserving && plan_type.is_order_preserving();
let adjusted_query_plan = if query_plan.is_nullable() {
let fused = planner.fuse_int_nulls(-min + 1, query_plan);
if fused.tag != EncodingType::I64 {
planner.cast(fused, EncodingType::I64).i64()?
} else {
fused.i64()?
}
} else if subtract_offset {
let offset = planner.scalar_i64(-min, true);
planner.add(query_plan, offset.into()).i64()?
} else {
planner.cast(query_plan, EncodingType::I64).i64()?
};
if total_width == 0 {
plan = Some(adjusted_query_plan);
} else if adjusted_max > 0 || query_plan.is_nullable() {
plan = plan.map(|plan| planner.bit_pack(plan, adjusted_query_plan, total_width));
}
let mut decode_plan = planner.bit_unpack(
encoded_group_by_placeholder,
total_width as u8,
bits(adjusted_max) as u8).into();
if query_plan.is_nullable() {
decode_plan = planner.unfuse_int_nulls(-min + 1, decode_plan);
} else if subtract_offset {
let offset = planner.scalar_i64(min, true);
decode_plan = planner.add(decode_plan, offset.into());
}
decode_plan = planner.cast(decode_plan, plan_type.encoding_type());
if let Some(codec) = plan_type.codec.clone() {
decode_plan = codec.decode(decode_plan, planner);
}
decode_plans.push((decode_plan, plan_type.decoded()));
largest_key += adjusted_max << total_width;
total_width += bits(adjusted_max);
} else {
planner.reset();
return Ok(None);
}
}
Ok(
if total_width <= 63 {
plan.map(|plan| {
decode_plans.reverse();
((plan.into(), order_preserving), largest_key, decode_plans, encoded_group_by_placeholder.into())
})
} else {
planner.reset();
None
}
)
}
pub(super) fn prepare<'a>(plan: QueryPlan, constant_vecs: &mut Vec<BoxedData<'a>>, result: &mut QueryExecutor<'a>) -> Result<TypedBufferRef, QueryError> {
trace!("{:?}", &plan);
let operation: Box<dyn VecOperator> = match plan {
QueryPlan::Select { plan, indices, selection } => VecOperator::select(plan, indices, selection)?,
QueryPlan::ColumnSection { name, section, column_section, .. } => VecOperator::read_column_data(name, section, column_section.any()),
QueryPlan::AssembleNullable { data, present, nullable } => VecOperator::nullable(data, present, nullable)?,
QueryPlan::MakeNullable { data, present, nullable } => VecOperator::make_nullable(data, present, nullable)?,
QueryPlan::PropagateNullability { nullable, data, nullable_data } => VecOperator::propagate_nullability(nullable.nullable_any()?, data, nullable_data)?,
QueryPlan::CombineNullMaps { lhs, rhs, present } => VecOperator::combine_null_maps(lhs, rhs, present)?,
QueryPlan::GetNullMap { nullable, present } => VecOperator::get_null_map(nullable.nullable_any()?, present),
QueryPlan::FuseNulls { nullable, fused } => VecOperator::fuse_nulls(nullable, fused)?,
QueryPlan::FuseIntNulls { offset, nullable, fused } => VecOperator::fuse_int_nulls(offset, nullable, fused)?,
QueryPlan::UnfuseNulls { fused, data, present, unfused } => VecOperator::unfuse_nulls(fused, data, present, unfused)?,
QueryPlan::UnfuseIntNulls { offset, fused, data, present, unfused } => VecOperator::unfuse_int_nulls(offset, fused, data, present, unfused)?,
QueryPlan::Filter { plan, select, filtered } => VecOperator::filter(plan, select, filtered)?,
QueryPlan::NullableFilter { plan, select, filtered } => VecOperator::nullable_filter(plan, select, filtered)?,
QueryPlan::IsNull { plan, is_null } => VecOperator::is_null(plan, is_null),
QueryPlan::IsNotNull { plan, is_not_null } => VecOperator::is_not_null(plan, is_not_null),
QueryPlan::ScalarI64 { value, hide_value, scalar_i64 } => VecOperator::scalar_i64(value, hide_value, scalar_i64),
QueryPlan::ScalarStr { value, pinned_string, scalar_str } => VecOperator::scalar_str(value, pinned_string, scalar_str),
QueryPlan::NullVec { len, nulls } => VecOperator::null_vec(len, nulls.any()),
QueryPlan::ConstantExpand { value, len, expanded } => VecOperator::constant_expand(value, len, expanded)?,
QueryPlan::DictLookup { indices, offset_len, backing_store, decoded } => VecOperator::dict_lookup(indices, offset_len, backing_store, decoded.str()?)?,
QueryPlan::InverseDictLookup { offset_len, backing_store, constant, decoded } => VecOperator::inverse_dict_lookup(offset_len, backing_store, constant, decoded),
QueryPlan::Cast { input, casted } => VecOperator::type_conversion(input, casted)?,
QueryPlan::DeltaDecode { plan, delta_decoded } => VecOperator::delta_decode(plan, delta_decoded)?,
QueryPlan::LZ4Decode { bytes, decoded_len, decoded } => VecOperator::lz4_decode(bytes, decoded_len, decoded)?,
QueryPlan::UnpackStrings { bytes, unpacked_strings } => VecOperator::unpack_strings(bytes, unpacked_strings),
QueryPlan::UnhexpackStrings { bytes, uppercase, total_bytes, string_store, unpacked_strings } => VecOperator::unhexpack_strings(bytes, uppercase, total_bytes, string_store, unpacked_strings),
QueryPlan::HashMapGrouping { raw_grouping_key, max_cardinality, unique, grouping_key, cardinality } => VecOperator::hash_map_grouping(raw_grouping_key, max_cardinality, unique, grouping_key, cardinality)?,
QueryPlan::HashMapGroupingValRows { raw_grouping_key, max_cardinality, columns, unique, grouping_key, cardinality } => VecOperator::hash_map_grouping_val_rows(raw_grouping_key, columns, max_cardinality, unique, grouping_key, cardinality)?,
QueryPlan::Aggregate { plan, grouping_key, max_index, aggregator, aggregate } => VecOperator::aggregate(plan, grouping_key, max_index, aggregator, aggregate)?,
QueryPlan::CheckedAggregate { plan, grouping_key, max_index, aggregator, aggregate } => VecOperator::checked_aggregate(plan, grouping_key, max_index, aggregator, aggregate)?,
QueryPlan::Exists { indices, max_index, exists } => VecOperator::exists(indices, max_index, exists)?,
QueryPlan::Compact { plan, select, compacted } => VecOperator::compact(plan, select, compacted)?,
QueryPlan::NonzeroIndices { plan, nonzero_indices } => VecOperator::nonzero_indices(plan, nonzero_indices)?,
QueryPlan::NonzeroCompact { plan, compacted } => VecOperator::nonzero_compact(plan, compacted)?,
QueryPlan::BitPack { lhs, rhs, shift, bit_packed } => VecOperator::bit_shift_left_add(lhs, rhs, bit_packed, shift),
QueryPlan::BitUnpack { plan, shift, width, unpacked } => VecOperator::bit_unpack(plan, shift, width, unpacked),
QueryPlan::SlicePack { plan, stride, offset, packed } => VecOperator::slice_pack(plan, stride, offset, packed)?,
QueryPlan::SliceUnpack { plan, stride, offset, unpacked } => VecOperator::slice_unpack(plan, stride, offset, unpacked)?,
QueryPlan::ValRowsPack { plan, stride, offset, packed } => VecOperator::val_rows_pack(plan, stride, offset, packed),
QueryPlan::ValRowsUnpack { plan, stride, offset, unpacked } => VecOperator::val_rows_unpack(plan, stride, offset, unpacked),
QueryPlan::LessThan { lhs, rhs, less_than } => VecOperator::less_than(lhs, rhs, less_than.u8()?)?,
QueryPlan::LessThanEquals { lhs, rhs, less_than_equals } => VecOperator::less_than_equals(lhs, rhs, less_than_equals.u8()?)?,
QueryPlan::Equals { lhs, rhs, equals } => VecOperator::equals(lhs, rhs, equals.u8()?)?,
QueryPlan::NotEquals { lhs, rhs, not_equals } => VecOperator::not_equals(lhs, rhs, not_equals.u8()?)?,
QueryPlan::Add { lhs, rhs, sum } => VecOperator::addition(lhs, rhs, sum.i64()?)?,
QueryPlan::CheckedAdd { lhs, rhs, sum } => VecOperator::checked_addition(lhs, rhs, sum.i64()?)?,
QueryPlan::NullableCheckedAdd { lhs, rhs, present, sum } => VecOperator::nullable_checked_addition(lhs, rhs, present, sum)?,
QueryPlan::Subtract { lhs, rhs, difference } => VecOperator::subtraction(lhs, rhs, difference.i64()?)?,
QueryPlan::CheckedSubtract { lhs, rhs, difference } => VecOperator::checked_subtraction(lhs, rhs, difference.i64()?)?,
QueryPlan::NullableCheckedSubtract { lhs, rhs, present, difference } => VecOperator::nullable_checked_subtraction(lhs, rhs, present, difference)?,
QueryPlan::Multiply { lhs, rhs, product } => VecOperator::multiplication(lhs, rhs, product.i64()?)?,
QueryPlan::CheckedMultiply { lhs, rhs, product } => VecOperator::checked_multiplication(lhs, rhs, product.i64()?)?,
QueryPlan::NullableCheckedMultiply { lhs, rhs, present, product } => VecOperator::nullable_checked_multiplication(lhs, rhs, present, product)?,
QueryPlan::Divide { lhs, rhs, division } => VecOperator::division(lhs, rhs, division.i64()?)?,
QueryPlan::CheckedDivide { lhs, rhs, division } => VecOperator::checked_division(lhs, rhs, division.i64()?)?,
QueryPlan::NullableCheckedDivide { lhs, rhs, present, division } => VecOperator::nullable_checked_division(lhs, rhs, present, division)?,
QueryPlan::Modulo { lhs, rhs, modulo } => VecOperator::modulo(lhs, rhs, modulo.i64()?)?,
QueryPlan::CheckedModulo { lhs, rhs, modulo } => VecOperator::checked_modulo(lhs, rhs, modulo.i64()?)?,
QueryPlan::NullableCheckedModulo { lhs, rhs, present, modulo } => VecOperator::nullable_checked_modulo(lhs, rhs, present, modulo)?,
QueryPlan::Or { lhs, rhs, or } => VecOperator::or(lhs.u8()?, rhs.u8()?, or.u8()?),
QueryPlan::And { lhs, rhs, and } => VecOperator::and(lhs.u8()?, rhs.u8()?, and.u8()?),
QueryPlan::Not { input, not } => VecOperator::not(input, not),
QueryPlan::ToYear { timestamp, year } => VecOperator::to_year(timestamp.i64()?, year.i64()?),
QueryPlan::Regex { plan, regex, matches } => VecOperator::regex(plan, ®ex, matches),
QueryPlan::Length { string, length } => VecOperator::length(string, length),
QueryPlan::Indices { plan, indices } => VecOperator::indices(plan, indices),
QueryPlan::SortBy { ranking, indices, desc, stable, permutation } => VecOperator::sort_by(ranking, indices, desc, stable, permutation)?,
QueryPlan::TopN { ranking, n, desc, tmp_keys, top_n } => VecOperator::top_n(ranking, tmp_keys, n, desc, top_n)?,
QueryPlan::Connect { input, output } => VecOperator::identity(input, output),
QueryPlan::Merge { lhs, rhs, limit, desc, merge_ops, merged } => VecOperator::merge(lhs, rhs, limit, desc, merge_ops, merged)?,
QueryPlan::MergePartitioned { partitioning, lhs, rhs, limit, desc, take_left, merged } => VecOperator::merge_partitioned(partitioning, lhs, rhs, limit, desc, take_left, merged)?,
QueryPlan::MergeDeduplicate { lhs, rhs, merge_ops, merged } => VecOperator::merge_deduplicate(lhs, rhs, merge_ops, merged)?,
QueryPlan::MergeDeduplicatePartitioned { partitioning, lhs, rhs, merge_ops, merged } => VecOperator::merge_deduplicate_partitioned(partitioning, lhs, rhs, merge_ops, merged)?,
QueryPlan::Partition { lhs, rhs, limit, desc, partitioning } => VecOperator::partition(lhs, rhs, limit, desc, partitioning)?,
QueryPlan::Subpartition { partitioning, lhs, rhs, desc, subpartitioning } => VecOperator::subpartition(partitioning, lhs, rhs, desc, subpartitioning)?,
QueryPlan::MergeDrop { merge_ops, lhs, rhs, merged } => VecOperator::merge_drop(merge_ops, lhs, rhs, merged)?,
QueryPlan::MergeKeep { take_left, lhs, rhs, merged } => VecOperator::merge_keep(take_left, lhs, rhs, merged)?,
QueryPlan::MergeAggregate { merge_ops, lhs, rhs, aggregator, merged } => VecOperator::merge_aggregate(merge_ops, lhs, rhs, aggregator, merged),
QueryPlan::ConstantVec { index, constant_vec } => VecOperator::constant_vec(std::mem::replace(&mut constant_vecs[index], Data::empty(1)), constant_vec.any()),
};
result.push(operation);
Ok(result.last_buffer())
}