use std::{any::Any, sync::Arc};
use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add, divide, divide_scalar, multiply, subtract,
};
use arrow::compute::kernels::boolean::{and_kleene, or_kleene};
use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow::compute::kernels::comparison::{
eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar,
};
use arrow::compute::kernels::comparison::{
eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, like_utf8_scalar, lt_eq_utf8, lt_utf8,
neq_utf8, nlike_utf8, nlike_utf8_scalar,
};
use arrow::compute::kernels::comparison::{
eq_utf8_scalar, gt_eq_utf8_scalar, gt_utf8_scalar, lt_eq_utf8_scalar, lt_utf8_scalar,
neq_utf8_scalar,
};
use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::Operator;
use crate::physical_plan::expressions::try_cast;
use crate::physical_plan::{ColumnarValue, PhysicalExpr};
use crate::scalar::ScalarValue;
use super::coercion::{eq_coercion, numerical_coercion, order_coercion, string_coercion};
/// Physical expression that applies a binary operator (`op`) to the values
/// produced by two child expressions (`left` and `right`).
#[derive(Debug)]
pub struct BinaryExpr {
    left: Arc<dyn PhysicalExpr>,
    op: Operator,
    right: Arc<dyn PhysicalExpr>,
}

impl BinaryExpr {
    /// Builds `left op right` as-is.
    ///
    /// No casts are inserted here; the `binary` function in this module is the
    /// constructor that first coerces both operands to a common type.
    pub fn new(
        left: Arc<dyn PhysicalExpr>,
        op: Operator,
        right: Arc<dyn PhysicalExpr>,
    ) -> Self {
        BinaryExpr { op, left, right }
    }

