use itertools::Itertools;
use locustdb_derive::reify_types;
use regex::Regex;
use crate::engine::*;
use crate::engine::Aggregator;
use crate::ingest::raw_val::RawVal;
use crate::mem_store::*;
use crate::QueryError;
use std::fmt::Write;
use std::intrinsics::type_name;
use std::marker::PhantomData;
use std::result::Result;
use super::aggregate::*;
use super::assemble_nullable::AssembleNullable;
use super::binary_operator::*;
use super::bit_unpack::BitUnpackOperator;
use super::bool_op::*;
use super::column_ops::*;
use super::combine_null_maps::CombineNullMaps;
use super::compact::Compact;
use super::comparison_operators::*;
use super::constant::Constant;
use super::constant_expand::ConstantExpand;
use super::constant_vec::ConstantVec;
use super::delta_decode::*;
use super::dict_lookup::*;
use super::encode_const::*;
use super::exists::Exists;
use super::filter::{Filter, NullableFilter};
use super::functions::*;
use super::fuse_nulls::*;
use super::get_null_map::GetNullMap;
use super::hashmap_grouping::HashMapGrouping;
use super::hashmap_grouping_byte_slices::HashMapGroupingByteSlices;
use super::hashmap_grouping_val_rows::HashMapGroupingValRows;
use super::identity::Identity;
use super::indices::Indices;
use super::is_null::*;
use super::make_nullable::MakeNullable;
use super::map_operator::MapOperator;
use super::merge::Merge;
use super::merge_aggregate::MergeAggregate;
use super::merge_deduplicate::MergeDeduplicate;
use super::merge_deduplicate_partitioned::MergeDeduplicatePartitioned;
use super::merge_drop::MergeDrop;
use super::merge_keep::*;
use super::merge_partitioned::MergePartitioned;
use super::nonzero_compact::NonzeroCompact;
use super::nonzero_indices::NonzeroIndices;
use super::null_vec::NullVec;
use super::numeric_operators::*;
use super::parameterized_vec_vec_int_op::*;
use super::partition::Partition;
use super::propagate_nullability::PropagateNullability;
use super::scalar_i64::ScalarI64;
use super::scalar_str::ScalarStr;
use super::select::*;
use super::slice_pack::*;
use super::slice_unpack::*;
use super::sort_by::*;
use super::sort_by_slices::SortBySlices;
use super::sort_by_val_rows::SortByValRows;
use super::subpartition::SubPartition;
use super::to_val::*;
use super::top_n::TopN;
use super::type_conversion::TypeConversionOperator;
use super::unhexpack_strings::UnhexpackStrings;
use super::unpack_strings::UnpackStrings;
use super::val_rows_pack::*;
use super::val_rows_unpack::*;
pub type BoxedOperator<'a> = Box<dyn VecOperator<'a> + 'a>;
pub trait VecOperator<'a> {
fn execute(&mut self, stream: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError>;
fn finalize(&mut self, _scratchpad: &mut Scratchpad<'a>) {}
fn init(&mut self, _total_count: usize, _batch_size: usize, _scratchpad: &mut Scratchpad<'a>) {}
fn inputs(&self) -> Vec<BufferRef<Any>>;
fn outputs(&self) -> Vec<BufferRef<Any>>;
fn can_stream_input(&self, i: usize) -> bool;
fn can_stream_output(&self, i: usize) -> bool;
fn mutates(&self, _i: usize) -> bool { false }
fn allocates(&self) -> bool;
fn is_streaming_producer(&self) -> bool { false }
fn has_more(&self) -> bool { false }
fn custom_output_len(&self) -> Option<usize> { None }
fn display(&self, full: bool) -> String {
let mut s = String::new();
if self.display_output() {
write!(s, "{:<12} = ", self.outputs().iter().map(|o| format!("{}", o)).join(", ")).unwrap();
}
write!(s, "{}", self.display_op(full)).unwrap();
format!("{:<60} {}", s, short_type_name::<Self>())
}
fn display_output(&self) -> bool { true }
fn display_op(&self, alternate: bool) -> String;
}
fn short_type_name<T: ?Sized>() -> String {
let full_name = type_name::<T>();
let re = Regex::new(r"\w+::").unwrap();
re.replace_all(full_name, "").into_owned()
}
impl<'a> dyn VecOperator<'a> {
pub fn read_column_data(colname: String,
section_index: usize,
output: BufferRef<Any>) -> BoxedOperator<'a> {
Box::new(ReadColumnData {
colname,
section_index,
output,
batch_size: 0,
current_index: 0,
has_more: true,
})
}
pub fn nullable(data: TypedBufferRef,
present: BufferRef<u8>,
nullable_data: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
match data.tag {
EncodingType::U8 => Ok(Box::new(AssembleNullable { data: data.u8()?, present, nullable_data: nullable_data.nullable_u8()? })),
EncodingType::I64 => Ok(Box::new(AssembleNullable { data: data.i64()?, present, nullable_data: nullable_data.nullable_i64()? })),
EncodingType::Str => Ok(Box::new(AssembleNullable { data: data.str()?, present, nullable_data: nullable_data.nullable_str()? })),
_ => Err(fatal!("nullable not implemented for type {:?}", data.tag)),
}
}
pub fn make_nullable(data: TypedBufferRef,
present: BufferRef<u8>,
nullable_data: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
match data.tag {
EncodingType::U8 => Ok(Box::new(MakeNullable { data: data.u8()?, present, nullable_data: nullable_data.nullable_u8()? })),
EncodingType::I64 => Ok(Box::new(MakeNullable { data: data.i64()?, present, nullable_data: nullable_data.nullable_i64()? })),
EncodingType::Str => Ok(Box::new(MakeNullable { data: data.str()?, present, nullable_data: nullable_data.nullable_str()? })),
_ => Err(fatal!("make_nullable not implemented for type {:?}", data.tag)),
}
}
pub fn combine_null_maps(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<u8>) -> Result<BoxedOperator<'a>, QueryError> {
Ok(Box::new(CombineNullMaps { lhs: lhs.nullable_any()?, rhs: rhs.nullable_any()?, output }))
}
pub fn propagate_nullability(nullability: BufferRef<Nullable<Any>>,
data: TypedBufferRef,
output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
match data.tag {
EncodingType::U8 => Ok(Box::new(PropagateNullability { from: nullability, to: data.u8()?, output: output.nullable_u8()? })),
EncodingType::I64 => Ok(Box::new(PropagateNullability { from: nullability, to: data.i64()?, output: output.nullable_i64()? })),
EncodingType::Str => Ok(Box::new(PropagateNullability { from: nullability, to: data.str()?, output: output.nullable_str()? })),
_ => Err(fatal!("propagate_nullability not implemented for type {:?}", data.tag)),
}
}
pub fn get_null_map(nullability: BufferRef<Nullable<Any>>,
present: BufferRef<u8>) -> BoxedOperator<'a> {
Box::new(GetNullMap { from: nullability, present })
}
pub fn fuse_nulls(input: TypedBufferRef, fused: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
if input.tag == EncodingType::NullableI64 {
Ok(Box::new(FuseNullsI64 { input: input.nullable_i64()?, fused: fused.i64()? }))
} else {
Ok(Box::new(FuseNullsStr { input: input.nullable_str()?, fused: fused.opt_str()? }))
}
}
pub fn fuse_int_nulls(offset: i64, input: TypedBufferRef, fused: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
match fused.tag {
EncodingType::U8 => Ok(Box::new(FuseIntNulls { offset: offset as u8, input: input.nullable_u8()?, fused: fused.u8()? })),
EncodingType::U16 => Ok(Box::new(FuseIntNulls { offset: offset as u16, input: input.nullable_u16()?, fused: fused.u16()? })),
EncodingType::U32 => Ok(Box::new(FuseIntNulls { offset: offset as u32, input: input.nullable_u32()?, fused: fused.u32()? })),
EncodingType::I64 => Ok(Box::new(FuseIntNulls { offset: offset as i64, input: input.nullable_i64()?, fused: fused.i64()? })),
_ => Err(fatal!("fuse_int_nulls not implemented for type {:?}", fused.tag)),
}
}
pub fn unfuse_int_nulls(offset: i64,
fused: TypedBufferRef,
data: TypedBufferRef,
present: BufferRef<u8>,
unfused: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
match fused.tag {
EncodingType::U8 => Ok(Box::new(UnfuseIntNulls { offset: offset as u8, fused: fused.u8()?, data: data.u8()?, present, unfused: unfused.nullable_u8()? })),
EncodingType::U16 => Ok(Box::new(UnfuseIntNulls { offset: offset as u16, fused: fused.u16()?, data: data.u16()?, present, unfused: unfused.nullable_u16()? })),
EncodingType::U32 => Ok(Box::new(UnfuseIntNulls { offset: offset as u32, fused: fused.u32()?, data: data.u32()?, present, unfused: unfused.nullable_u32()? })),
EncodingType::I64 => Ok(Box::new(UnfuseIntNulls { offset: offset as i64, fused: fused.i64()?, data: data.i64()?, present, unfused: unfused.nullable_i64()? })),
_ => Err(fatal!("unfuse_int_nulls not implemented for type {:?}", fused.tag)),
}
}
pub fn unfuse_nulls(fused: TypedBufferRef,
data: TypedBufferRef,
present: BufferRef<u8>,
unfused: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
if fused.tag == EncodingType::I64 {
Ok(Box::new(UnfuseNullsI64 { fused: fused.i64()?, present, unfused: unfused.nullable_i64()? }))
} else {
Ok(Box::new(UnfuseNullsStr { fused: fused.opt_str()?, data: data.str()?, present, unfused: unfused.nullable_str()? }))
}
}
pub fn is_null(input: BufferRef<Nullable<Any>>, is_null: BufferRef<u8>) -> BoxedOperator<'a> {
Box::new(IsNull { input, is_null })
}
pub fn is_not_null(input: BufferRef<Nullable<Any>>, is_not_null: BufferRef<u8>) -> BoxedOperator<'a> {
Box::new(IsNotNull { input, is_not_null })
}
pub fn identity(input: TypedBufferRef, output: TypedBufferRef) -> BoxedOperator<'a> {
Box::new(Identity {
input: input.any(),
output: output.any(),
})
}
pub fn dict_lookup(indices: TypedBufferRef,
dict_indices: BufferRef<u64>,
dict_data: BufferRef<u8>,
output: BufferRef<&'a str>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types![
"dict_lookup";
indices: Integer;
Ok(Box::new(DictLookup { indices, output, dict_indices, dict_data }))
]
}
#[cfg(feature = "enable_lz4")]
pub fn lz4_decode(encoded: BufferRef<u8>,
decoded_len: usize,
decoded: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
use super::lz4_decode::LZ4Decode;
use std::io::Read;
let reader: Box<dyn Read> = Box::new(&[] as &[u8]);
reify_types! {
"lz4_decode";
decoded: Integer;
Ok(Box::new(LZ4Decode::<'a, _> { encoded, decoded, decoded_len, reader, has_more: true }))
}
}
#[cfg(not(feature = "enable_lz4"))]
pub fn lz4_decode(_: BufferRef<u8>,
_: usize,
_: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
panic!("LZ4 is not enabled in this build of LocustDB. Recompile with `features enable_lz4`")
}
pub fn unpack_strings(packed: BufferRef<u8>, unpacked: BufferRef<&'a str>) -> BoxedOperator<'a> {
Box::new(UnpackStrings::<'a> { packed, unpacked, iterator: None, has_more: true })
}
pub fn unhexpack_strings(packed: BufferRef<u8>,
uppercase: bool,
total_bytes: usize,
stringstore: BufferRef<u8>,
unpacked: BufferRef<&'a str>) -> BoxedOperator<'a> {
Box::new(UnhexpackStrings::<'a> { packed, unpacked, stringstore, uppercase, total_bytes, iterator: None, has_more: true })
}
pub fn delta_decode(encoded: TypedBufferRef, decoded: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"delta_decode";
encoded: Integer;
Ok(Box::new(DeltaDecode { encoded, decoded, previous: 0 }))
}
}
pub fn inverse_dict_lookup(dict_indices: BufferRef<u64>,
dict_data: BufferRef<u8>,
constant: BufferRef<Scalar<&'a str>>,
output: BufferRef<Scalar<i64>>) -> BoxedOperator<'a> {
Box::new(InverseDictLookup { dict_indices, dict_data, constant, output })
}
pub fn encode_int_const(constant: BufferRef<Scalar<i64>>,
codec: Codec,
output: BufferRef<Scalar<i64>>) -> BoxedOperator<'a> {
Box::new(EncodeIntConstant { constant, output, codec })
}
pub fn filter(input: TypedBufferRef,
filter: BufferRef<u8>,
output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"filter";
input, output: PrimitiveUSize;
Ok(Box::new(Filter { input, filter, output }))
}
}
pub fn nullable_filter(input: TypedBufferRef,
filter: BufferRef<Nullable<u8>>,
output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nullable_filter";
input, output: PrimitiveUSize;
Ok(Box::new(NullableFilter { input, filter, output }))
}
}
pub fn select(input: TypedBufferRef,
indices: BufferRef<usize>,
output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"select";
input, output: PrimitiveUSize;
Ok(Box::new(Select { input, indices, output }));
input, output: NullablePrimitive;
Ok(Box::new(SelectNullable { input, indices, output }))
}
}
pub fn constant(val: RawVal, hide_value: bool, output: BufferRef<RawVal>) -> BoxedOperator<'a> {
Box::new(Constant { val, hide_value, output })
}
pub fn scalar_i64(val: i64, hide_value: bool, output: BufferRef<Scalar<i64>>) -> BoxedOperator<'a> {
Box::new(ScalarI64 { val, hide_value, output })
}
pub fn scalar_str(val: String, pinned: BufferRef<Scalar<String>>, output: BufferRef<Scalar<&'a str>>) -> BoxedOperator<'a> {
Box::new(ScalarStr { val, pinned, output })
}
pub fn null_vec(len: usize, output: BufferRef<Any>) -> BoxedOperator<'a> {
Box::new(NullVec { len, output })
}
pub fn constant_expand(val: i64, len: usize, output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
match output.tag {
EncodingType::U8 => Ok(Box::new(ConstantExpand {
val: val as u8,
output: output.u8()?,
current_index: 0,
len,
batch_size: 0, })),
_ => Err(fatal!("constant_expand not implemented for type {:?}", output.tag)),
}
}
pub fn constant_vec(val: BoxedData<'a>, output: BufferRef<Any>) -> BoxedOperator<'a> {
Box::new(ConstantVec { val, output })
}
pub fn less_than(lhs: TypedBufferRef, rhs: TypedBufferRef, output: BufferRef<u8>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"less_than";
lhs: Str, rhs: ScalarStr;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<LessThan> }));
lhs: ScalarStr, rhs: Str;
Ok(Box::new(BinarySVOperator { lhs, rhs, output, op: PhantomData::<LessThan> }));
lhs: Str, rhs: Str;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<LessThan> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<LessThan> }));
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinarySVOperator { lhs, rhs, output, op: PhantomData::<LessThan> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<LessThan> }))
}
}
pub fn less_than_equals(lhs: TypedBufferRef, rhs: TypedBufferRef, output: BufferRef<u8>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"less_than_equals";
lhs: Str, rhs: ScalarStr;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<LessThanEquals> }));
lhs: ScalarStr, rhs: Str;
Ok(Box::new(BinarySVOperator { lhs, rhs, output, op: PhantomData::<LessThanEquals> }));
lhs: Str, rhs: Str;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<LessThanEquals> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<LessThanEquals> }));
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinarySVOperator { lhs, rhs, output, op: PhantomData::<LessThanEquals> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<LessThanEquals> }))
}
}
pub fn equals(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<u8>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"equals";
lhs: Str, rhs: ScalarStr;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<Equals> }));
lhs: ScalarStr, rhs: Str;
Ok(Box::new(BinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<Equals> }));
lhs: Str, rhs: Str;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<Equals> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<Equals> }));
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<Equals> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<Equals> }))
}
}
pub fn not_equals(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<u8>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"not_equals";
lhs: Str, rhs: ScalarStr;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<NotEquals> }));
lhs: ScalarStr, rhs: Str;
Ok(Box::new(BinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<NotEquals> }));
lhs: Str, rhs: Str;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<NotEquals> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<NotEquals> }));
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<NotEquals> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<NotEquals> }))
}
}
pub fn addition(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"addition";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<Addition<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<Addition<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<Addition<_, _>> }))
}
}
pub fn checked_addition(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"check_addition";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<Addition<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(CheckedBinaryVSOperator { lhs, rhs, output, op: PhantomData::<Addition<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinaryOperator { lhs, rhs, output, op: PhantomData::<Addition<_, _>> }))
}
}
pub fn nullable_checked_addition(lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
output: BufferRef<Nullable<i64>>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nullable_check_addition";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinaryVSOperator { lhs: rhs, rhs: lhs, present, output, op: PhantomData::<Addition<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(NullableCheckedBinaryVSOperator { lhs, rhs, output, present, op: PhantomData::<Addition<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinaryOperator { lhs, rhs, output, present, op: PhantomData::<Addition<_, _>> }))
}
}
pub fn subtraction(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"subtraction";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinarySVOperator { lhs, rhs, output, op: PhantomData::<Subtraction<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<Subtraction<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<Subtraction<_, _>> }))
}
}
pub fn checked_subtraction(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"checked_subtraction";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinarySVOperator { lhs, rhs, output, op: PhantomData::<Subtraction<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(CheckedBinaryVSOperator { lhs, rhs, output, op: PhantomData::<Subtraction<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinaryOperator { lhs, rhs, output, op: PhantomData::<Subtraction<_, _>> }))
}
}
pub fn nullable_checked_subtraction(lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
output: BufferRef<Nullable<i64>>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nullable_checked_subtraction";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinarySVOperator { lhs, rhs, output, present, op: PhantomData::<Subtraction<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(NullableCheckedBinaryVSOperator { lhs, rhs, output, present, op: PhantomData::<Subtraction<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinaryOperator { lhs, rhs, output, present, op: PhantomData::<Subtraction<_, _>> }))
}
}
pub fn multiplication(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"multiplication";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<Multiplication<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<Multiplication<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<Multiplication<_, _>> }))
}
}
pub fn checked_multiplication(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"checked_multiplication";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinaryVSOperator { lhs: rhs, rhs: lhs, output, op: PhantomData::<Multiplication<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(CheckedBinaryVSOperator { lhs, rhs, output, op: PhantomData::<Multiplication<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinaryOperator { lhs, rhs, output, op: PhantomData::<Multiplication<_, _>> }))
}
}
pub fn nullable_checked_multiplication(lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
output: BufferRef<Nullable<i64>>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nullable_checked_multiplication";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinaryVSOperator { lhs: rhs, rhs: lhs, output, present, op: PhantomData::<Multiplication<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(NullableCheckedBinaryVSOperator { lhs, rhs, output, present, op: PhantomData::<Multiplication<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinaryOperator { lhs, rhs, output, present, op: PhantomData::<Multiplication<_, _>> }))
}
}
pub fn division(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"division";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinarySVOperator { lhs, rhs, output, op: PhantomData::<Division<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<Division<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<Division<_, _>> }))
}
}
pub fn checked_division(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"checked_division";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinarySVOperator { lhs, rhs, output, op: PhantomData::<Division<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(CheckedBinaryVSOperator { lhs, rhs, output, op: PhantomData::<Division<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinaryOperator { lhs, rhs, output, op: PhantomData::<Division<_, _>> }))
}
}
pub fn nullable_checked_division(lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
output: BufferRef<Nullable<i64>>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nullable_checked_division";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinarySVOperator { lhs, rhs, output, present, op: PhantomData::<Division<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(NullableCheckedBinaryVSOperator { lhs, rhs, output, present, op: PhantomData::<Division<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinaryOperator { lhs, rhs, output, present, op: PhantomData::<Division<_, _>> }))
}
}
pub fn modulo(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"modulo";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(BinarySVOperator { lhs, rhs, output, op: PhantomData::<Modulo<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(BinaryVSOperator { lhs, rhs, output, op: PhantomData::<Modulo<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(BinaryOperator { lhs, rhs, output, op: PhantomData::<Modulo<_, _>> }))
}
}
pub fn checked_modulo(lhs: TypedBufferRef,
rhs: TypedBufferRef,
output: BufferRef<i64>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"checked_modulo";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinarySVOperator { lhs, rhs, output, op: PhantomData::<Modulo<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(CheckedBinaryVSOperator { lhs, rhs, output, op: PhantomData::<Modulo<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(CheckedBinaryOperator { lhs, rhs, output, op: PhantomData::<Modulo<_, _>> }))
}
}
pub fn nullable_checked_modulo(lhs: TypedBufferRef,
rhs: TypedBufferRef,
present: BufferRef<u8>,
output: BufferRef<Nullable<i64>>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nullable_checked_modulo";
lhs: ScalarI64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinarySVOperator { lhs, rhs, output, present, op: PhantomData::<Modulo<_, _>> }));
lhs: IntegerNoU64, rhs: ScalarI64;
Ok(Box::new(NullableCheckedBinaryVSOperator { lhs, rhs, output, present, op: PhantomData::<Modulo<_, _>> }));
lhs: IntegerNoU64, rhs: IntegerNoU64;
Ok(Box::new(NullableCheckedBinaryOperator { lhs, rhs, output, present, op: PhantomData::<Modulo<_, _>> }))
}
}
pub fn or(lhs: BufferRef<u8>, rhs: BufferRef<u8>, output: BufferRef<u8>) -> BoxedOperator<'a> {
BooleanOperator::<BooleanOr>::compare(lhs, rhs, output)
}
pub fn and(lhs: BufferRef<u8>, rhs: BufferRef<u8>, output: BufferRef<u8>) -> BoxedOperator<'a> {
BooleanOperator::<BooleanAnd>::compare(lhs, rhs, output)
}
pub fn bit_shift_left_add(lhs: BufferRef<i64>,
rhs: BufferRef<i64>,
output: BufferRef<i64>,
shift_amount: i64) -> BoxedOperator<'a> {
Box::new(ParameterizedVecVecIntegerOperator::<BitShiftLeftAdd> { lhs, rhs, output, parameter: shift_amount, op: PhantomData })
}
pub fn bit_unpack(inner: BufferRef<i64>, shift: u8, width: u8, output: BufferRef<i64>) -> BoxedOperator<'a> {
Box::new(BitUnpackOperator { input: inner, output, shift, width })
}
pub fn slice_pack(input: TypedBufferRef, stride: usize, offset: usize, output: BufferRef<Any>) -> Result<BoxedOperator<'a>, QueryError> {
if let EncodingType::Str = input.tag {
return Ok(Box::new(SlicePackString { input: input.str()?, output, stride, offset }));
}
reify_types! {
"slice_pack";
input: Integer;
Ok(Box::new(SlicePackInt { input, output, stride, offset }))
}
}
pub fn slice_unpack(input: BufferRef<Any>, stride: usize, offset: usize, output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
if let EncodingType::Str = output.tag {
return Ok(Box::new(SliceUnpackString { input, output: output.str()?, stride, offset }));
}
reify_types! {
"slice_unpack";
output: Integer;
Ok(Box::new(SliceUnpackInt { input, output, stride, offset }))
}
}
pub fn val_rows_pack(input: BufferRef<Val<'a>>, stride: usize, offset: usize, output: BufferRef<ValRows<'a>>) -> BoxedOperator<'a> {
Box::new(ValRowsPack { input, stride, offset, output })
}
pub fn val_rows_unpack(input: BufferRef<ValRows<'a>>, stride: usize, offset: usize, output: BufferRef<Val<'a>>) -> BoxedOperator<'a> {
Box::new(ValRowsUnpack { input, stride, offset, output })
}
pub fn type_conversion(input: TypedBufferRef, output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
if output.tag == EncodingType::Val {
let output = output.val()?;
if input.tag.is_nullable() {
if input.tag == EncodingType::NullableStr {
Ok(Box::new(NullableStrToVal { input: input.nullable_str()?, vals: output }) as BoxedOperator<'a>)
} else {
reify_types! {
"nullable_int_to_val";
input: NullableInteger;
Ok(Box::new(NullableIntToVal { input, vals: output }) as BoxedOperator<'a>)
}
}
} else {
reify_types! {
"type_conversion";
input: PrimitiveNoU64;
Ok(Box::new(TypeConversionOperator { input, output }) as BoxedOperator<'a>)
}
}
} else if input.tag == EncodingType::Val {
let input = input.val()?;
if output.tag.is_nullable() {
if output.tag == EncodingType::NullableStr {
Ok(Box::new(ValToNullableStr { vals: input, nullable: output.nullable_str()? }) as BoxedOperator<'a>)
} else {
reify_types! {
"nullable_int_to_val";
output: NullableInteger;
Ok(Box::new(ValToNullableInt { vals: input, nullable: output }) as BoxedOperator<'a>)
}
}
} else {
reify_types! {
"type_conversion";
output: PrimitiveNoU64;
Ok(Box::new(TypeConversionOperator { input, output }) as BoxedOperator<'a>)
}
}
} else {
if input.tag == EncodingType::Str && output.tag == EncodingType::OptStr {
return Ok(Box::new(TypeConversionOperator { input: input.str()?, output: output.opt_str()? }));
}
reify_types! {
"type_conversion";
input: Integer, output: Integer;
Ok(Box::new(TypeConversionOperator { input, output }))
}
}
}
pub fn not(input: BufferRef<u8>, output: BufferRef<u8>) -> BoxedOperator<'a> {
Box::new(MapOperator { input, output, map: BooleanNot })
}
#[allow(clippy::wrong_self_convention)]
pub fn to_year(input: BufferRef<i64>, output: BufferRef<i64>) -> BoxedOperator<'a> {
Box::new(MapOperator { input, output, map: ToYear })
}
pub fn regex(input: BufferRef<&'a str>, r: &str, output: BufferRef<u8>) -> BoxedOperator<'a> {
Box::new(MapOperator { input, output, map: RegexMatch { r: regex::Regex::new(r).unwrap() } })
}
pub fn length(input: BufferRef<&'a str>, output: BufferRef<i64>) -> BoxedOperator<'a> {
Box::new(MapOperator { input, output, map: Length })
}
pub fn aggregate(input: TypedBufferRef,
grouping: TypedBufferRef,
max_index: BufferRef<Scalar<i64>>,
aggregator: Aggregator,
output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
if input.is_nullable() {
reify_types! {
"nullable_aggregation";
input: NullableInteger, grouping: Integer, aggregator: Aggregator;
Ok(Box::new(AggregateNullable { input, grouping, output: output.into(), max_index, a: aggregator }))
}
} else {
reify_types! {
"aggregation";
input: IntegerNoU64, grouping: Integer, aggregator: Aggregator;
Ok(Box::new(Aggregate { input, grouping, output: output.into(), max_index, a: aggregator }))
}
}
}
pub fn checked_aggregate(input: TypedBufferRef,
grouping: TypedBufferRef,
max_index: BufferRef<Scalar<i64>>,
aggregator: Aggregator,
output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
assert!(aggregator == Aggregator::Sum);
if input.is_nullable() {
reify_types! {
"checked_nullable_aggregation";
input: NullableInteger, grouping: Integer;
Ok(Box::new(CheckedAggregateNullable { input, grouping, output: output.into(), max_index, a: PhantomData::<Sum> }))
}
} else {
reify_types! {
"checked_aggregation";
input: IntegerNoU64, grouping: Integer;
Ok(Box::new(CheckedAggregate { input, grouping, output: output.into(), max_index, a: PhantomData::<Sum> }))
}
}
}
pub fn exists(input: TypedBufferRef, max_index: BufferRef<Scalar<i64>>, output: BufferRef<u8>) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"exists";
input: Integer;
Ok(Box::new(Exists { input, output, max_index }))
}
}
pub fn nonzero_compact(data: TypedBufferRef, compacted: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nonzero_compact";
data, compacted: Integer;
Ok(Box::new(NonzeroCompact { data, compacted }))
}
}
pub fn nonzero_indices(input: TypedBufferRef, output: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"nonzero_indices";
input: Integer, output: Integer;
Ok(Box::new(NonzeroIndices { input, output }))
}
}
pub fn compact(data: TypedBufferRef, select: TypedBufferRef, compacted: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"compact";
data, compacted: Integer, select: Integer;
Ok(Box::new(Compact { data, select, compacted }))
}
}
pub fn hash_map_grouping(raw_grouping_key: TypedBufferRef,
max_cardinality: usize,
unique_out: TypedBufferRef,
grouping_key_out: BufferRef<u32>,
cardinality_out: BufferRef<Scalar<i64>>) -> Result<BoxedOperator<'a>, QueryError> {
if let EncodingType::ByteSlices(columns) = raw_grouping_key.tag {
return Ok(HashMapGroupingByteSlices::boxed(
raw_grouping_key.buffer, unique_out.buffer, grouping_key_out, cardinality_out, columns));
}
reify_types! {
"hash_map_grouping";
raw_grouping_key, unique_out: Primitive;
Ok(HashMapGrouping::boxed(raw_grouping_key, unique_out, grouping_key_out, cardinality_out, max_cardinality))
}
}
pub fn hash_map_grouping_val_rows(raw_grouping_key: BufferRef<ValRows<'a>>,
columns: usize,
_max_cardinality: usize,
unique_out: BufferRef<ValRows<'a>>,
grouping_key_out: BufferRef<u32>,
cardinality_out: BufferRef<Scalar<i64>>) -> Result<BoxedOperator<'a>, QueryError> {
Ok(HashMapGroupingValRows::boxed(raw_grouping_key, unique_out, grouping_key_out, cardinality_out, columns))
}
pub fn indices(input: TypedBufferRef, indices_out: BufferRef<usize>) -> BoxedOperator<'a> {
Box::new(Indices { input: input.buffer, indices_out })
}
pub fn sort_by(ranking: TypedBufferRef,
indices: BufferRef<usize>,
descending: bool,
stable: bool,
output: BufferRef<usize>) -> Result<BoxedOperator<'a>, QueryError> {
if let EncodingType::ByteSlices(_) = ranking.tag {
return Ok(Box::new(
SortBySlices { ranking: ranking.any(), output, indices, descending, stable }));
}
if let EncodingType::ValRows = ranking.tag {
return Ok(Box::new(
SortByValRows { ranking: ranking.val_rows()?, output, indices, descending, stable }));
}
if ranking.is_nullable() {
reify_types! {
"sort_indices";
ranking: NullablePrimitive;
Ok(Box::new(SortByNullable { ranking, output, indices, descending, stable }))
}
} else {
reify_types! {
"sort_indices";
ranking: Primitive;
Ok(Box::new(SortBy { ranking, output, indices, descending, stable }))
}
}
}
pub fn top_n(input: TypedBufferRef,
keys: TypedBufferRef,
n: usize, desc: bool,
indices_out: BufferRef<usize>) -> Result<BoxedOperator<'a>, QueryError> {
if desc {
reify_types! {
"top_n_desc";
input, keys: Primitive;
Ok(Box::new(TopN { input, keys, indices: indices_out, last_index: 0, n, c: PhantomData::<CmpGreaterThan> }))
}
} else {
reify_types! {
"top_n_asc";
input, keys: Primitive;
Ok(Box::new(TopN { input, keys, indices: indices_out, last_index: 0, n, c: PhantomData::<CmpLessThan> }))
}
}
}
pub fn merge_deduplicate(left: TypedBufferRef,
right: TypedBufferRef,
ops_out: BufferRef<MergeOp>,
merged_out: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"merge_deduplicate";
left, right, merged_out: Primitive;
Ok(Box::new(MergeDeduplicate { left, right, deduplicated: merged_out, merge_ops: ops_out }))
}
}
pub fn partition(left: TypedBufferRef,
right: TypedBufferRef,
limit: usize, desc: bool,
partition_out: BufferRef<Premerge>) -> Result<BoxedOperator<'a>, QueryError> {
if desc {
reify_types! {
"partition";
left, right: Primitive;
Ok(Box::new(Partition { left, right, partitioning: partition_out, limit, c: PhantomData::<CmpGreaterThan> }))
}
} else {
reify_types! {
"partition";
left, right: Primitive;
Ok(Box::new(Partition { left, right, partitioning: partition_out, limit, c: PhantomData::<CmpLessThan> }))
}
}
}
pub fn subpartition(partitioning: BufferRef<Premerge>,
left: TypedBufferRef,
right: TypedBufferRef,
desc: bool,
subpartition_out: BufferRef<Premerge>) -> Result<BoxedOperator<'a>, QueryError> {
if desc {
reify_types! {
"subpartition";
left, right: Primitive;
Ok(Box::new(SubPartition { partitioning, left, right, sub_partitioning: subpartition_out, c: PhantomData::<CmpGreaterThan> }))
}
} else {
reify_types! {
"subpartition";
left, right: Primitive;
Ok(Box::new(SubPartition { partitioning, left, right, sub_partitioning: subpartition_out, c: PhantomData::<CmpLessThan> }))
}
}
}
pub fn merge_deduplicate_partitioned(partitioning: BufferRef<Premerge>,
left: TypedBufferRef,
right: TypedBufferRef,
ops_out: BufferRef<MergeOp>,
merged_out: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"merge_deduplicate_partitioned";
left, right, merged_out: Primitive;
Ok(Box::new(MergeDeduplicatePartitioned { partitioning, left, right, deduplicated: merged_out, merge_ops: ops_out }))
}
}
pub fn merge_drop(merge_ops: BufferRef<MergeOp>,
left: TypedBufferRef,
right: TypedBufferRef,
merged_out: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"merge_drop";
left, right, merged_out: Primitive;
Ok(Box::new(MergeDrop { merge_ops, left, right, deduplicated: merged_out }))
}
}
pub fn merge_aggregate(merge_ops: BufferRef<MergeOp>,
left: BufferRef<i64>,
right: BufferRef<i64>,
aggregator: Aggregator,
aggregated_out: BufferRef<i64>) -> BoxedOperator<'a> {
Box::new(MergeAggregate { merge_ops, left, right, aggregated: aggregated_out, aggregator })
}
pub fn merge_partitioned(partitioning: BufferRef<Premerge>,
left: TypedBufferRef,
right: TypedBufferRef,
limit: usize, desc: bool,
ops_out: BufferRef<u8>,
merged_out: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
if desc {
reify_types! {
"merge_partitioned_desc";
left, right, merged_out: Primitive;
Ok(Box::new(MergePartitioned { partitioning, left, right, merged: merged_out, take_left: ops_out, limit, c: PhantomData::<CmpGreaterThan> }))
}
} else {
reify_types! {
"merge_partitioned_asc";
left, right, merged_out: Primitive;
Ok(Box::new(MergePartitioned { partitioning, left, right, merged: merged_out, take_left: ops_out, limit, c: PhantomData::<CmpLessThan> }))
}
}
}
pub fn merge(left: TypedBufferRef,
right: TypedBufferRef,
limit: usize,
desc: bool,
ops_out: BufferRef<u8>,
merged_out: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
if desc {
reify_types! {
"merge_desc";
left, right, merged_out: Primitive;
Ok(Box::new(Merge { left, right, merged: merged_out, merge_ops: ops_out, limit, c: PhantomData::<CmpGreaterThan> }))
}
} else {
reify_types! {
"merge_desc";
left, right, merged_out: Primitive;
Ok(Box::new(Merge { left, right, merged: merged_out, merge_ops: ops_out, limit, c: PhantomData::<CmpLessThan> }))
}
}
}
pub fn merge_keep(merge_ops: BufferRef<u8>,
left: TypedBufferRef,
right: TypedBufferRef,
merged_out: TypedBufferRef) -> Result<BoxedOperator<'a>, QueryError> {
reify_types! {
"merge_keep";
left, right, merged_out: NullablePrimitive;
Ok(Box::new(MergeKeepNullable { merge_ops, left, right, merged: merged_out }));
left, right, merged_out: Primitive;
Ok(Box::new(MergeKeep { merge_ops, left, right, merged: merged_out }))
}
}
}