pub(crate) mod aggregate;
pub mod expr;
pub mod memory;
pub mod proxy;
pub mod string_utils;
use crate::assert_or_internal_err;
use crate::error::{_exec_datafusion_err, _exec_err, _internal_datafusion_err};
use crate::{Result, ScalarValue};
use arrow::array::{
Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait,
cast::AsArray,
};
use arrow::array::{
ArrowPrimitiveType, BooleanArray, Datum, GenericListArray, Int32Array, Int64Array,
MutableArrayData, PrimitiveArray, make_array,
};
use arrow::array::{LargeListViewArray, ListViewArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::compute::kernels::cmp::eq;
use arrow::compute::kernels::length::length;
use arrow::compute::{SortColumn, SortOptions, partition};
use arrow::datatypes::{
ArrowNativeType, DataType, Field, Int32Type, Int64Type, SchemaRef,
};
#[cfg(feature = "sql")]
use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser};
use std::borrow::{Borrow, Cow};
use std::cmp::{Ordering, min};
use std::collections::HashSet;
use std::iter::repeat_n;
use std::num::NonZero;
use std::ops::Range;
use std::sync::{Arc, LazyLock};
use std::thread::available_parallelism;
/// Applies an optional column projection to `schema`.
///
/// With `None` the original schema is returned (a cheap `Arc` clone); with
/// `Some(indices)` a new schema containing only those columns, in the given
/// order, is built.
///
/// # Errors
/// Propagates the error from `Schema::project` (e.g. an out-of-range index).
pub fn project_schema(
    schema: &SchemaRef,
    projection: Option<&impl AsRef<[usize]>>,
) -> Result<SchemaRef> {
    let Some(columns) = projection else {
        return Ok(Arc::clone(schema));
    };
    let projected = schema.project(columns.as_ref())?;
    Ok(Arc::new(projected))
}
/// Extracts row `idx` of `columns` into `buf`, reusing its allocation.
///
/// `buf` is cleared first and then receives one `ScalarValue` per column, in
/// column order.
///
/// # Errors
/// Returns the first error from `ScalarValue::try_from_array`; in that case
/// `buf` holds only the values extracted before the failing column.
pub fn extract_row_at_idx_to_buf(
    columns: &[ArrayRef],
    idx: usize,
    buf: &mut Vec<ScalarValue>,
) -> Result<()> {
    buf.clear();
    for column in columns {
        let value = ScalarValue::try_from_array(column, idx)?;
        buf.push(value);
    }
    Ok(())
}
/// Returns row `idx` of `columns` as one `ScalarValue` per column.
///
/// # Errors
/// Returns the first error from `ScalarValue::try_from_array`.
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
    let mut row = Vec::with_capacity(columns.len());
    for column in columns {
        row.push(ScalarValue::try_from_array(column, idx)?);
    }
    Ok(row)
}
/// Lexicographically compares two rows of scalars under per-column sort options.
///
/// Columns are compared pairwise in order; the first non-equal column decides
/// the result. Two nulls compare equal. A null against a non-null sorts first
/// when `nulls_first` is set, last otherwise. Non-null pairs are compared with
/// `try_cmp`, reversed when `descending` is set. Iteration stops at the
/// shortest of `x`, `y` and `sort_options`.
///
/// # Errors
/// Propagates errors from `ScalarValue::try_cmp` (e.g. incomparable types).
pub fn compare_rows(
    x: &[ScalarValue],
    y: &[ScalarValue],
    sort_options: &[SortOptions],
) -> Result<Ordering> {
    for ((lhs, rhs), options) in x.iter().zip(y).zip(sort_options) {
        let ordering = match (lhs.is_null(), rhs.is_null()) {
            (true, true) => continue,
            (true, false) => {
                if options.nulls_first {
                    Ordering::Less
                } else {
                    Ordering::Greater
                }
            }
            (false, true) => {
                if options.nulls_first {
                    Ordering::Greater
                } else {
                    Ordering::Less
                }
            }
            (false, false) if options.descending => rhs.try_cmp(lhs)?,
            (false, false) => lhs.try_cmp(rhs)?,
        };
        if ordering.is_ne() {
            return Ok(ordering);
        }
    }
    Ok(Ordering::Equal)
}
/// Binary-searches `item_columns` for the insertion point of the row `target`.
///
/// `item_columns` holds one array per sort key; the rows are assumed to be
/// sorted according to `sort_options`, and the first column's length is used
/// as the row count. With `SIDE = true` the result is the first index whose
/// row is not less than `target`; with `SIDE = false` it is the first index
/// whose row is greater than `target`.
///
/// # Errors
/// Returns an internal error if `item_columns` is empty, and propagates
/// errors from row extraction or comparison.
pub fn bisect<const SIDE: bool>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    sort_options: &[SortOptions],
) -> Result<usize> {
    let row_count = item_columns
        .first()
        .map(|column| column.len())
        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?;
    // `true` means "the answer lies strictly to the right of this row".
    let goes_right = |row: &[ScalarValue], goal: &[ScalarValue]| -> Result<bool> {
        let ordering = compare_rows(row, goal, sort_options)?;
        Ok(if SIDE {
            ordering.is_lt()
        } else {
            ordering.is_le()
        })
    };
    find_bisect_point(item_columns, target, goes_right, 0, row_count)
}
/// Generic binary search over rows `low..high` of `item_columns`.
///
/// `compare_fn(row, target)` must return `true` for a prefix of the range and
/// `false` for the rest. The function returns the first index for which it
/// returns `false`, or `high` if there is none; if `low >= high` it returns
/// `low`.
///
/// # Errors
/// Propagates errors from row extraction and from `compare_fn`.
pub fn find_bisect_point<F>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    compare_fn: F,
    mut low: usize,
    mut high: usize,
) -> Result<usize>
where
    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
{
    while low < high {
        // Written this way to avoid overflow of `low + high`.
        let mid = low + (high - low) / 2;
        let row = get_row_at_idx(item_columns, mid)?;
        let goes_right = compare_fn(&row, target)?;
        if goes_right {
            low = mid + 1;
        } else {
            high = mid;
        }
    }
    Ok(low)
}
/// Linear-scan counterpart of [`bisect`], with the same `SIDE` semantics.
///
/// Scans rows from index 0 and returns the first index whose row is not less
/// than `target` (`SIDE = true`) or greater than `target` (`SIDE = false`).
/// The rows are assumed to be sorted according to `sort_options`.
///
/// # Errors
/// Returns an internal error if `item_columns` is empty, and propagates
/// errors from row extraction or comparison.
pub fn linear_search<const SIDE: bool>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    sort_options: &[SortOptions],
) -> Result<usize> {
    let row_count = item_columns
        .first()
        .map(|column| column.len())
        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?;
    // `true` means "keep scanning past this row".
    let keep_going = |row: &[ScalarValue], goal: &[ScalarValue]| -> Result<bool> {
        let ordering = compare_rows(row, goal, sort_options)?;
        Ok(if SIDE {
            ordering.is_lt()
        } else {
            ordering.is_le()
        })
    };
    search_in_slice(item_columns, target, keep_going, 0, row_count)
}
/// Scans rows `low..high` of `item_columns` and returns the first index for
/// which `compare_fn(row, target)` is `false`.
///
/// If every row in the range yields `true`, returns `high`; if the range is
/// empty (`low >= high`), returns `low`.
///
/// # Errors
/// Propagates errors from row extraction and from `compare_fn`.
pub fn search_in_slice<F>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
    compare_fn: F,
    low: usize,
    high: usize,
) -> Result<usize>
where
    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
{
    for idx in low..high {
        let row = get_row_at_idx(item_columns, idx)?;
        if !compare_fn(&row, target)? {
            return Ok(idx);
        }
    }
    // Either the range was empty (return `low`) or fully consumed (`high`).
    Ok(high.max(low))
}
pub fn evaluate_partition_ranges(
num_rows: usize,
partition_columns: &[SortColumn],
) -> Result<Vec<Range<usize>>> {
Ok(if partition_columns.is_empty() {
vec![Range {
start: 0,
end: num_rows,
}]
} else {
let cols: Vec<_> = partition_columns
.iter()
.map(|x| Arc::clone(&x.values))
.collect();
partition(&cols)?.ranges()
})
}
/// Returns `s` as a SQL identifier, double-quoting it only when necessary.
///
/// Identifiers that start with a lowercase ASCII letter or `_` and continue
/// with lowercase ASCII letters, digits or `_` are borrowed unchanged. Anything
/// else is wrapped in `"` with embedded `"` doubled.
pub fn quote_identifier(s: &str) -> Cow<'_, str> {
    if !needs_quotes(s) {
        return Cow::Borrowed(s);
    }
    let escaped = s.replace('"', "\"\"");
    Cow::Owned(format!("\"{escaped}\""))
}
/// Returns `true` if `s` is not of the form `[a-z_][a-z0-9_]*`.
///
/// The empty string returns `false`.
fn needs_quotes(s: &str) -> bool {
    let mut chars = s.chars();
    // A leading digit (or any non-lowercase character) requires quoting.
    let valid_start = match chars.next() {
        None => true,
        Some(c) => c == '_' || c.is_ascii_lowercase(),
    };
    !valid_start || chars.any(|c| !(c == '_' || c.is_ascii_lowercase() || c.is_ascii_digit()))
}
/// Parses a possibly qualified identifier (e.g. `a."B".c`) into its parts
/// using sqlparser's generic dialect.
///
/// # Errors
/// Returns an error if tokenization or identifier parsing fails.
#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
    let dialect = GenericDialect;
    let mut parser = Parser::new(&dialect).try_with_sql(s)?;
    Ok(parser.parse_multipart_identifier()?)
}
/// Splits `s` into identifier parts and normalizes their case.
///
/// Quoted parts keep their exact text. Unquoted parts are lowercased (ASCII
/// only) unless `ignore_case` is set. Unparseable input yields an empty vector.
#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
    let idents = parse_identifiers(s).unwrap_or_default();
    idents
        .into_iter()
        .map(|ident| {
            let lowercase = ident.quote_style.is_none() && !ignore_case;
            if lowercase {
                ident.value.to_ascii_lowercase()
            } else {
                ident.value
            }
        })
        .collect()
}
/// Fallback identifier splitter used when the `sql` feature is disabled.
///
/// Splits `s` on `.` characters that are outside double quotes. Quote
/// characters are kept in the returned parts; a trailing empty part is
/// dropped, while empty parts between dots are kept. Never returns `Err`.
#[cfg(not(feature = "sql"))]
pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<String>> {
    let mut parts = Vec::new();
    let mut segment = String::new();
    let mut quoted = false;
    for c in s.chars() {
        if c == '.' && !quoted {
            parts.push(std::mem::take(&mut segment));
            continue;
        }
        // Toggling on every quote also handles `""` escapes inside quotes.
        if c == '"' {
            quoted = !quoted;
        }
        segment.push(c);
    }
    if !segment.is_empty() {
        parts.push(segment);
    }
    Ok(parts)
}
/// Splits `s` into identifier parts and normalizes them (non-`sql` fallback).
///
/// A part wrapped in double quotes has the quotes stripped and `""` unescaped
/// to `"`. This includes the quoted empty identifier `""`, which becomes the
/// empty string. Unquoted parts are lowercased (ASCII only) unless
/// `ignore_case` is set.
#[cfg(not(feature = "sql"))]
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
    parse_identifiers(s)
        .unwrap_or_default()
        .into_iter()
        .map(|id| {
            // Two delimiting quotes are the minimum for a quoted part; the
            // previous `> 2` check left `""` un-stripped.
            let is_double_quoted =
                id.len() >= 2 && id.starts_with('"') && id.ends_with('"');
            if is_double_quoted {
                // Slicing is safe: both ends are the 1-byte ASCII `"`.
                id[1..id.len() - 1].replace("\"\"", "\"")
            } else if ignore_case {
                id
            } else {
                id.to_ascii_lowercase()
            }
        })
        .collect::<Vec<_>>()
}
/// Clones the elements of `items` at the given `indices`, in index order.
///
/// # Errors
/// Returns an execution error if any index is out of bounds.
pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
    items: &[T],
    indices: impl IntoIterator<Item = I>,
) -> Result<Vec<T>> {
    let mut picked = Vec::new();
    for idx in indices {
        match items.get(*idx.borrow()) {
            Some(item) => picked.push(item.clone()),
            None => {
                return Err(_exec_datafusion_err!(
                    "Expects indices to be in the range of searched vector"
                ));
            }
        }
    }
    Ok(picked)
}
/// Returns the length of the longest prefix of `sequence` equal to `0, 1, 2, …`.
///
/// For example `[0, 1, 3]` yields 2 and `[1, 2]` yields 0.
pub fn longest_consecutive_prefix<T: Borrow<usize>>(
    sequence: impl IntoIterator<Item = T>,
) -> usize {
    sequence
        .into_iter()
        .map(|item| *Borrow::<usize>::borrow(&item))
        .enumerate()
        .take_while(|&(position, value)| position == value)
        .count()
}
/// Builder that wraps a single array as the one and only row of a list-typed
/// array (`List`, `LargeList`, `FixedSizeList`, `ListView`, `LargeListView`)
/// or of the corresponding `ScalarValue` variant.
#[derive(Debug, Clone)]
pub struct SingleRowListArrayBuilder {
    /// Values that become the elements of the single list row.
    arr: ArrayRef,
    /// Nullability of the generated list element field (`true` by default).
    nullable: bool,
    /// Name of the generated list element field; `None` uses
    /// `Field::new_list_field`.
    field_name: Option<String>,
}
impl SingleRowListArrayBuilder {
pub fn new(arr: ArrayRef) -> Self {
Self {
arr,
nullable: true,
field_name: None,
}
}
pub fn with_nullable(mut self, nullable: bool) -> Self {
self.nullable = nullable;
self
}
pub fn with_field_name(mut self, field_name: Option<String>) -> Self {
self.field_name = field_name;
self
}
pub fn with_field(self, field: &Field) -> Self {
self.with_field_name(Some(field.name().to_owned()))
.with_nullable(field.is_nullable())
}
pub fn build_list_array(self) -> ListArray {
let (field, arr) = self.into_field_and_arr();
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(field, offsets, arr, None)
}
pub fn build_list_scalar(self) -> ScalarValue {
ScalarValue::List(Arc::new(self.build_list_array()))
}
pub fn build_large_list_array(self) -> LargeListArray {
let (field, arr) = self.into_field_and_arr();
let offsets = OffsetBuffer::from_lengths([arr.len()]);
LargeListArray::new(field, offsets, arr, None)
}
pub fn build_large_list_scalar(self) -> ScalarValue {
ScalarValue::LargeList(Arc::new(self.build_large_list_array()))
}
pub fn build_fixed_size_list_array(self, list_size: usize) -> FixedSizeListArray {
let (field, arr) = self.into_field_and_arr();
FixedSizeListArray::new(field, list_size as i32, arr, None)
}
pub fn build_fixed_size_list_scalar(self, list_size: usize) -> ScalarValue {
ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size)))
}
pub fn build_list_view_array(self) -> ListViewArray {
let (field, arr) = self.into_field_and_arr();
let offsets = ScalarBuffer::from(vec![0]);
let sizes = ScalarBuffer::from(vec![i32::try_from(arr.len()).expect(
"Trying to construct a ListView where element length exceeds i32::MAX",
)]);
ListViewArray::new(field, offsets, sizes, arr, None)
}
pub fn build_list_view_scalar(self) -> ScalarValue {
ScalarValue::ListView(Arc::new(self.build_list_view_array()))
}
pub fn build_large_list_view_array(self) -> LargeListViewArray {
let (field, arr) = self.into_field_and_arr();
let offsets = ScalarBuffer::from(vec![0]);
let sizes = ScalarBuffer::from(vec![arr.len() as i64]);
LargeListViewArray::new(field, offsets, sizes, arr, None)
}
pub fn build_large_list_view_scalar(self) -> ScalarValue {
ScalarValue::LargeListView(Arc::new(self.build_large_list_view_array()))
}
fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) {
let Self {
arr,
nullable,
field_name,
} = self;
let data_type = arr.data_type().to_owned();
let field = match field_name {
Some(name) => Field::new(name, data_type, nullable),
None => Field::new_list_field(data_type, nullable),
};
(Arc::new(field), arr)
}
}
pub fn arrays_into_list_array(
arr: impl IntoIterator<Item = ArrayRef>,
) -> Result<ListArray> {
let arr = arr.into_iter().collect::<Vec<_>>();
assert_or_internal_err!(!arr.is_empty(), "Cannot wrap empty array into list array");
let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
let data_type = arr[0].data_type().to_owned();
let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(ListArray::new(
Arc::new(Field::new_list_field(data_type, true)),
OffsetBuffer::from_lengths(lens),
arrow::compute::concat(values.as_slice())?,
None,
))
}
/// Returns the element array of every non-null row of a `List`/`LargeList`.
///
/// Null rows are skipped. Panics (via `as_list`) if `a` is not a list array
/// with offset type `O`.
pub fn list_to_arrays<O: OffsetSizeTrait>(a: &ArrayRef) -> Vec<ArrayRef> {
    let list = a.as_list::<O>();
    let rows = list.iter();
    rows.flatten().collect()
}
/// Returns the element array of every non-null row of a `FixedSizeList`.
///
/// Null rows are skipped. Panics (via `as_fixed_size_list`) if `a` is not a
/// fixed-size list array.
pub fn fixed_size_list_to_arrays(a: &ArrayRef) -> Vec<ArrayRef> {
    let list = a.as_fixed_size_list();
    let rows = list.iter();
    rows.flatten().collect()
}
/// Returns the innermost non-list element type of `data_type`.
///
/// Unwraps `List`, `LargeList`, `ListView`, `LargeListView` and
/// `FixedSizeList` at every nesting level; any other type is returned as-is.
pub fn base_type(data_type: &DataType) -> DataType {
    let mut current = data_type;
    loop {
        match current {
            DataType::List(field)
            | DataType::LargeList(field)
            | DataType::ListView(field)
            | DataType::LargeListView(field)
            | DataType::FixedSizeList(field, _) => current = field.data_type(),
            _ => return current.clone(),
        }
    }
}
/// Optional list-type rewrites applied by `coerced_type_with_base_type_only`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ListCoercion {
    /// Turn `FixedSizeList` into `List` at every nesting level.
    FixedSizedListToList,
}
pub fn coerced_type_with_base_type_only(
data_type: &DataType,
base_type: &DataType,
array_coercion: Option<&ListCoercion>,
) -> DataType {
match (data_type, array_coercion) {
(DataType::List(field), _)
| (DataType::FixedSizeList(field, _), Some(ListCoercion::FixedSizedListToList)) =>
{
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);
DataType::List(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
(DataType::FixedSizeList(field, len), _) => {
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);
DataType::FixedSizeList(
Arc::new(Field::new(field.name(), field_type, field.is_nullable())),
*len,
)
}
(DataType::ListView(field), _) => {
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);
DataType::ListView(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
(DataType::LargeList(field), _) => {
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);
DataType::LargeList(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
(DataType::LargeListView(field), _) => {
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);
DataType::LargeListView(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
_ => base_type.clone(),
}
}
pub fn coerced_fixed_size_list_to_list(data_type: &DataType) -> DataType {
match data_type {
DataType::List(field) | DataType::FixedSizeList(field, _) => {
let field_type = coerced_fixed_size_list_to_list(field.data_type());
DataType::List(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
DataType::ListView(field) => {
let field_type = coerced_fixed_size_list_to_list(field.data_type());
DataType::ListView(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
DataType::LargeList(field) => {
let field_type = coerced_fixed_size_list_to_list(field.data_type());
DataType::LargeList(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
DataType::LargeListView(field) => {
let field_type = coerced_fixed_size_list_to_list(field.data_type());
DataType::LargeListView(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
_ => data_type.clone(),
}
}
/// Returns the list nesting depth of `data_type` (0 for a non-list type).
///
/// `List`, `LargeList`, `ListView`, `LargeListView` and `FixedSizeList` each
/// count as one level.
pub fn list_ndims(data_type: &DataType) -> u64 {
    let mut depth = 0;
    let mut current = data_type;
    while let DataType::List(field)
    | DataType::LargeList(field)
    | DataType::ListView(field)
    | DataType::LargeListView(field)
    | DataType::FixedSizeList(field, _) = current
    {
        depth += 1;
        current = field.data_type();
    }
    depth
}
pub mod datafusion_strsim {
    //! Character-level Levenshtein edit distance helpers.
    use std::cmp::min;
    use std::str::Chars;

    /// Lets a `&str` be iterated by `char` through `&StringWrapper`.
    struct StringWrapper<'a>(&'a str);

    impl<'b> IntoIterator for &StringWrapper<'b> {
        type Item = char;
        type IntoIter = Chars<'b>;

        fn into_iter(self) -> Self::IntoIter {
            self.0.chars()
        }
    }

    /// Edit distance between two re-iterable sequences, using `cache` as the
    /// single DP row (its previous contents are discarded).
    fn generic_levenshtein_with_buffer<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
        a: &'a Iter1,
        b: &'b Iter2,
        cache: &mut Vec<usize>,
    ) -> usize
    where
        &'a Iter1: IntoIterator<Item = Elem1>,
        &'b Iter2: IntoIterator<Item = Elem2>,
        Elem1: PartialEq<Elem2>,
    {
        let target_len = b.into_iter().count();
        if a.into_iter().next().is_none() {
            return target_len;
        }
        // cache[col] = distance between the processed prefix of `a` and the
        // first `col + 1` elements of `b`.
        cache.clear();
        cache.extend(1..=target_len);
        let mut current = 0;
        for (row, a_elem) in a.into_iter().enumerate() {
            // Value of the cell diagonally up-left of the one being computed.
            let mut diagonal = row;
            current = row + 1;
            for (col, b_elem) in b.into_iter().enumerate() {
                let substitution = diagonal + usize::from(a_elem != b_elem);
                let above = cache[col];
                current = min(current + 1, min(substitution, above + 1));
                cache[col] = current;
                diagonal = above;
            }
        }
        current
    }

    /// Edit distance with a freshly allocated DP row.
    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
        a: &'a Iter1,
        b: &'b Iter2,
    ) -> usize
    where
        &'a Iter1: IntoIterator<Item = Elem1>,
        &'b Iter2: IntoIterator<Item = Elem2>,
        Elem1: PartialEq<Elem2>,
    {
        generic_levenshtein_with_buffer(a, b, &mut Vec::new())
    }

    /// Levenshtein distance between `a` and `b`, counted in `char`s.
    pub fn levenshtein(a: &str, b: &str) -> usize {
        generic_levenshtein(&StringWrapper(a), &StringWrapper(b))
    }

    /// Like [`levenshtein`], but reuses `cache` to avoid allocating per call.
    pub fn levenshtein_with_buffer(a: &str, b: &str, cache: &mut Vec<usize>) -> usize {
        generic_levenshtein_with_buffer(&StringWrapper(a), &StringWrapper(b), cache)
    }

    /// Similarity in `[0, 1]`: `1 - distance / max(char_len(a), char_len(b))`.
    ///
    /// Two empty strings are considered identical (1.0).
    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
        if a.is_empty() && b.is_empty() {
            return 1.0;
        }
        let longest = a.chars().count().max(b.chars().count()) as f64;
        1.0 - levenshtein(a, b) as f64 / longest
    }
}
/// Returns the sorted, de-duplicated union of two index sequences.
pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let mut merged: Vec<usize> = first
        .into_iter()
        .map(|idx| *idx.borrow())
        .chain(second.into_iter().map(|idx| *idx.borrow()))
        .collect();
    // Sorting first makes duplicates adjacent, so `dedup` removes them all.
    merged.sort_unstable();
    merged.dedup();
    merged
}
/// Returns the elements of `first` that do not appear in `second`.
///
/// Order and duplicates of `first` are preserved.
pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let excluded: HashSet<usize> = second.into_iter().map(|idx| *idx.borrow()).collect();
    let mut kept = Vec::new();
    for idx in first {
        let idx: usize = *idx.borrow();
        if !excluded.contains(&idx) {
            kept.push(idx);
        }
    }
    kept
}
/// Returns, for each target, the index of its first occurrence in `items`.
///
/// # Errors
/// Returns an execution error if any target is not present in `items`.
pub fn find_indices<T: PartialEq, S: Borrow<T>>(
    items: &[T],
    targets: impl IntoIterator<Item = S>,
) -> Result<Vec<usize>> {
    let mut positions = Vec::new();
    for target in targets {
        let Some(position) = items.iter().position(|item| target.borrow().eq(item)) else {
            return Err(_exec_datafusion_err!("Target not found"));
        };
        positions.push(position);
    }
    Ok(positions)
}
/// Transposes a row-major matrix given as a vector of rows.
///
/// The output has one row per element of the *first* input row. Input rows
/// longer than the first have their excess elements dropped; shorter rows
/// simply contribute to fewer output rows.
pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
    let Some(width) = original.first().map(Vec::len) else {
        return vec![];
    };
    let mut columns: Vec<Vec<T>> = std::iter::repeat_with(Vec::new).take(width).collect();
    for row in original {
        for (item, column) in row.into_iter().zip(columns.iter_mut()) {
            column.push(item);
        }
    }
    columns
}
/// Combines a parent `(skip, fetch)` limit applied on top of a child limit
/// into a single equivalent `(skip, fetch)`.
///
/// The skips add up (saturating). The child's fetch is reduced by the parent's
/// skip (saturating at 0) and then capped by the parent's fetch, if any.
pub fn combine_limit(
    parent_skip: usize,
    parent_fetch: Option<usize>,
    child_skip: usize,
    child_fetch: Option<usize>,
) -> (usize, Option<usize>) {
    let skip = parent_skip.saturating_add(child_skip);
    let fetch = match child_fetch {
        None => parent_fetch,
        Some(child_fetch) => {
            // Rows the child still yields after the parent skips its share.
            let remaining = child_fetch.saturating_sub(parent_skip);
            Some(parent_fetch.map_or(remaining, |parent_fetch| min(parent_fetch, remaining)))
        }
    };
    (skip, fetch)
}
/// Returns the number of threads the process can use, computed once.
///
/// Falls back to 1 if `std::thread::available_parallelism` fails. The value
/// is cached for the lifetime of the process.
pub fn get_available_parallelism() -> usize {
    static PARALLELISM: LazyLock<usize> =
        LazyLock::new(|| available_parallelism().map_or(1, NonZero::get));
    *PARALLELISM
}
/// Converts a function's arguments into a fixed-size array of exactly `N`.
///
/// # Errors
/// Returns an execution error naming `function_name` when the argument count
/// is not `N`.
pub fn take_function_args<const N: usize, T>(
    function_name: &str,
    args: impl IntoIterator<Item = T>,
) -> Result<[T; N]> {
    let collected: Vec<T> = args.into_iter().collect();
    let noun = if N == 1 { "argument" } else { "arguments" };
    <[T; N]>::try_from(collected).map_err(|rejected: Vec<T>| {
        _exec_datafusion_err!(
            "{} function requires {} {}, got {}",
            function_name,
            N,
            noun,
            rejected.len()
        )
    })
}
/// Returns the child values of a list array.
///
/// For `List`/`LargeList` only the values referenced by the offsets are
/// returned (the child array is sliced when the list is itself a slice). For
/// `FixedSizeList` the child `values()` array is returned as stored.
///
/// # Errors
/// Returns an execution error for any other data type.
pub fn list_values(array: &dyn Array) -> Result<ArrayRef> {
    let values = match array.data_type() {
        DataType::List(_) => sliced_list_values(array.as_list::<i32>()),
        DataType::LargeList(_) => sliced_list_values(array.as_list::<i64>()),
        DataType::FixedSizeList(_, _) => Arc::clone(array.as_fixed_size_list().values()),
        other => return _exec_err!("expected list, got {other}"),
    };
    Ok(values)
}
/// Returns the slice of `list.values()` spanned by `offsets[0]..offsets[len]`.
///
/// When the offsets already cover the whole child array, the child is
/// returned without slicing.
fn sliced_list_values<O: OffsetSizeTrait>(list: &GenericListArray<O>) -> ArrayRef {
    let values = list.values();
    let offsets = list.offsets();
    let (Some(first), Some(last)) = (offsets.first(), offsets.last()) else {
        return Arc::clone(values);
    };
    let (start, end) = (first.as_usize(), last.as_usize());
    if start == 0 && end == values.len() {
        Arc::clone(values)
    } else {
        values.slice(start, end - start)
    }
}
/// Returns `list`'s offsets rebased so that the first offset is zero.
///
/// Intended to pair with the sliced child values of a sliced list. When the
/// offsets already start at zero and end at `values().len()`, they are
/// returned unchanged.
pub fn adjust_offsets_for_slice<O: OffsetSizeTrait>(
    list: &GenericListArray<O>,
) -> OffsetBuffer<O> {
    let offsets = list.offsets();
    let (Some(&first), Some(&last)) = (offsets.first(), offsets.last()) else {
        return offsets.clone();
    };
    let untouched = first.is_zero() && last.as_usize() == list.values().len();
    if untouched {
        return offsets.clone();
    }
    OffsetBuffer::new(offsets.iter().map(|&offset| offset - first).collect())
}
/// Rewrites a `List`/`LargeList` so that null rows reference no child values.
///
/// # Errors
/// Returns an execution error for non-list types, and propagates errors from
/// the truncation step.
pub fn remove_list_null_values(array: &ArrayRef) -> Result<ArrayRef> {
    let truncated: ArrayRef = match array.data_type() {
        DataType::List(_) => Arc::new(truncate_list_nulls(array.as_list::<i32>())?),
        DataType::LargeList(_) => Arc::new(truncate_list_nulls(array.as_list::<i64>())?),
        dt => return _exec_err!("expected List or LargeList, got {dt}"),
    };
    Ok(truncated)
}
/// Returns a copy of `list` in which every null row has length zero, dropping
/// the child values that null rows previously referenced.
///
/// If `list` has no nulls, or every null row already has length zero, `list`
/// is returned as-is (cloned). Valid rows keep their values and the original
/// null buffer is reused.
///
/// # Errors
/// Propagates errors from the `length` and `eq` kernels and from
/// `GenericListArray::try_new`.
fn truncate_list_nulls<O: OffsetSizeTrait>(
    list: &GenericListArray<O>,
) -> Result<GenericListArray<O>> {
    if let Some(nulls) = list.nulls()
        && nulls.null_count() > 0
    {
        // Per-row lengths; the result type follows the offset width, hence the
        // Int32/Int64 choice of the zero scalar below.
        let lengths = length(list)?;
        let zero: &dyn Datum = if lengths.data_type() == &DataType::Int32 {
            &Int32Array::new_scalar(0)
        } else {
            &Int64Array::new_scalar(0)
        };
        // Bit set where the row length is zero. The comparison's own null
        // buffer is discarded so that null rows keep their computed bit.
        // NOTE(review): relies on `length` reporting the underlying segment
        // length for null rows — confirm against the arrow kernel docs.
        let (mut valid_or_empty, _nulls) = eq(&lengths, zero)?.into_parts();
        // OR in validity: a bit is now set iff the row is valid or empty.
        valid_or_empty |= nulls.inner();
        let valid_or_empty = BooleanArray::from(valid_or_empty);
        // A cleared bit marks a null row that still references child values.
        if valid_or_empty.has_false() {
            let array_data = list.values().to_data();
            let offsets = list.offsets();
            // Upper bound on the number of child values kept.
            let capacity = offsets[offsets.len() - 1] - offsets[0];
            let mut mutable_array_data =
                MutableArrayData::new(vec![&array_data], false, capacity.as_usize());
            let (valid_or_empty, _nulls) = valid_or_empty.into_parts();
            // Copy only the child ranges of runs of kept (valid or empty) rows.
            for (start, end) in valid_or_empty.set_slices() {
                mutable_array_data.extend(
                    0,
                    offsets[start].as_usize(),
                    offsets[end].as_usize(),
                );
            }
            // New lengths: unchanged for valid rows, zero for null rows.
            let lengths = std::iter::zip(offsets.lengths(), nulls)
                .map(|(length, is_valid)| if is_valid { length } else { 0 });
            let offsets = OffsetBuffer::from_lengths(lengths);
            let values = make_array(mutable_array_data.freeze());
            // Callers only pass List/LargeList arrays here.
            let field = match list.data_type() {
                DataType::List(field) => field,
                DataType::LargeList(field) => field,
                _ => unreachable!(),
            };
            return Ok(GenericListArray::try_new(
                Arc::clone(field),
                offsets,
                values,
                list.nulls().cloned(),
            )?);
        }
    }
    Ok(list.clone())
}
pub fn list_values_row_number(array: &dyn Array) -> Result<ArrayRef> {
match array.data_type() {
DataType::List(_) => Ok(Arc::new(variable_size_list_values_row_number::<
Int32Type,
>(array.as_list().offsets()))),
DataType::LargeList(_) => Ok(Arc::new(variable_size_list_values_row_number::<
Int64Type,
>(array.as_list().offsets()))),
DataType::ListView(_) => Ok(Arc::new(variable_size_list_values_row_number::<
Int32Type,
>(array.as_list_view().offsets()))),
DataType::LargeListView(_) => {
Ok(Arc::new(variable_size_list_values_row_number::<Int64Type>(
array.as_list_view().offsets(),
)))
}
DataType::FixedSizeList(_, _) => {
let fixed_size_list = array.as_fixed_size_list();
Ok(Arc::new(fsl_values_row_number(
fixed_size_list.value_length(),
fixed_size_list.len(),
)?))
}
DataType::Map(_, _) => Ok(Arc::new(variable_size_list_values_row_number::<
Int32Type,
>(array.as_map().offsets()))),
other => _exec_err!("expected list, got {other}"),
}
}
/// Expands a List/LargeList/Map offset buffer into per-value row indices.
///
/// `offsets` is expected to have `len + 1` non-decreasing entries; row `i`
/// contributes `offsets[i + 1] - offsets[i]` copies of `i`. An empty slice
/// (or a single offset) yields an empty array rather than panicking.
fn variable_size_list_values_row_number<T: ArrowPrimitiveType>(
    offsets: &[T::Native],
) -> PrimitiveArray<T> {
    // Capacity is only a hint, so degrade gracefully on empty/odd input
    // instead of indexing `offsets[offsets.len() - 1]`.
    let capacity = match (offsets.first(), offsets.last()) {
        (Some(first), Some(last)) => last.as_usize().saturating_sub(first.as_usize()),
        _ => 0,
    };
    let mut rows_number = Vec::with_capacity(capacity);
    for (i, w) in offsets.windows(2).enumerate() {
        let len = w[1].as_usize() - w[0].as_usize();
        rows_number.extend(repeat_n(T::Native::usize_as(i), len));
    }
    PrimitiveArray::new(rows_number.into(), None)
}
/// Row indices for a fixed-size list: each of `array_len` rows repeated
/// `list_size` times.
///
/// # Errors
/// Returns an execution error if `list_size` is negative.
fn fsl_values_row_number(list_size: i32, array_len: usize) -> Result<Int32Array> {
    let Some(width) = list_size.to_usize() else {
        return Err(_exec_datafusion_err!(
            "fsl_values_index: invalid list_size {list_size}"
        ));
    };
    let mut rows_number = Vec::with_capacity(width * array_len);
    rows_number.extend((0..array_len).flat_map(|row| repeat_n(row as i32, width)));
    Ok(PrimitiveArray::new(rows_number.into(), None))
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::ScalarValue::Null;
    use arrow::{
        array::{Float64Array, Int32Array},
        buffer::NullBuffer,
        datatypes::Int32Type,
    };
    // sqlparser is only available with the `sql` feature (see the gated
    // imports at the top of this file); the only user is also gated.
    #[cfg(feature = "sql")]
    use sqlparser::ast::Ident;

    #[test]
    fn test_bisect_linear_left_and_right() -> Result<()> {
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
            Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
            Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
        ];
        let search_tuple: Vec<ScalarValue> = vec![
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(3.0)),
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(8.0)),
        ];
        let ords = [
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: true,
                nulls_first: true,
            },
        ];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        Ok(())
    }

    #[test]
    fn vector_ord() {
        assert!(vec![1, 0, 0, 0, 0, 0, 0, 1] < vec![1, 0, 0, 0, 0, 0, 0, 2]);
        assert!(vec![1, 0, 0, 0, 0, 0, 1, 1] > vec![1, 0, 0, 0, 0, 0, 0, 2]);
        assert!(
            vec![
                ScalarValue::Int32(Some(2)),
                Null,
                ScalarValue::Int32(Some(0)),
            ] < vec![
                ScalarValue::Int32(Some(2)),
                Null,
                ScalarValue::Int32(Some(1)),
            ]
        );
        assert!(
            vec![
                ScalarValue::Int32(Some(2)),
                ScalarValue::Int32(None),
                ScalarValue::Int32(Some(0)),
            ] < vec![
                ScalarValue::Int32(Some(2)),
                ScalarValue::Int32(None),
                ScalarValue::Int32(Some(1)),
            ]
        );
    }

    #[test]
    fn ord_same_type() {
        assert!((ScalarValue::Int32(Some(2)) < ScalarValue::Int32(Some(3))));
    }

    #[test]
    fn test_bisect_linear_left_and_right_diff_sort() -> Result<()> {
        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
        let ords = [SortOptions {
            descending: true,
            nulls_first: true,
        }];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 0);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 0);

        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
        let ords = [SortOptions {
            descending: true,
            nulls_first: true,
        }];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);

        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
        let ords = [SortOptions {
            descending: false,
            nulls_first: true,
        }];
        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 1);

        let arrays: Vec<ArrayRef> =
            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
        let ords = [SortOptions {
            descending: false,
            nulls_first: true,
        }];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);

        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 8.0, 9., 10.])),
            Arc::new(Float64Array::from(vec![10.0, 9.0, 8.0, 7.5, 7., 6.])),
        ];
        let search_tuple: Vec<ScalarValue> = vec![
            ScalarValue::Float64(Some(8.0)),
            ScalarValue::Float64(Some(8.0)),
        ];
        let ords = [
            SortOptions {
                descending: false,
                nulls_first: true,
            },
            SortOptions {
                descending: true,
                nulls_first: true,
            },
        ];
        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);
        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 3);

        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
        assert_eq!(res, 2);
        Ok(())
    }

    #[test]
    fn test_evaluate_partition_ranges() -> Result<()> {
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(Float64Array::from(vec![1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
            Arc::new(Float64Array::from(vec![4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
        ];
        let n_row = arrays[0].len();
        let options: Vec<SortOptions> = vec![
            SortOptions {
                descending: false,
                nulls_first: false,
            },
            SortOptions {
                descending: true,
                nulls_first: false,
            },
        ];
        let sort_columns = arrays
            .into_iter()
            .zip(options)
            .map(|(values, options)| SortColumn {
                values,
                options: Some(options),
            })
            .collect::<Vec<_>>();
        let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
        assert_eq!(ranges.len(), 4);
        assert_eq!(ranges[0], Range { start: 0, end: 2 });
        assert_eq!(ranges[1], Range { start: 2, end: 3 });
        assert_eq!(ranges[2], Range { start: 3, end: 4 });
        assert_eq!(ranges[3], Range { start: 4, end: 6 });
        Ok(())
    }

    #[cfg(feature = "sql")]
    #[test]
    fn test_quote_identifier() -> Result<()> {
        let cases = vec![
            ("foo", r#"foo"#),
            ("_foo", r#"_foo"#),
            ("foo_bar", r#"foo_bar"#),
            ("foo-bar", r#""foo-bar""#),
            ("foo.bar", r#""foo.bar""#),
            ("Foo", r#""Foo""#),
            ("Foo.Bar", r#""Foo.Bar""#),
            ("test1", r#"test1"#),
            ("1test", r#""1test""#),
        ];
        for (identifier, quoted_identifier) in cases {
            println!("input: \n{identifier}\nquoted_identifier:\n{quoted_identifier}");
            assert_eq!(quote_identifier(identifier), quoted_identifier);
            let quote_style = if quoted_identifier.starts_with('"') {
                Some('"')
            } else {
                None
            };
            let expected_parsed = vec![Ident {
                value: identifier.to_string(),
                quote_style,
                span: sqlparser::tokenizer::Span::empty(),
            }];
            assert_eq!(
                parse_identifiers(quoted_identifier).unwrap(),
                expected_parsed
            );
        }
        Ok(())
    }

    #[test]
    fn test_get_at_indices() -> Result<()> {
        let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
        assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
        assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
        assert!(get_at_indices(&in_vec, [7]).is_err());
        Ok(())
    }

    #[test]
    fn test_longest_consecutive_prefix() {
        assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
        assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
        assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
        assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
    }

    #[test]
    fn test_merge_and_order_indices() {
        assert_eq!(
            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
            vec![0, 1, 3, 4, 5]
        );
        assert_eq!(
            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
            vec![0, 1, 3, 4, 5]
        );
    }

    #[test]
    fn test_set_difference() {
        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
    }

    #[test]
    fn test_find_indices() -> Result<()> {
        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
        Ok(())
    }

    #[test]
    fn test_transpose() -> Result<()> {
        let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
        let transposed = transpose(in_data);
        let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
        assert_eq!(expected, transposed);
        Ok(())
    }

    #[test]
    fn test_sliced_list_values() {
        let data = vec![
            Some(vec![Some(0), Some(1), Some(2)]),
            None,
            Some(vec![Some(3), None, Some(5)]),
            Some(vec![Some(6), Some(7)]),
        ];
        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
        assert_eq!(
            sliced_list_values(&list).as_primitive(),
            &Int32Array::from(vec![
                Some(0),
                Some(1),
                Some(2),
                Some(3),
                None,
                Some(5),
                Some(6),
                Some(7)
            ])
        );
        assert_eq!(
            sliced_list_values(&list.slice(0, 1)).as_primitive(),
            &Int32Array::from(vec![Some(0), Some(1), Some(2)])
        );
        assert_eq!(
            sliced_list_values(&list.slice(2, 1)).as_primitive(),
            &Int32Array::from(vec![Some(3), None, Some(5)])
        );
        assert_eq!(
            sliced_list_values(&list.slice(3, 1)).as_primitive(),
            &Int32Array::from(vec![Some(6), Some(7)])
        );
        assert!(sliced_list_values(&list.slice(0, 0)).is_empty());
        assert!(sliced_list_values(&list.slice(1, 0)).is_empty());
        assert!(sliced_list_values(&list.slice(3, 0)).is_empty());
    }

    #[test]
    fn test_adjust_offsets() {
        let data = vec![
            Some(vec![Some(0), Some(1), Some(2)]),
            None,
            Some(vec![Some(3), None, Some(5)]),
            Some(vec![Some(6), Some(7)]),
        ];
        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data);
        assert_eq!(
            adjust_offsets_for_slice(&list),
            OffsetBuffer::from_lengths([3, 0, 3, 2])
        );
        assert_eq!(
            adjust_offsets_for_slice(&list.slice(0, 1)),
            OffsetBuffer::from_lengths([3])
        );
        assert_eq!(
            adjust_offsets_for_slice(&list.slice(1, 2)),
            OffsetBuffer::from_lengths([0, 3])
        );
        assert_eq!(
            adjust_offsets_for_slice(&list.slice(1, 3)),
            OffsetBuffer::from_lengths([0, 3, 2])
        );
        assert_eq!(
            adjust_offsets_for_slice(&list.slice(0, 0)),
            OffsetBuffer::from_lengths([])
        );
        assert_eq!(
            adjust_offsets_for_slice(&list.slice(1, 0)),
            OffsetBuffer::from_lengths([])
        );
        assert_eq!(
            adjust_offsets_for_slice(&list.slice(3, 0)),
            OffsetBuffer::from_lengths([])
        );
    }

    /// Builds a nullable-element `ListArray<Int32>` from raw parts.
    fn create_i32_list(
        values: impl Into<Int32Array>,
        offsets: OffsetBuffer<i32>,
        nulls: Option<NullBuffer>,
    ) -> ListArray {
        let list_field = Arc::new(Field::new_list_field(DataType::Int32, true));
        ListArray::new(list_field, offsets, Arc::new(values.into()), nulls)
    }

    #[test]
    fn test_remove_list_null_values_list() {
        let list = Arc::new(create_i32_list(
            vec![100, 20, 10, 0, 0, 0, 0, 1, 50],
            OffsetBuffer::<i32>::from_lengths(vec![3, 4, 0, 2, 0]),
            Some(NullBuffer::from(vec![true, false, false, true, false])),
        )) as ArrayRef;
        let res = remove_list_null_values(&list).unwrap();
        let res = res.as_list::<i32>();
        let expected = Arc::new(create_i32_list(
            vec![100, 20, 10, 1, 50],
            OffsetBuffer::<i32>::from_lengths(vec![3, 0, 0, 2, 0]),
            Some(NullBuffer::from(vec![true, false, false, true, false])),
        )) as ArrayRef;
        let expected = expected.as_list::<i32>();
        assert_eq!(res, expected);
        assert_eq!(res.values(), expected.values());
        assert_eq!(res.offsets(), expected.offsets());
    }

    #[test]
    fn test_list_array_values_row_number() {
        assert_eq!(
            variable_size_list_values_row_number::<Int32Type>(
                &OffsetBuffer::from_lengths([1, 3, 0, 2,])
            ),
            Int32Array::from(vec![0, 1, 1, 1, 3, 3])
        );
        assert_eq!(
            variable_size_list_values_row_number::<Int32Type>(
                &OffsetBuffer::from_lengths([])
            ),
            Int32Array::new_null(0)
        );
        assert_eq!(
            variable_size_list_values_row_number::<Int32Type>(
                &OffsetBuffer::from_lengths([0])
            ),
            Int32Array::new_null(0)
        );
        assert_eq!(
            variable_size_list_values_row_number::<Int32Type>(
                &OffsetBuffer::from_lengths([0, 0])
            ),
            Int32Array::new_null(0)
        );
        assert_eq!(
            variable_size_list_values_row_number::<Int32Type>(
                &OffsetBuffer::from_lengths([1])
            ),
            Int32Array::from(vec![0])
        );
        assert_eq!(
            variable_size_list_values_row_number::<Int32Type>(
                &OffsetBuffer::from_lengths([2])
            ),
            Int32Array::from(vec![0, 0])
        );
    }

    #[test]
    fn test_fsl_values_row_number() {
        assert_eq!(
            fsl_values_row_number(2, 3).unwrap(),
            Int32Array::from(vec![0, 0, 1, 1, 2, 2])
        );
        assert_eq!(
            fsl_values_row_number(1, 3).unwrap(),
            Int32Array::from(vec![0, 1, 2])
        );
        assert_eq!(
            fsl_values_row_number(2, 1).unwrap(),
            Int32Array::from(vec![0, 0])
        );
        assert_eq!(
            fsl_values_row_number(2, 0).unwrap(),
            Int32Array::new_null(0),
        );
        assert_eq!(
            fsl_values_row_number(0, 2).unwrap(),
            Int32Array::new_null(0),
        );
        assert_eq!(
            fsl_values_row_number(0, 0).unwrap(),
            Int32Array::new_null(0),
        );
        fsl_values_row_number(-1, 2).unwrap_err();
        fsl_values_row_number(-1, 0).unwrap_err();
    }
}