use arrow::array::{
Array, ArrayRef, ArrowNumericType, AsArray, BinaryArray, BinaryViewArray,
BooleanArray, LargeBinaryArray, LargeStringArray, PrimitiveArray, StringArray,
StringViewArray, StructArray,
};
use arrow::buffer::NullBuffer;
use arrow::datatypes::DataType;
use datafusion_common::{Result, not_impl_err};
use std::sync::Arc;
/// Replaces the validity mask of a [`PrimitiveArray`] with `nulls`.
///
/// The array is taken apart, its previous null mask is discarded, and a new
/// array is assembled from the same values and the original data type.
///
/// See [`set_nulls_dyn`] for a variant that accepts a `&dyn Array`.
///
/// # Panics
///
/// NOTE(review): `PrimitiveArray::new` validates its arguments, so a `nulls`
/// mask whose length differs from the array's is expected to panic — confirm
/// against the arrow documentation.
pub fn set_nulls<T: ArrowNumericType + Send>(
    array: PrimitiveArray<T>,
    nulls: Option<NullBuffer>,
) -> PrimitiveArray<T> {
    // The third component is the old null mask, which is intentionally dropped.
    let (data_type, buffer, _) = array.into_parts();
    let rebuilt = PrimitiveArray::<T>::new(buffer, nulls);
    // Restore the original data type on the rebuilt array.
    rebuilt.with_data_type(data_type)
}
/// Converts a boolean filter into a validity mask.
///
/// The filter's boolean values are reinterpreted as validity bits and then
/// combined with the filter's own null mask, so a row in the result is
/// * valid where the filter is `true`,
/// * null where the filter is `false` or null.
fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
    // Treat the filter's boolean values as "is valid" bits.
    let selected = NullBuffer::from(filter.values().clone());
    // Rows where the filter itself is null are also masked out.
    NullBuffer::union(Some(&selected), filter.nulls())
}
pub fn filtered_null_mask(
opt_filter: Option<&BooleanArray>,
input: &dyn Array,
) -> Option<NullBuffer> {
let opt_filter = opt_filter.and_then(filter_to_nulls);
NullBuffer::union(opt_filter.as_ref(), input.nulls())
}
/// Applies an optional filter to `input` by turning filtered-out rows into nulls.
///
/// The validity mask is computed with [`filtered_null_mask`] and installed on
/// the array with [`set_nulls_dyn`].
///
/// # Errors
///
/// Propagates any error returned by [`set_nulls_dyn`].
pub fn apply_filter_as_nulls(
    input: &dyn Array,
    opt_filter: Option<&BooleanArray>,
) -> Result<ArrayRef> {
    set_nulls_dyn(input, filtered_null_mask(opt_filter, input))
}
/// Builds an array of the same type as `input` whose validity mask is `nulls`.
///
/// The output is assembled from `input`'s existing offsets/values, views/data
/// buffers, or fields/columns, depending on its type; only the null mask is
/// replaced.
///
/// Supported data types: `Utf8`, `LargeUtf8`, `Utf8View`, `Binary`,
/// `LargeBinary`, `BinaryView` and `Struct`.
///
/// # Errors
///
/// Returns a "not implemented" error for any other data type.
///
/// # Panics
///
/// Panics if `nulls` is `Some` and its length differs from `input.len()`, or
/// if the rebuilt array's length or data type does not match `input`'s.
pub fn set_nulls_dyn(input: &dyn Array, nulls: Option<NullBuffer>) -> Result<ArrayRef> {
    // The unchecked constructors below rely on the mask covering every row.
    if let Some(mask) = &nulls {
        assert_eq!(mask.len(), input.len());
    }

    let rebuilt: ArrayRef = match input.data_type() {
        DataType::Utf8 => {
            let strings = input.as_string::<i32>();
            let offsets = strings.offsets().clone();
            let bytes = strings.values().clone();
            // SAFETY: offsets and values are taken from an existing string array,
            // and the mask length was checked against `input.len()` above.
            Arc::new(unsafe { StringArray::new_unchecked(offsets, bytes, nulls) })
        }
        DataType::LargeUtf8 => {
            let strings = input.as_string::<i64>();
            let offsets = strings.offsets().clone();
            let bytes = strings.values().clone();
            // SAFETY: offsets and values are taken from an existing large string
            // array, and the mask length was checked against `input.len()` above.
            Arc::new(unsafe { LargeStringArray::new_unchecked(offsets, bytes, nulls) })
        }
        DataType::Utf8View => {
            let strings = input.as_string_view();
            let views = strings.views().clone();
            let buffers = strings.data_buffers().to_vec();
            // SAFETY: views and data buffers are taken from an existing string view
            // array, and the mask length was checked against `input.len()` above.
            Arc::new(unsafe { StringViewArray::new_unchecked(views, buffers, nulls) })
        }
        DataType::Binary => {
            let binary = input.as_binary::<i32>();
            let offsets = binary.offsets().clone();
            let bytes = binary.values().clone();
            // SAFETY: offsets and values are taken from an existing binary array,
            // and the mask length was checked against `input.len()` above.
            Arc::new(unsafe { BinaryArray::new_unchecked(offsets, bytes, nulls) })
        }
        DataType::LargeBinary => {
            let binary = input.as_binary::<i64>();
            let offsets = binary.offsets().clone();
            let bytes = binary.values().clone();
            // SAFETY: offsets and values are taken from an existing large binary
            // array, and the mask length was checked against `input.len()` above.
            Arc::new(unsafe { LargeBinaryArray::new_unchecked(offsets, bytes, nulls) })
        }
        DataType::BinaryView => {
            let binary = input.as_binary_view();
            let views = binary.views().clone();
            let buffers = binary.data_buffers().to_vec();
            // SAFETY: views and data buffers are taken from an existing binary view
            // array, and the mask length was checked against `input.len()` above.
            Arc::new(unsafe { BinaryViewArray::new_unchecked(views, buffers, nulls) })
        }
        DataType::Struct(_) => {
            let structs = input.as_struct();
            let fields = structs.fields().clone();
            let columns = structs.columns().to_vec();
            // SAFETY: fields and child columns are taken from an existing struct
            // array, and the mask length was checked against `input.len()` above.
            Arc::new(unsafe { StructArray::new_unchecked(fields, columns, nulls) })
        }
        _ => {
            return not_impl_err!("Applying nulls {:?}", input.data_type());
        }
    };

    // Sanity checks: swapping the mask must not change the array's shape or type.
    assert_eq!(input.len(), rebuilt.len());
    assert_eq!(input.data_type(), rebuilt.data_type());
    Ok(rebuilt)
}