use arrow::array::{
Array, ArrayAccessor, ArrayIter, ArrayRef, ByteView, GenericStringArray, Int64Array,
OffsetSizeTrait, StringViewArray, make_view,
};
use arrow::datatypes::DataType;
use arrow_buffer::{NullBuffer, ScalarBuffer};
use datafusion_common::cast::{
as_generic_string_array, as_int64_array, as_string_view_array,
};
use datafusion_common::exec_err;
use std::cmp::Ordering;
use std::ops::Range;
use std::sync::Arc;
/// Strategy for choosing which part of a string to keep, given a signed count.
///
/// Implemented in this file by `LeftSlicer` and `RightSlicer`, and invoked once
/// per non-null row by `general_left_right_array` and `general_left_right_view`.
pub(crate) trait LeftRightSlicer {
    /// Returns the byte range of `string` to keep for the count `n`.
    ///
    /// Callers index `string` (or its bytes) directly with the returned range,
    /// so it must lie within `string` and, for the `&str` path, fall on UTF-8
    /// character boundaries.
    fn slice(string: &str, n: i64) -> Range<usize>;
}
/// Slicer that keeps a prefix of the string.
///
/// * `n > 0`: the first `n` characters (the whole string if it is shorter).
/// * `n < 0`: everything except the last `|n|` characters.
/// * `n == 0`: the empty range `0..0`.
pub(crate) struct LeftSlicer {}

impl LeftRightSlicer for LeftSlicer {
    fn slice(string: &str, n: i64) -> Range<usize> {
        // The prefix always starts at byte 0; only its end depends on `n`.
        let end = left_right_byte_length(string, n);
        0..end
    }
}
/// Slicer that keeps a suffix of the string.
///
/// * `n > 0`: the last `n` characters (the whole string if it is shorter).
/// * `n < 0`: everything except the first `|n|` characters.
/// * `n == 0` or `n == i64::MIN`: the empty range `0..0`.
pub(crate) struct RightSlicer {}

impl LeftRightSlicer for RightSlicer {
    fn slice(string: &str, n: i64) -> Range<usize> {
        match n {
            // `0` keeps nothing. `i64::MIN` is handled here as well because it
            // cannot be negated without overflowing; it also yields nothing.
            0 | i64::MIN => 0..0,
            _ => {
                // The suffix starts where the mirrored (negated) count points
                // and always runs to the end of the string.
                let start = left_right_byte_length(string, -n);
                start..string.len()
            }
        }
    }
}
/// Converts a signed character count into a byte offset inside `string`.
///
/// * `n > 0`: the byte offset at which the character with index `n` starts,
///   i.e. the byte length of the first `n` characters. Returns `string.len()`
///   when the string has at most `n` characters.
/// * `n < 0`: the byte offset at which the `|n|`-th character counted from the
///   end starts. Returns `0` when the string has fewer than `|n|` characters.
/// * `n == 0`: always `0`.
#[inline]
fn left_right_byte_length(string: &str, n: i64) -> usize {
    // |n| clamped so that it fits in `usize` on every target.
    let char_count = n.unsigned_abs().min(usize::MAX as u64) as usize;
    let mut char_starts = string.char_indices();

    match n.cmp(&0) {
        Ordering::Equal => 0,
        Ordering::Greater => char_starts
            .nth(char_count)
            .map_or(string.len(), |(offset, _)| offset),
        // `char_count >= 1` here, so the subtraction cannot underflow.
        Ordering::Less => char_starts
            .nth_back(char_count - 1)
            .map_or(0, |(offset, _)| offset),
    }
}
/// Applies the slicer `F` row by row to a string array and a count array.
///
/// `args[0]` is the string column (`Utf8`, `LargeUtf8` or `Utf8View`) and
/// `args[1]` is the count column, which must be an `Int64` array.
///
/// # Errors
///
/// Returns an error if `args[1]` cannot be cast to an `Int64Array`, if
/// `args[0]` cannot be cast to the array type matching its data type, or
/// ("Not supported") if `args[0]` has any other data type.
///
/// # Panics
///
/// Panics if `args` holds fewer than two arrays, because both positions are
/// indexed directly.
pub(crate) fn general_left_right<F: LeftRightSlicer>(
    args: &[ArrayRef],
) -> datafusion_common::Result<ArrayRef> {
    // The count column is validated first, before the string column is inspected.
    let counts = as_int64_array(&args[1])?;
    let strings = &args[0];

    match strings.data_type() {
        DataType::Utf8 => general_left_right_array::<i32, _, F>(
            as_generic_string_array::<i32>(strings)?,
            counts,
        ),
        DataType::LargeUtf8 => general_left_right_array::<i64, _, F>(
            as_generic_string_array::<i64>(strings)?,
            counts,
        ),
        DataType::Utf8View => {
            general_left_right_view::<F>(as_string_view_array(strings)?, counts)
        }
        _ => exec_err!("Not supported"),
    }
}
/// Slices every string of an offset-based string array with the slicer `F`.
///
/// Rows of `string_array` and `n_array` are paired positionally. A row of the
/// output is null when either the string or the count is null; otherwise it is
/// the substring of the input selected by `F::slice`. The result is collected
/// into a new `GenericStringArray<T>`.
fn general_left_right_array<
    'a,
    T: OffsetSizeTrait,
    V: ArrayAccessor<Item = &'a str>,
    F: LeftRightSlicer,
