use arrow::array::*;
use arrow::compute::kernels::bitwise::{
bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, bitwise_shift_left,
bitwise_shift_left_scalar, bitwise_shift_right, bitwise_shift_right_scalar,
bitwise_xor, bitwise_xor_scalar,
};
use arrow::compute::kernels::boolean::not;
use arrow::compute::kernels::comparison::{regexp_is_match, regexp_is_match_scalar};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{internal_err, plan_err};
use std::sync::Arc;
/// Downcasts `$LEFT` and `$RIGHT` to `$ARRAY_TYPE`, applies `$KERNEL` to the
/// two typed arrays, and evaluates to `Ok` wrapping the result in an `Arc`.
///
/// Panics if either operand is not a `$ARRAY_TYPE`. A kernel failure is
/// propagated with `?`, so the macro has to be expanded inside a function
/// whose error type can absorb the kernel's error.
macro_rules! call_kernel {
    ($LEFT:expr, $RIGHT:expr, $KERNEL:expr, $ARRAY_TYPE:ident) => {{
        let lhs = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
        let rhs = $RIGHT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
        let output: $ARRAY_TYPE = $KERNEL(lhs, rhs)?;
        Ok(Arc::new(output))
    }};
}
/// Generates `pub(crate) fn $FUNC(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef>`.
///
/// The generated function selects the concrete integer array type from the
/// data type of `left` and hands both operands to `$KERNEL` through
/// `call_kernel!`. Only the eight signed/unsigned integer types listed below
/// are dispatched; any other data type yields a plan error naming the kernel.
macro_rules! create_left_integral_dyn_kernel {
    ($FUNC:ident, $KERNEL:ident) => {
        pub(crate) fn $FUNC(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
            // Dispatch is driven by the left operand only; `call_kernel!`
            // downcasts the right operand to the same array type.
            match left.data_type() {
                DataType::Int8 => call_kernel!(left, right, $KERNEL, Int8Array),
                DataType::Int16 => call_kernel!(left, right, $KERNEL, Int16Array),
                DataType::Int32 => call_kernel!(left, right, $KERNEL, Int32Array),
                DataType::Int64 => call_kernel!(left, right, $KERNEL, Int64Array),
                DataType::UInt8 => call_kernel!(left, right, $KERNEL, UInt8Array),
                DataType::UInt16 => call_kernel!(left, right, $KERNEL, UInt16Array),
                DataType::UInt32 => call_kernel!(left, right, $KERNEL, UInt32Array),
                DataType::UInt64 => call_kernel!(left, right, $KERNEL, UInt64Array),
                unsupported => plan_err!(
                    "Data type {} not supported for binary operation '{}' on dyn arrays",
                    unsupported,
                    stringify!($KERNEL)
                ),
            }
        }
    };
}
// Array-vs-array bitwise kernels. Each line defines a `pub(crate)` function
// with the first name that dispatches on the left array's integer type and
// calls the arrow kernel given as the second name.
create_left_integral_dyn_kernel!(bitwise_or_dyn, bitwise_or);
create_left_integral_dyn_kernel!(bitwise_xor_dyn, bitwise_xor);
create_left_integral_dyn_kernel!(bitwise_and_dyn, bitwise_and);
create_left_integral_dyn_kernel!(bitwise_shift_right_dyn, bitwise_shift_right);
create_left_integral_dyn_kernel!(bitwise_shift_left_dyn, bitwise_shift_left);
/// Downcasts `$LEFT` to `$ARRAY_TYPE`, converts the scalar `$RIGHT` to the
/// native type `$TYPE`, and applies `$KERNEL(array, scalar)`.
///
/// Evaluates to a `Result<ArrayRef>`:
/// * a null scalar yields an all-null array with the same data type and
///   length as `$LEFT`;
/// * a scalar that cannot be converted to `$TYPE`, or a kernel failure, yields
///   `Err` instead of panicking;
/// * otherwise `Ok` with the kernel output.
///
/// Panics only if `$LEFT` is not a `$ARRAY_TYPE`; the call sites in this file
/// match on the array's data type before expanding the macro.
macro_rules! call_scalar_kernel {
    ($LEFT:expr, $RIGHT:expr, $KERNEL:ident, $ARRAY_TYPE:ident, $TYPE:ty) => {{
        let len = $LEFT.len();
        let array = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
        let scalar = $RIGHT;
        if scalar.is_null() {
            Ok(new_null_array(array.data_type(), len))
        } else {
            // Surface a scalar/array type mismatch as an error rather than
            // aborting the whole process with a panic.
            let converted: Result<$TYPE> = scalar.try_into();
            converted.and_then(|value| {
                let result: $ARRAY_TYPE = $KERNEL(array, value)?;
                Ok(Arc::new(result) as ArrayRef)
            })
        }
    }};
}
/// Generates
/// `pub(crate) fn $FUNC(array: &dyn Array, scalar: ScalarValue) -> Option<Result<ArrayRef>>`.
///
/// The generated function dispatches on the data type of `array` and forwards
/// to `$KERNEL` through `call_scalar_kernel!`, pairing each integer array type
/// with its native Rust type. Unsupported data types produce a plan error
/// naming the kernel. The outcome is always wrapped in `Some`.
macro_rules! create_left_integral_dyn_scalar_kernel {
    ($FUNC:ident, $KERNEL:ident) => {
        pub(crate) fn $FUNC(
            array: &dyn Array,
            scalar: ScalarValue,
        ) -> Option<Result<ArrayRef>> {
            Some(match array.data_type() {
                DataType::Int8 => call_scalar_kernel!(array, scalar, $KERNEL, Int8Array, i8),
                DataType::Int16 => call_scalar_kernel!(array, scalar, $KERNEL, Int16Array, i16),
                DataType::Int32 => call_scalar_kernel!(array, scalar, $KERNEL, Int32Array, i32),
                DataType::Int64 => call_scalar_kernel!(array, scalar, $KERNEL, Int64Array, i64),
                DataType::UInt8 => call_scalar_kernel!(array, scalar, $KERNEL, UInt8Array, u8),
                DataType::UInt16 => {
                    call_scalar_kernel!(array, scalar, $KERNEL, UInt16Array, u16)
                }
                DataType::UInt32 => {
                    call_scalar_kernel!(array, scalar, $KERNEL, UInt32Array, u32)
                }
                DataType::UInt64 => {
                    call_scalar_kernel!(array, scalar, $KERNEL, UInt64Array, u64)
                }
                unsupported => plan_err!(
                    "Data type {} not supported for binary operation '{}' on dyn arrays",
                    unsupported,
                    stringify!($KERNEL)
                ),
            })
        }
    };
}
// Array-vs-scalar bitwise kernels. Each invocation defines a `pub(crate)`
// function with the first name that dispatches on the array's integer type and
// calls the arrow scalar kernel given as the second name.
create_left_integral_dyn_scalar_kernel!(bitwise_and_dyn_scalar, bitwise_and_scalar);
create_left_integral_dyn_scalar_kernel!(bitwise_or_dyn_scalar, bitwise_or_scalar);
create_left_integral_dyn_scalar_kernel!(bitwise_xor_dyn_scalar, bitwise_xor_scalar);
create_left_integral_dyn_scalar_kernel!(
bitwise_shift_right_dyn_scalar,
bitwise_shift_right_scalar
);
create_left_integral_dyn_scalar_kernel!(
bitwise_shift_left_dyn_scalar,
bitwise_shift_left_scalar
);
/// Concatenates two `StringViewArray`s element by element.
///
/// Output element `i` is `left[i]` immediately followed by `right[i]`. If
/// either input element is null, the output element is null.
///
/// # Errors
///
/// Returns `ArrowError::ComputeError` when the two arrays differ in length,
/// and propagates any error reported by the builder while appending a value.
pub fn concat_elements_utf8view(
    left: &StringViewArray,
    right: &StringViewArray,
) -> std::result::Result<StringViewArray, ArrowError> {
    use std::fmt::Write;

    let (left_len, right_len) = (left.len(), right.len());
    if left_len != right_len {
        return Err(ArrowError::ComputeError(format!(
            "Arrays must have the same length: {} != {}",
            left_len, right_len
        )));
    }

    let mut builder = StringViewBuilder::with_capacity(left_len);
    // One scratch string is reused for every row instead of allocating per row.
    let mut scratch = String::new();
    for pair in left.iter().zip(right.iter()) {
        match pair {
            (Some(lhs), Some(rhs)) => {
                scratch.clear();
                write!(&mut scratch, "{lhs}{rhs}")
                    .expect("writing into string buffer failed");
                builder.try_append_value(&scratch)?;
            }
            // At least one side is null, so the concatenation is null too.
            _ => builder.append_null(),
        }
    }
    Ok(builder.finish())
}
/// Evaluates a row-wise regular-expression match between two string arrays.
///
/// `$LEFT` holds the values and `$RIGHT` the patterns; both are downcast to
/// `$ARRAYTYPE`. When `$FLAG` is true the `"i"` flag is supplied for every
/// row. When `$NOT` is true the boolean result is negated.
///
/// Evaluates to `Ok` wrapping the boolean result in an `Arc`. Errors from both
/// the match kernel and the negation kernel are propagated with `?`, so the
/// macro has to be expanded inside a function returning a compatible `Result`.
///
/// Panics if either operand is not a `$ARRAYTYPE`.
macro_rules! regexp_is_match_flag {
    ($LEFT:expr, $RIGHT:expr, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("failed to downcast array");
        let rr = $RIGHT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("failed to downcast array");
        // The array kernel takes its flags as an array, so the single "i"
        // flag is repeated once per input row.
        let flag = if $FLAG {
            Some($ARRAYTYPE::from(vec!["i"; ll.len()]))
        } else {
            None
        };
        let mut array = regexp_is_match(ll, rr, flag.as_ref())?;
        if $NOT {
            // Propagate instead of unwrapping, matching the kernel call above.
            array = not(&array)?;
        }
        Ok(Arc::new(array))
    }};
}
/// Row-wise regular-expression match of `left` (values) against `right`
/// (patterns).
///
/// The string array type is chosen from the data type of `left`; `right` is
/// downcast to that same type. `flag` requests the `"i"` regex flag for every
/// row and `not_match` negates the boolean result.
///
/// # Errors
///
/// Returns an internal error when `left` is not `Utf8`, `Utf8View` or
/// `LargeUtf8`, and propagates errors from the match kernel.
pub(crate) fn regex_match_dyn(
    left: &ArrayRef,
    right: &ArrayRef,
    not_match: bool,
    flag: bool,
) -> Result<ArrayRef> {
    let string_type = left.data_type();
    match string_type {
        DataType::Utf8 => regexp_is_match_flag!(left, right, StringArray, not_match, flag),
        DataType::Utf8View => regexp_is_match_flag!(left, right, StringViewArray, not_match, flag),
        DataType::LargeUtf8 => {
            regexp_is_match_flag!(left, right, LargeStringArray, not_match, flag)
        }
        unsupported => internal_err!(
            "Data type {} not supported for regex_match_dyn on string array",
            unsupported
        ),
    }
}
/// Evaluates a regular-expression match of every element of a string array
/// against a single scalar pattern.
///
/// `$LEFT` is downcast to `$ARRAYTYPE`; `$RIGHT` must yield a non-null string
/// from `try_as_str()`, otherwise the macro evaluates to an internal error.
/// When `$FLAG` is true the `"i"` flag is passed to the kernel, and when
/// `$NOT` is true the boolean result is negated. A kernel failure is reported
/// as an internal error.
///
/// The success value is an `Arc` around the boolean result, deliberately left
/// without a cast so the expansion site decides the final pointer type.
///
/// Panics if `$LEFT` is not a `$ARRAYTYPE`.
macro_rules! regexp_is_match_flag_scalar {
    ($LEFT:expr, $RIGHT:expr, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("failed to downcast array");
        match $RIGHT.try_as_str() {
            Some(Some(pattern)) => {
                let flag = $FLAG.then_some("i");
                match regexp_is_match_scalar(ll, &pattern, flag) {
                    Ok(matched) => {
                        let matched = if $NOT {
                            not(&matched).unwrap()
                        } else {
                            matched
                        };
                        Ok(Arc::new(matched))
                    }
                    Err(e) => internal_err!("failed to call 'regex_match_dyn_scalar' {}", e),
                }
            }
            // Either not a string scalar at all, or a null string.
            _ => internal_err!(
                "failed to cast literal value {} for operation 'regex_match_dyn_scalar'",
                $RIGHT
            ),
        }
    }};
}
/// Matches every element of `left` against the scalar regular expression
/// `right`.
///
/// Plain `Utf8`, `Utf8View` and `LargeUtf8` arrays are evaluated directly.
/// For a dictionary array the pattern is evaluated once against the dictionary
/// values, and the per-value results are then expanded to one entry per key.
/// `flag` requests the `"i"` regex flag and `not_match` negates the result.
///
/// The outcome is always wrapped in `Some`. It is an internal error when the
/// array (or the dictionary's value type) is not one of the supported string
/// types, when `right` is not a non-null string, or when the kernel fails.
pub(crate) fn regex_match_dyn_scalar(
    left: &dyn Array,
    right: &ScalarValue,
    not_match: bool,
    flag: bool,
) -> Option<Result<ArrayRef>> {
    let result: Result<ArrayRef> = match left.data_type() {
        DataType::Utf8 => {
            regexp_is_match_flag_scalar!(left, right, StringArray, not_match, flag)
        }
        DataType::Utf8View => {
            regexp_is_match_flag_scalar!(left, right, StringViewArray, not_match, flag)
        }
        DataType::LargeUtf8 => {
            regexp_is_match_flag_scalar!(left, right, LargeStringArray, not_match, flag)
        }
        DataType::Dictionary(_, _) => {
            let values = left.as_any_dictionary().values();
            // Evaluate the pattern once per distinct dictionary value.
            let evaluated = match values.data_type() {
                DataType::Utf8 => {
                    regexp_is_match_flag_scalar!(values, right, StringArray, not_match, flag)
                }
                DataType::Utf8View => {
                    regexp_is_match_flag_scalar!(values, right, StringViewArray, not_match, flag)
                }
                DataType::LargeUtf8 => {
                    regexp_is_match_flag_scalar!(values, right, LargeStringArray, not_match, flag)
                }
                unsupported => internal_err!(
                    "Data type {} not supported as a dictionary value type for operation 'regex_match_dyn_scalar' on string array",
                    unsupported
                ),
            };
            // Expand the per-value results to one entry per dictionary key.
            // NOTE(review): `take_iter` is driven purely by the keys; confirm
            // that null entries among the dictionary values are handled as
            // intended.
            evaluated.map(
                |evaluated_values| downcast_dictionary_array! {
                    left => {
                        let per_row = evaluated_values
                            .take_iter(left.keys().iter().map(|key| key.map(|k| k as _)))
                            .collect::<BooleanArray>();
                        Arc::new(per_row) as ArrayRef
                    },
                    _ => unreachable!(),
                }
            )
        }
        unsupported => internal_err!(
            "Data type {} not supported for operation 'regex_match_dyn_scalar' on string array",
            unsupported
        ),
    };
    Some(result)
}