    /// The left-hand operand.
    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
        &self.left
    }

    /// The right-hand operand.
    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
        &self.right
    }

    /// The operator combining the two operands.
    pub fn op(&self) -> &Operator {
        &self.op
    }
}
impl std::fmt::Display for BinaryExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{} {} {}", self.left, self.op, self.right)
}
}
/// Applies the arrow `<$OP>_utf8` kernel (e.g. `eq_utf8`, `like_utf8`) to two
/// string arrays of concrete type `$DT`.
///
/// Both operands are downcast with `expect`, so this panics if either array is
/// not a `$DT`; callers are expected to have dispatched on the data type first.
macro_rules! compute_utf8_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let left_typed = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        let right_typed = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        // `paste` glues the kernel name together: `lt` becomes `lt_utf8`.
        Ok(Arc::new(paste::expr! {[<$OP _utf8>]}(
            &left_typed,
            &right_typed,
        )?))
    }};
}
/// Applies the arrow `<$OP>_utf8_scalar` kernel (e.g. `eq_utf8_scalar`,
/// `like_utf8_scalar`) to a string array of type `$DT` and a string literal.
///
/// `$RIGHT` must evaluate to a `ScalarValue`. Only `ScalarValue::Utf8(Some(_))`
/// is accepted; any other value (including a null string) yields
/// `DataFusionError::Internal`. The `$LEFT` downcast panics if the array is not
/// a `$DT`.
///
/// `$RIGHT` is evaluated exactly once and then borrowed, so callers passing an
/// expression such as `scalar.clone()` do not pay for a second evaluation on
/// the error path.
macro_rules! compute_utf8_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        let right_scalar = $RIGHT;
        if let ScalarValue::Utf8(Some(string_value)) = &right_scalar {
            // `paste` glues the kernel name together: `eq` becomes `eq_utf8_scalar`.
            Ok(Arc::new(paste::expr! {[<$OP _utf8_scalar>]}(
                &ll,
                string_value,
            )?))
        } else {
            Err(DataFusionError::Internal(format!(
                "compute_utf8_op_scalar failed to cast literal value {}",
                right_scalar
            )))
        }
    }};
}
/// Applies the arrow `<$OP>_scalar` kernel (e.g. `lt_scalar`, `divide_scalar`)
/// to an array of concrete type `$DT` and a `ScalarValue`.
///
/// The scalar is converted with `TryInto`, and the conversion target is
/// inferred from the kernel's argument type. A failed conversion is propagated
/// with `?`. The downcast of `$LEFT` panics if the array is not a `$DT`.
macro_rules! compute_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        use std::convert::TryInto;
        let typed_array = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        // `paste` glues the kernel name together: `gt_eq` becomes `gt_eq_scalar`.
        Ok(Arc::new(paste::expr! {[<$OP _scalar>]}(
            &typed_array,
            $RIGHT.try_into()?,
        )?))
    }};
}
/// Calls the arrow kernel `$OP` on arrays downcast to concrete type `$DT`.
///
/// Two forms are supported:
/// * `compute_op!(left, right, op, Type)`: binary kernel over two arrays;
/// * `compute_op!(operand, op, Type)`: unary kernel over one array.
///
/// Downcasts use `expect`, so a type mismatch panics.
macro_rules! compute_op {
    // binary kernel: `$OP(&left, &right)`
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let lhs = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        let rhs = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        Ok(Arc::new($OP(&lhs, &rhs)?))
    }};
    // unary kernel: `$OP(&operand)`
    ($OPERAND:expr, $OP:ident, $DT:ident) => {{
        let typed_operand = $OPERAND
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        Ok(Arc::new($OP(&typed_operand)?))
    }};
}
/// Dispatches a string-only array/scalar operation (`like`, `nlike`) on the
/// array's data type.
///
/// Always evaluates to `Some(result)`. Non-`Utf8` arrays produce
/// `Some(Err(DataFusionError::Internal(..)))`.
macro_rules! binary_string_array_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        // The explicit type lets `Arc<StringKernelOutput>` coerce to `Arc<dyn Array>`.
        let outcome: Result<Arc<dyn Array>> = match $LEFT.data_type() {
            DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray),
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for scalar operation on string array",
                unsupported
            ))),
        };
        Some(outcome)
    }};
}
/// Dispatches a string-only array/array operation (`like`, `nlike`) on the
/// left array's data type.
///
/// Evaluates to a `Result`. Non-`Utf8` inputs produce `DataFusionError::Internal`.
macro_rules! binary_string_array_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        match $LEFT.data_type() {
            DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for binary operation on string arrays",
                unsupported
            ))),
        }
    }};
}
/// Dispatches a numeric array/array kernel (e.g. `add`, `divide`) on the left
/// array's data type.
///
/// Covers the signed integer, unsigned integer and floating point arrays.
/// Any other type evaluates to `DataFusionError::Internal`.
macro_rules! binary_primitive_array_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        match $LEFT.data_type() {
            // signed integers
            DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array),
            DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array),
            DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array),
            DataType::Int64 => compute_op!($LEFT, $RIGHT, $OP, Int64Array),
            // unsigned integers
            DataType::UInt8 => compute_op!($LEFT, $RIGHT, $OP, UInt8Array),
            DataType::UInt16 => compute_op!($LEFT, $RIGHT, $OP, UInt16Array),
            DataType::UInt32 => compute_op!($LEFT, $RIGHT, $OP, UInt32Array),
            DataType::UInt64 => compute_op!($LEFT, $RIGHT, $OP, UInt64Array),
            // floating point
            DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
            DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for binary operation on primitive arrays",
                unsupported
            ))),
        }
    }};
}
/// Dispatches a numeric array/scalar kernel (e.g. `divide_scalar`) on the
/// array's data type.
///
/// Always evaluates to `Some(result)`. Unsupported types produce
/// `Some(Err(DataFusionError::Internal(..)))`.
macro_rules! binary_primitive_array_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        // The explicit type lets the concrete kernel output coerce to `Arc<dyn Array>`.
        let outcome: Result<Arc<dyn Array>> = match $LEFT.data_type() {
            // signed integers
            DataType::Int8 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int8Array),
            DataType::Int16 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int16Array),
            DataType::Int32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int32Array),
            DataType::Int64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int64Array),
            // unsigned integers
            DataType::UInt8 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt8Array),
            DataType::UInt16 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt16Array),
            DataType::UInt32 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt32Array),
            DataType::UInt64 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt64Array),
            // floating point
            DataType::Float32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array),
            DataType::Float64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array),
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for scalar operation on primitive array",
                unsupported
            ))),
        };
        Some(outcome)
    }};
}
/// Dispatches a comparison array/scalar kernel (e.g. `lt_scalar`,
/// `eq_utf8_scalar`) on the array's data type.
///
/// Supports numeric, `Utf8`, `Timestamp(Nanosecond, None)` and `Date32` arrays.
/// Always evaluates to `Some(result)`. Other types produce
/// `Some(Err(DataFusionError::Internal(..)))`.
#[macro_export]
macro_rules! binary_array_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        // The explicit type lets the concrete kernel output coerce to `Arc<dyn Array>`.
        let outcome: Result<Arc<dyn Array>> = match $LEFT.data_type() {
            // signed integers
            DataType::Int8 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int8Array),
            DataType::Int16 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int16Array),
            DataType::Int32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int32Array),
            DataType::Int64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int64Array),
            // unsigned integers
            DataType::UInt8 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt8Array),
            DataType::UInt16 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt16Array),
            DataType::UInt32 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt32Array),
            DataType::UInt64 => compute_op_scalar!($LEFT, $RIGHT, $OP, UInt64Array),
            // floating point
            DataType::Float32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array),
            DataType::Float64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array),
            // strings use the dedicated `*_utf8_scalar` kernels
            DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray),
            // temporal types
            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
            }
            DataType::Date32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array),
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for scalar operation on dyn array",
                unsupported
            ))),
        };
        Some(outcome)
    }};
}
/// Dispatches a comparison array/array kernel (e.g. `lt`, `eq_utf8`) on the
/// left array's data type.
///
/// Supports numeric, `Utf8`, `Timestamp(Nanosecond, None)`, `Date32` and
/// `Date64` arrays. Other types evaluate to `DataFusionError::Internal`.
#[macro_export]
macro_rules! binary_array_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        match $LEFT.data_type() {
            // signed integers
            DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array),
            DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array),
            DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array),
            DataType::Int64 => compute_op!($LEFT, $RIGHT, $OP, Int64Array),
            // unsigned integers
            DataType::UInt8 => compute_op!($LEFT, $RIGHT, $OP, UInt8Array),
            DataType::UInt16 => compute_op!($LEFT, $RIGHT, $OP, UInt16Array),
            DataType::UInt32 => compute_op!($LEFT, $RIGHT, $OP, UInt32Array),
            DataType::UInt64 => compute_op!($LEFT, $RIGHT, $OP, UInt64Array),
            // floating point
            DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
            DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
            // strings use the dedicated `*_utf8` kernels
            DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
            // temporal types
            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
            }
            DataType::Date32 => compute_op!($LEFT, $RIGHT, $OP, Date32Array),
            DataType::Date64 => compute_op!($LEFT, $RIGHT, $OP, Date64Array),
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for binary operation on dyn arrays",
                unsupported
            ))),
        }
    }};
}
/// Applies a boolean kernel (`and_kleene` / `or_kleene`) to two arrays.
///
/// Both operands are downcast to `BooleanArray` with `expect`, so non-boolean
/// input panics; callers check the data type beforehand.
macro_rules! boolean_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        let lhs_bools = $LEFT
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("boolean_op failed to downcast array");
        let rhs_bools = $RIGHT
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("boolean_op failed to downcast array");
        Ok(Arc::new($OP(&lhs_bools, &rhs_bools)?))
    }};
}
/// Picks the type both operands of `lhs_type op rhs_type` must be cast to.
///
/// Each operator family delegates to its own coercion rule:
/// * `AND` / `OR` accept only two booleans;
/// * `=` / `!=` use `eq_coercion`;
/// * `LIKE` / `NOT LIKE` use `string_coercion`;
/// * `<`, `<=`, `>`, `>=` use `order_coercion`;
/// * `+`, `-`, `*`, `/` use `numerical_coercion`.
///
/// # Errors
/// * `DataFusionError::NotImplemented` for the modulus operator.
/// * `DataFusionError::Plan` when no common type exists.
fn common_binary_type(
    lhs_type: &DataType,
    op: &Operator,
    rhs_type: &DataType,
) -> Result<DataType> {
    let coerced = match op {
        Operator::And | Operator::Or => {
            if let (DataType::Boolean, DataType::Boolean) = (lhs_type, rhs_type) {
                Some(DataType::Boolean)
            } else {
                None
            }
        }
        Operator::Eq | Operator::NotEq => eq_coercion(lhs_type, rhs_type),
        Operator::Like | Operator::NotLike => string_coercion(lhs_type, rhs_type),
        Operator::Lt | Operator::Gt | Operator::GtEq | Operator::LtEq => {
            order_coercion(lhs_type, rhs_type)
        }
        Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => {
            numerical_coercion(lhs_type, rhs_type)
        }
        Operator::Modulus => {
            return Err(DataFusionError::NotImplemented(
                "Modulus operator is still not supported".to_string(),
            ))
        }
    };

    coerced.ok_or_else(|| {
        DataFusionError::Plan(format!(
            "'{:?} {} {:?}' can't be evaluated because there isn't a common type to coerce the types to",
            lhs_type, op, rhs_type
        ))
    })
}
/// Returns the output type of `lhs_type op rhs_type`.
///
/// Arithmetic operators return the coerced common operand type. Comparison,
/// pattern-matching and logical operators always return `Boolean`.
///
/// # Errors
/// Propagates the errors of `common_binary_type`: a planning error when the
/// operands cannot be coerced, or `NotImplemented` for modulus.
pub fn binary_operator_data_type(
    lhs_type: &DataType,
    op: &Operator,
    rhs_type: &DataType,
) -> Result<DataType> {
    // Computed up front even for boolean-producing operators, so incompatible
    // operand types are rejected for every operator.
    let common_type = common_binary_type(lhs_type, op, rhs_type)?;

    match op {
        Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => {
            Ok(common_type)
        }
        Operator::Eq
        | Operator::NotEq
        | Operator::Lt
        | Operator::LtEq
        | Operator::Gt
        | Operator::GtEq
        | Operator::Like
        | Operator::NotLike
        | Operator::And
        | Operator::Or => Ok(DataType::Boolean),
        Operator::Modulus => Err(DataFusionError::NotImplemented(
            "Modulus operator is still not supported".to_string(),
        )),
    }
}
impl PhysicalExpr for BinaryExpr {
    /// Returns `self` as `Any` so callers can downcast to `BinaryExpr`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output type, derived from both operand types by `binary_operator_data_type`.
    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
        binary_operator_data_type(
            &self.left.data_type(input_schema)?,
            &self.op,
            &self.right.data_type(input_schema)?,
        )
    }

    /// The result is nullable whenever either operand is nullable.
    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
    }

    /// Evaluates both operands against `batch` and applies the operator.
    ///
    /// Both operands must already share one data type; otherwise an internal
    /// error is returned. `binary` builds expressions that satisfy this by
    /// inserting casts.
    ///
    /// For `array op scalar`, comparisons, `LIKE` / `NOT LIKE` and division use
    /// the dedicated scalar kernels. For `scalar op array`, only comparisons do,
    /// with the comparison mirrored. Every other combination broadcasts scalars
    /// to full-length arrays and uses the array/array kernels.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let left_value = self.left.evaluate(batch)?;
        let right_value = self.right.evaluate(batch)?;
        let left_data_type = left_value.data_type();
        let right_data_type = right_value.data_type();

        // The kernels below dispatch on a single data type.
        if left_data_type != right_data_type {
            return Err(DataFusionError::Internal(format!(
                "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
                self.op, left_data_type, right_data_type
            )));
        }

        // Scalar fast path. `None` means there is no scalar kernel for this
        // operand/operator combination, so fall through to the array path.
        let scalar_result = match (&left_value, &right_value) {
            (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => {
                match &self.op {
                    Operator::Lt => binary_array_op_scalar!(array, scalar.clone(), lt),
                    Operator::LtEq => {
                        binary_array_op_scalar!(array, scalar.clone(), lt_eq)
                    }
                    Operator::Gt => binary_array_op_scalar!(array, scalar.clone(), gt),
                    Operator::GtEq => {
                        binary_array_op_scalar!(array, scalar.clone(), gt_eq)
                    }
                    Operator::Eq => binary_array_op_scalar!(array, scalar.clone(), eq),
                    Operator::NotEq => {
                        binary_array_op_scalar!(array, scalar.clone(), neq)
                    }
                    Operator::Like => {
                        binary_string_array_op_scalar!(array, scalar.clone(), like)
                    }
                    Operator::NotLike => {
                        binary_string_array_op_scalar!(array, scalar.clone(), nlike)
                    }
                    Operator::Divide => {
                        binary_primitive_array_op_scalar!(array, scalar.clone(), divide)
                    }
                    _ => None,
                }
            }
            // `scalar op array` is evaluated as `array op' scalar`, where op' is
            // the mirrored comparison (e.g. `1 < a` becomes `a > 1`).
            (ColumnarValue::Scalar(scalar), ColumnarValue::Array(array)) => {
                match &self.op {
                    Operator::Lt => binary_array_op_scalar!(array, scalar.clone(), gt),
                    Operator::LtEq => {
                        binary_array_op_scalar!(array, scalar.clone(), gt_eq)
                    }
                    Operator::Gt => binary_array_op_scalar!(array, scalar.clone(), lt),
                    Operator::GtEq => {
                        binary_array_op_scalar!(array, scalar.clone(), lt_eq)
                    }
                    Operator::Eq => binary_array_op_scalar!(array, scalar.clone(), eq),
                    Operator::NotEq => {
                        binary_array_op_scalar!(array, scalar.clone(), neq)
                    }
                    _ => None,
                }
            }
            (_, _) => None,
        };

        if let Some(result) = scalar_result {
            return result.map(ColumnarValue::Array);
        }

        // General path: broadcast any scalar operand to a full-length array.
        let num_rows = batch.num_rows();
        let left = left_value.into_array(num_rows);
        let right = right_value.into_array(num_rows);

        let result: Result<ArrayRef> = match &self.op {
            Operator::Like => binary_string_array_op!(left, right, like),
            Operator::NotLike => binary_string_array_op!(left, right, nlike),
            Operator::Lt => binary_array_op!(left, right, lt),
            Operator::LtEq => binary_array_op!(left, right, lt_eq),
            Operator::Gt => binary_array_op!(left, right, gt),
            Operator::GtEq => binary_array_op!(left, right, gt_eq),
            Operator::Eq => binary_array_op!(left, right, eq),
            Operator::NotEq => binary_array_op!(left, right, neq),
            Operator::Plus => binary_primitive_array_op!(left, right, add),
            Operator::Minus => binary_primitive_array_op!(left, right, subtract),
            Operator::Multiply => binary_primitive_array_op!(left, right, multiply),
            Operator::Divide => binary_primitive_array_op!(left, right, divide),
            // The Kleene AND/OR kernels only accept boolean operands.
            Operator::And | Operator::Or if left_data_type != DataType::Boolean => {
                Err(DataFusionError::Internal(format!(
                    "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
                    self.op, left_data_type, right_data_type
                )))
            }
            Operator::And => boolean_op!(left, right, and_kleene),
            Operator::Or => boolean_op!(left, right, or_kleene),
            Operator::Modulus => Err(DataFusionError::NotImplemented(
                "Modulus operator is still not supported".to_string(),
            )),
        };
        result.map(ColumnarValue::Array)
    }
}
/// Casts both operands of `lhs op rhs` to the type chosen by `common_binary_type`.
///
/// Returns the (possibly wrapped) left and right expressions, in that order.
/// Any coercion or cast error is propagated.
fn binary_cast(
    lhs: Arc<dyn PhysicalExpr>,
    op: &Operator,
    rhs: Arc<dyn PhysicalExpr>,
    input_schema: &Schema,
) -> Result<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
    let target_type = common_binary_type(
        &lhs.data_type(input_schema)?,
        op,
        &rhs.data_type(input_schema)?,
    )?;
    let cast_lhs = try_cast(lhs, input_schema, target_type.clone())?;
    let cast_rhs = try_cast(rhs, input_schema, target_type)?;
    Ok((cast_lhs, cast_rhs))
}
/// Creates the physical expression `lhs op rhs`, casting both operands to a
/// common type first.
///
/// # Errors
/// A planning error when the operand types have no common type for `op`,
/// `NotImplemented` for modulus, or any error raised while building the casts.
pub fn binary(
    lhs: Arc<dyn PhysicalExpr>,
    op: Operator,
    rhs: Arc<dyn PhysicalExpr>,
    input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    let (left, right) = binary_cast(lhs, &op, rhs, input_schema)?;
    let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(left, op, right));
    Ok(expr)
}
#[cfg(test)]
mod tests {
    use arrow::datatypes::{ArrowNumericType, Field, Int32Type, SchemaRef};
    use arrow::util::display::array_value_to_string;