>(
    string_array: V,
    n_array: &Int64Array,
) -> datafusion_common::Result<ArrayRef> {
    let sliced: GenericStringArray<T> = ArrayIter::new(string_array)
        .zip(n_array.iter())
        .map(|(string, n)| -> Option<&'a str> {
            // A null on either side makes the whole row null.
            let (string, n) = string.zip(n)?;
            Some(&string[F::slice(string, n)])
        })
        .collect();

    Ok(Arc::new(sliced) as ArrayRef)
}
/// Slices every string of a `StringViewArray` with the slicer `F`.
///
/// Only the views are rewritten: the result is built over the input's own data
/// buffers, and each valid row gets a new view describing the selected
/// sub-range of the original string. A row of the output is null when either
/// the string or the count is null; such rows keep their original view.
///
/// # Errors
///
/// Propagates any error reported by `StringViewArray::try_new`.
fn general_left_right_view<F: LeftRightSlicer>(
    string_view_array: &StringViewArray,
    n_array: &Int64Array,
) -> datafusion_common::Result<ArrayRef> {
    let views = string_view_array.views();
    // There is expected to be exactly one view per row of the input.
    debug_assert!(views.len() == string_view_array.len());

    // Combined validity of both inputs, computed once up front.
    let new_nulls = NullBuffer::union(string_view_array.nulls(), n_array.nulls());

    // Builds the output view for a single row.
    let slice_row = |idx: usize| -> u128 {
        let view = views[idx];

        let is_null = new_nulls
            .as_ref()
            .is_some_and(|nulls| !nulls.is_valid(idx));
        if is_null {
            // Null rows carry their original view through untouched.
            return view;
        }

        let string: &str = string_view_array.value(idx);
        let range = F::slice(string, n_array.value(idx));
        let start = range.start;
        let result_bytes = &string.as_bytes()[range];

        // The new view points into the same buffer as the old one, shifted by
        // the number of bytes skipped at the front of the string.
        // NOTE(review): for short strings stored inline in the view, the
        // decoded `buffer_index`/`offset` are presumably not meaningful and
        // `make_view` is expected to ignore them — confirm against arrow docs.
        let byte_view = ByteView::from(view);
        let new_offset = byte_view.offset + (start as u32);
        make_view(result_bytes, byte_view.buffer_index, new_offset)
    };

    let new_views: Vec<u128> = (0..n_array.len()).map(slice_row).collect();

    // The data buffers are passed through as they are.
    let result = StringViewArray::try_new(
        ScalarBuffer::from(new_views),
        string_view_array.data_buffers().to_vec(),
        new_nulls,
    )?;
    Ok(Arc::new(result) as ArrayRef)
}