    use super::*;
    use crate::error::Result;
    use crate::physical_plan::expressions::col;

    // Builds a `BinaryExpr` directly, without inserting coercion casts.
    fn binary_simple(
        l: Arc<dyn PhysicalExpr>,
        op: Operator,
        r: Arc<dyn PhysicalExpr>,
    ) -> Arc<dyn PhysicalExpr> {
        Arc::new(BinaryExpr::new(l, op, r))
    }

    #[test]
    fn binary_comparison() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;

        let lt = binary_simple(col("a"), Operator::Lt, col("b"));
        let result = lt.evaluate(&batch)?.into_array(batch.num_rows());
        assert_eq!(result.len(), 5);

        let expected = vec![false, false, true, true, true];
        let result = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("failed to downcast to BooleanArray");
        for (i, &expected_item) in expected.iter().enumerate() {
            assert_eq!(result.value(i), expected_item);
        }

        Ok(())
    }

    #[test]
    fn binary_nested() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let a = Int32Array::from(vec![2, 4, 6, 8, 10]);
        let b = Int32Array::from(vec![2, 5, 4, 8, 8]);
        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;

        // a < b OR a = b
        let expr = binary_simple(
            binary_simple(col("a"), Operator::Lt, col("b")),
            Operator::Or,
            binary_simple(col("a"), Operator::Eq, col("b")),
        );
        assert_eq!("a < b OR a = b", format!("{}", expr));

        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
        assert_eq!(result.len(), 5);

        let expected = vec![true, true, false, true, false];
        let result = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("failed to downcast to BooleanArray");
        for (i, &expected_item) in expected.iter().enumerate() {
            assert_eq!(result.value(i), expected_item);
        }

        Ok(())
    }

    // Evaluates `a $OP b` built through `binary` (so coercion casts are
    // inserted) and checks the output type, length and values.
    macro_rules! test_coercion {
        ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $B_ARRAY:ident, $B_TYPE:expr, $B_VEC:expr, $OP:expr, $C_ARRAY:ident, $C_TYPE:expr, $VEC:expr) => {{
            let schema = Schema::new(vec![
                Field::new("a", $A_TYPE, false),
                Field::new("b", $B_TYPE, false),
            ]);
            let a = $A_ARRAY::from($A_VEC);
            let b = $B_ARRAY::from($B_VEC);
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(a), Arc::new(b)],
            )?;

            // verify that we can construct the expression
            let expression = binary(col("a"), $OP, col("b"), &schema)?;
            assert_eq!(expression.data_type(&schema)?, $C_TYPE);

            // compute
            let result = expression.evaluate(&batch)?.into_array(batch.num_rows());

            // verify that the array's data_type is correct
            assert_eq!(*result.data_type(), $C_TYPE);

            // verify that the data itself is downcastable
            let result = result
                .as_any()
                .downcast_ref::<$C_ARRAY>()
                .expect("failed to downcast");

            // verify that the result itself is correct, including its length
            let expected = $VEC;
            assert_eq!(result.len(), expected.len());
            for (i, x) in expected.iter().enumerate() {
                assert_eq!(result.value(i), *x);
            }
        }};
    }

    #[test]
    fn test_type_coersion() -> Result<()> {
        test_coercion!(
            Int32Array,
            DataType::Int32,
            vec![1i32, 2i32],
            UInt32Array,
            DataType::UInt32,
            vec![1u32, 2u32],
            Operator::Plus,
            Int32Array,
            DataType::Int32,
            vec![2i32, 4i32]
        );
        test_coercion!(
            Int32Array,
            DataType::Int32,
            vec![1i32],
            UInt16Array,
            DataType::UInt16,
            vec![1u16],
            Operator::Plus,
            Int32Array,
            DataType::Int32,
            vec![2i32]
        );
        test_coercion!(
            Float32Array,
            DataType::Float32,
            vec![1f32],
            UInt16Array,
            DataType::UInt16,
            vec![1u16],
            Operator::Plus,
            Float32Array,
            DataType::Float32,
            vec![2f32]
        );
        test_coercion!(
            Float32Array,
            DataType::Float32,
            vec![2f32],
            UInt16Array,
            DataType::UInt16,
            vec![1u16],
            Operator::Multiply,
            Float32Array,
            DataType::Float32,
            vec![2f32]
        );
        test_coercion!(
            StringArray,
            DataType::Utf8,
            vec!["hello world", "world"],
            StringArray,
            DataType::Utf8,
            vec!["%hello%", "%hello%"],
            Operator::Like,
            BooleanArray,
            DataType::Boolean,
            vec![true, false]
        );
        test_coercion!(
            StringArray,
            DataType::Utf8,
            vec!["1994-12-13", "1995-01-26"],
            Date32Array,
            DataType::Date32,
            vec![9112, 9156],
            Operator::Eq,
            BooleanArray,
            DataType::Boolean,
            vec![true, true]
        );
        test_coercion!(
            StringArray,
            DataType::Utf8,
            vec!["1994-12-13", "1995-01-26"],
            Date32Array,
            DataType::Date32,
            vec![9113, 9154],
            Operator::Lt,
            BooleanArray,
            DataType::Boolean,
            vec![true, false]
        );
        test_coercion!(
            StringArray,
            DataType::Utf8,
            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
            Date64Array,
            DataType::Date64,
            vec![787322096000, 791083425000],
            Operator::Eq,
            BooleanArray,
            DataType::Boolean,
            vec![true, true]
        );
        test_coercion!(
            StringArray,
            DataType::Utf8,
            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
            Date64Array,
            DataType::Date64,
            vec![787322096001, 791083424999],
            Operator::Lt,
            BooleanArray,
            DataType::Boolean,
            vec![true, false]
        );
        Ok(())
    }

    #[test]
    fn test_dictionary_type_to_array_coersion() -> Result<()> {
        // Test string  a string dictionary
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let string_type = DataType::Utf8;

        // build dictionary
        let keys_builder = PrimitiveBuilder::<Int32Type>::new(10);
        let values_builder = arrow::array::StringBuilder::new(10);
        let mut dict_builder = StringDictionaryBuilder::new(keys_builder, values_builder);

        dict_builder.append("one")?;
        dict_builder.append_null()?;
        dict_builder.append("three")?;
        dict_builder.append("four")?;
        let dict_array = dict_builder.finish();

        let str_array =
            StringArray::from(vec![Some("not one"), Some("two"), None, Some("four")]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("dict", dict_type, true),
            Field::new("str", string_type, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(dict_array), Arc::new(str_array)],
        )?;

        let expected = "false\n\n\ntrue";

        // Test 1: dict = str
        let expression = binary(col("dict"), Operator::Eq, col("str"), &schema)?;
        assert_eq!(expression.data_type(&schema)?, DataType::Boolean);

        let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
        assert_eq!(result.data_type(), &DataType::Boolean);
        assert_eq!(expected, array_to_string(&result)?);

        // Test 2: now test the other direction
        // str = dict
        let expression = binary(col("str"), Operator::Eq, col("dict"), &schema)?;
        assert_eq!(expression.data_type(&schema)?, DataType::Boolean);

        let result = expression.evaluate(&batch)?.into_array(batch.num_rows());
        assert_eq!(result.data_type(), &DataType::Boolean);
        assert_eq!(expected, array_to_string(&result)?);

        Ok(())
    }

    // Formats every value of `array` and joins them with newlines.
    fn array_to_string(array: &ArrayRef) -> Result<String> {
        let s = (0..array.len())
            .map(|i| array_value_to_string(array, i))
            .collect::<std::result::Result<Vec<_>, arrow::error::ArrowError>>()?
            .join("\n");
        Ok(s)
    }

    #[test]
    fn plus_op() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);

        apply_arithmetic::<Int32Type>(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b)],
            Operator::Plus,
            Int32Array::from(vec![2, 4, 7, 12, 21]),
        )?;

        Ok(())
    }

    #[test]
    fn minus_op() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let a = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
        let b = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));

        apply_arithmetic::<Int32Type>(
            schema.clone(),
            vec![a.clone(), b.clone()],
            Operator::Minus,
            Int32Array::from(vec![0, 0, 1, 4, 11]),
        )?;

        // should handle have negative values in result (for signed)
        apply_arithmetic::<Int32Type>(
            schema,
            vec![b, a],
            Operator::Minus,
            Int32Array::from(vec![0, 0, -1, -4, -11]),
        )?;

        Ok(())
    }

    #[test]
    fn multiply_op() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let a = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));

        apply_arithmetic::<Int32Type>(
            schema,
            vec![a, b],
            Operator::Multiply,
            Int32Array::from(vec![8, 32, 128, 512, 2048]),
        )?;

        Ok(())
    }

    #[test]
    fn divide_op() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));

        apply_arithmetic::<Int32Type>(
            schema,
            vec![a, b],
            Operator::Divide,
            Int32Array::from(vec![4, 8, 16, 32, 64]),
        )?;

        Ok(())
    }

    // Evaluates `a op b` over `data` and compares the result with `expected`.
    fn apply_arithmetic<T: ArrowNumericType>(
        schema: SchemaRef,
        data: Vec<ArrayRef>,
        op: Operator,
        expected: PrimitiveArray<T>,
    ) -> Result<()> {
        let arithmetic_op = binary_simple(col("a"), op, col("b"));
        let batch = RecordBatch::try_new(schema, data)?;
        let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());

        assert_eq!(result.as_ref(), &expected);
        Ok(())
    }

    // Evaluates the boolean expression `a op b` and compares with `expected`.
    fn apply_logic_op(
        schema: SchemaRef,
        left: BooleanArray,
        right: BooleanArray,
        op: Operator,
        expected: BooleanArray,
    ) -> Result<()> {
        let logic_op = binary_simple(col("a"), op, col("b"));
        let data: Vec<ArrayRef> = vec![Arc::new(left), Arc::new(right)];
        let batch = RecordBatch::try_new(schema, data)?;
        let result = logic_op.evaluate(&batch)?.into_array(batch.num_rows());

        assert_eq!(result.as_ref(), &expected);
        Ok(())
    }

    #[test]
    fn and_with_nulls_op() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Boolean, true),
            Field::new("b", DataType::Boolean, true),
        ]);
        let a = BooleanArray::from(vec![
            Some(true),
            Some(false),
            None,
            Some(true),
            Some(false),
            None,
            Some(true),
            Some(false),
            None,
        ]);
        let b = BooleanArray::from(vec![
            Some(true),
            Some(true),
            Some(true),
            Some(false),
            Some(false),
            Some(false),
            None,
            None,
            None,
        ]);

        let expected = BooleanArray::from(vec![
            Some(true),
            Some(false),
            None,
            Some(false),
            Some(false),
            Some(false),
            None,
            Some(false),
            None,
        ]);
        apply_logic_op(Arc::new(schema), a, b, Operator::And, expected)?;

        Ok(())
    }

    #[test]
    fn or_with_nulls_op() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Boolean, true),
            Field::new("b", DataType::Boolean, true),
        ]);
        let a = BooleanArray::from(vec![
            Some(true),
            Some(false),
            None,
            Some(true),
            Some(false),
            None,
            Some(true),
            Some(false),
            None,
        ]);
        let b = BooleanArray::from(vec![
            Some(true),
            Some(true),
            Some(true),
            Some(false),
            Some(false),
            Some(false),
            None,
            None,
            None,
        ]);

        let expected = BooleanArray::from(vec![
            Some(true),
            Some(true),
            Some(true),
            Some(true),
            Some(false),
            None,
            Some(true),
            None,
            None,
        ]);
        apply_logic_op(Arc::new(schema), a, b, Operator::Or, expected)?;

        Ok(())
    }

    #[test]
    fn test_coersion_error() -> Result<()> {
        let expr =
            common_binary_type(&DataType::Float32, &Operator::Plus, &DataType::Utf8);

        if let Err(DataFusionError::Plan(e)) = expr {
            assert_eq!(e, "'Float32 + Utf8' can't be evaluated because there isn't a common type to coerce the types to");
            Ok(())
        } else {
            // The expected variant is `Plan`, matching the pattern above.
            Err(DataFusionError::Internal(
                "Coercion should have returned an DataFusionError::Plan".to_string(),
            ))
        }
    }
}