use std::sync::Arc;
use datafusion::arrow::array::{
Array, ArrayRef, Decimal128Array, Float64Array, GenericListArray, Int64Array, StructArray,
UInt32Array,
};
use datafusion::arrow::buffer::OffsetBuffer;
use datafusion::arrow::datatypes::{DataType, Field, Fields};
use datafusion::common::ScalarValue;
use datafusion::config::ConfigOptions;
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use super::array_helpers::{ArrayAvgUdf, ArraySumUdf};
use super::map_from_entries::MapFromEntriesUdf;
use super::width_bucket::WidthBucketArrayUdf;
/// Builds a `LargeList<Int64>` array (64-bit offsets) from optional rows.
///
/// A `Some` row appends its elements to the child array; a `None` row becomes
/// a null list entry that spans no child values.
fn make_large_list_i64(values: &[Option<&[i64]>]) -> ArrayRef {
    let mut flat: Vec<i64> = Vec::new();
    let mut row_ends: Vec<i64> = vec![0];
    let validity: Vec<bool> = values
        .iter()
        .map(|row| {
            if let Some(items) = row {
                flat.extend_from_slice(items);
            }
            // A null row repeats the previous offset, i.e. has zero length.
            row_ends.push(flat.len() as i64);
            row.is_some()
        })
        .collect();

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let child: ArrayRef = Arc::new(Int64Array::from(flat));
    let list = GenericListArray::<i64>::try_new(
        item_field,
        OffsetBuffer::new(row_ends.into()),
        child,
        Some(datafusion::arrow::buffer::NullBuffer::from(validity)),
    )
    .expect("Failed to create LargeListArray");
    Arc::new(list)
}
/// Builds a `LargeList<Float64>` array (64-bit offsets) from optional rows.
///
/// `None` rows are recorded as null list entries with zero length; the
/// element values themselves are never null.
fn make_large_list_f64(values: &[Option<&[f64]>]) -> ArrayRef {
    let mut child_values: Vec<f64> = Vec::new();
    let mut list_offsets: Vec<i64> = vec![0];
    let mut row_is_valid: Vec<bool> = Vec::new();

    for row in values {
        if let Some(elems) = row {
            child_values.extend_from_slice(elems);
        }
        // Pushed for both valid and null rows so offsets stay aligned with rows.
        list_offsets.push(child_values.len() as i64);
        row_is_valid.push(row.is_some());
    }

    let child: ArrayRef = Arc::new(Float64Array::from(child_values));
    let item_field = Arc::new(Field::new("item", DataType::Float64, true));
    let validity = datafusion::arrow::buffer::NullBuffer::from(row_is_valid);
    let list = GenericListArray::<i64>::try_new(
        item_field,
        OffsetBuffer::new(list_offsets.into()),
        child,
        Some(validity),
    )
    .expect("Failed to create LargeListArray");
    Arc::new(list)
}
/// Builds a `LargeList<Decimal128(precision, scale)>` array from optional rows
/// of raw `i128` decimal values.
///
/// `None` rows become null list entries with zero length. Panics if the
/// precision/scale pair is rejected or the list cannot be assembled.
fn make_large_list_decimal(values: &[Option<&[i128]>], precision: u8, scale: i8) -> ArrayRef {
    let mut raw: Vec<Option<i128>> = Vec::new();
    let mut row_ends: Vec<i64> = vec![0];
    let validity: Vec<bool> = values
        .iter()
        .map(|row| {
            if let Some(items) = row {
                raw.extend(items.iter().copied().map(Some));
            }
            row_ends.push(raw.len() as i64);
            row.is_some()
        })
        .collect();

    let decimals = Decimal128Array::from(raw)
        .with_precision_and_scale(precision, scale)
        .expect("Failed to set decimal precision/scale");
    let child: ArrayRef = Arc::new(decimals);
    let item_type = DataType::Decimal128(precision, scale);
    let item_field = Arc::new(Field::new("item", item_type, true));

    let list = GenericListArray::<i64>::try_new(
        item_field,
        OffsetBuffer::new(row_ends.into()),
        child,
        Some(datafusion::arrow::buffer::NullBuffer::from(validity)),
    )
    .expect("Failed to create LargeListArray");
    Arc::new(list)
}
/// Invokes `udf` with `args` via `try_invoke_scalar` and panics if the call
/// returns an error.
fn invoke_scalar(udf: &dyn ScalarUDFImpl, args: Vec<ColumnarValue>) -> ColumnarValue {
    let outcome = try_invoke_scalar(udf, args);
    outcome.expect("UDF invoke failed")
}
fn try_invoke_scalar(
udf: &dyn ScalarUDFImpl,
args: Vec<ColumnarValue>,
) -> datafusion::common::Result<ColumnarValue> {
let arg_fields = args
.iter()
.map(|a| Arc::new(Field::new("arg", a.data_type(), true)))
.collect();
udf.invoke_with_args(ScalarFunctionArgs {
args,
arg_fields,
number_rows: 1,
return_field: Arc::new(Field::new("result", DataType::Null, true)),
config_options: Arc::new(ConfigOptions::default()),
})
}
/// `ArraySumUdf` over a columnar `LargeList<Int64>`: valid rows are summed
/// into an `Int64Array` and the null row yields a null result.
#[test]
fn test_array_sum_large_list_columnar() {
    let input = make_large_list_i64(&[Some(&[1, 2, 3]), None, Some(&[10, 20])]);
    let output = invoke_scalar(&ArraySumUdf::new(), vec![ColumnarValue::Array(input)]);
    let output = match output {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let sums = output
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("Expected Int64Array");

    assert_eq!(sums.value(0), 6);
    assert!(sums.is_null(1));
    assert_eq!(sums.value(2), 30);
}
/// `ArraySumUdf` over a columnar `LargeList<Float64>`: 1.5 + 2.5 + 3.0 is
/// expected to come back as 7.0 (within 1e-10) in a `Float64Array`.
#[test]
fn test_array_sum_large_list_f64_columnar() {
    let sum_udf = ArraySumUdf::new();
    let input = make_large_list_f64(&[Some(&[1.5, 2.5, 3.0])]);
    let summed = match invoke_scalar(&sum_udf, vec![ColumnarValue::Array(input)]) {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let totals = summed
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("Expected Float64Array");
    let delta = (totals.value(0) - 7.0).abs();
    assert!(delta < 1e-10);
}
/// `ArraySumUdf` with a `ScalarValue::LargeList` argument: the result is
/// expected to be an `Int64` scalar holding 10 + 20 + 30 = 60.
#[test]
fn test_array_sum_large_list_scalar() {
    let list = make_large_list_i64(&[Some(&[10, 20, 30])]);
    let scalar = ScalarValue::try_from_array(&list, 0).expect("Failed to create scalar");
    // Guard that the input really exercises the LargeList scalar path.
    assert!(matches!(scalar, ScalarValue::LargeList(_)));

    let total = match invoke_scalar(&ArraySumUdf::new(), vec![ColumnarValue::Scalar(scalar)]) {
        ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => v,
        _ => panic!("Expected Int64 scalar result"),
    };
    assert_eq!(total, 60);
}
/// `ArraySumUdf` over a columnar `LargeList<Decimal128(10, 2)>`: raw values
/// are summed per row into a `Decimal128Array`, and the null row stays null.
#[test]
fn test_array_sum_large_list_decimal_columnar() {
    let input = make_large_list_decimal(&[Some(&[100, 200, 300]), None, Some(&[50])], 10, 2);
    let output = invoke_scalar(&ArraySumUdf::new(), vec![ColumnarValue::Array(input)]);
    let output = match output {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let sums = output
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .expect("Expected Decimal128Array");

    // Values are the raw (unscaled) decimal representation.
    assert_eq!(sums.value(0), 600);
    assert!(sums.is_null(1));
    assert_eq!(sums.value(2), 50);
}
/// `ArrayAvgUdf` over a columnar `LargeList<Int64>`: averages are returned as
/// a `Float64Array`, with a null result for the null row.
#[test]
fn test_array_avg_large_list_columnar() {
    let avg_udf = ArrayAvgUdf::new();
    let input = make_large_list_i64(&[Some(&[10, 20, 30]), None, Some(&[4, 6])]);
    let averaged = match invoke_scalar(&avg_udf, vec![ColumnarValue::Array(input)]) {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let means = averaged
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("Expected Float64Array");

    // (10 + 20 + 30) / 3
    assert!((means.value(0) - 20.0).abs() < f64::EPSILON);
    assert!(means.is_null(1));
    // (4 + 6) / 2
    assert!((means.value(2) - 5.0).abs() < f64::EPSILON);
}
/// `ArrayAvgUdf` with a `ScalarValue::LargeList` argument: the result is
/// expected to be a `Float64` scalar equal to 20.0.
#[test]
fn test_array_avg_large_list_scalar() {
    let list = make_large_list_i64(&[Some(&[10, 20, 30])]);
    let scalar = ScalarValue::try_from_array(&list, 0).expect("Failed to create scalar");
    // Guard that the input really exercises the LargeList scalar path.
    assert!(matches!(scalar, ScalarValue::LargeList(_)));

    let mean = match invoke_scalar(&ArrayAvgUdf::new(), vec![ColumnarValue::Scalar(scalar)]) {
        ColumnarValue::Scalar(ScalarValue::Float64(Some(v))) => v,
        _ => panic!("Expected Float64 scalar result"),
    };
    let delta = (mean - 20.0).abs();
    assert!(delta < f64::EPSILON);
}
/// `ArrayAvgUdf` over a columnar `LargeList<Decimal128(10, 2)>`: the result
/// is expected to be a `Decimal128Array` with raw values 200 and 50.
#[test]
fn test_array_avg_large_list_decimal_columnar() {
    let input = make_large_list_decimal(&[Some(&[100, 200, 300]), Some(&[40, 60])], 10, 2);
    let output = invoke_scalar(&ArrayAvgUdf::new(), vec![ColumnarValue::Array(input)]);
    let output = match output {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let means = output
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .expect("Expected Decimal128Array");

    // (100 + 200 + 300) / 3 in raw decimal units.
    assert_eq!(means.value(0), 200);
    // (40 + 60) / 2 in raw decimal units.
    assert_eq!(means.value(1), 50);
}
/// Builds a `LargeList<Struct<key: Utf8, value: Int64>>` array from optional
/// rows of `(key, value)` pairs.
///
/// The `key` field is declared non-nullable and `value` nullable. `None` rows
/// become null list entries with zero length.
fn make_large_list_of_pairs(rows: &[Option<&[(&str, i64)]>]) -> ArrayRef {
    use datafusion::arrow::array::StringArray;

    let mut keys: Vec<String> = Vec::new();
    let mut vals: Vec<i64> = Vec::new();
    let mut row_ends: Vec<i64> = vec![0];
    let validity: Vec<bool> = rows
        .iter()
        .map(|row| {
            if let Some(entries) = row {
                keys.extend(entries.iter().map(|(k, _)| k.to_string()));
                vals.extend(entries.iter().map(|(_, v)| *v));
            }
            // Keys and values grow in lockstep, so either length marks the row end.
            row_ends.push(keys.len() as i64);
            row.is_some()
        })
        .collect();

    let entry_fields = Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Int64, true),
    ]);
    let key_column: ArrayRef = Arc::new(StringArray::from(keys));
    let value_column: ArrayRef = Arc::new(Int64Array::from(vals));
    let entries = StructArray::try_new(entry_fields.clone(), vec![key_column, value_column], None)
        .expect("Failed to create StructArray");

    let item_field = Arc::new(Field::new("item", DataType::Struct(entry_fields), true));
    let list = GenericListArray::<i64>::try_new(
        item_field,
        OffsetBuffer::new(row_ends.into()),
        Arc::new(entries),
        Some(datafusion::arrow::buffer::NullBuffer::from(validity)),
    )
    .expect("Failed to create LargeListArray of structs");
    Arc::new(list)
}
/// `MapFromEntriesUdf` over a columnar `LargeList` of key/value structs: the
/// result is expected to be a three-row `MapArray` whose middle row is null.
#[test]
fn test_map_from_entries_large_list_columnar() {
    use datafusion::arrow::array::MapArray;

    let entries =
        make_large_list_of_pairs(&[Some(&[("a", 1), ("b", 2)]), None, Some(&[("x", 10)])]);
    let output = invoke_scalar(&MapFromEntriesUdf::new(), vec![ColumnarValue::Array(entries)]);
    let output = match output {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let maps = output
        .as_any()
        .downcast_ref::<MapArray>()
        .expect("Expected MapArray");

    assert_eq!(maps.len(), 3);
    assert!(!maps.is_null(0));
    assert!(maps.is_null(1));
    assert!(!maps.is_null(2));
}
/// `MapFromEntriesUdf` with a `ScalarValue::LargeList` argument: the result
/// is expected to be a `ScalarValue::Map`.
#[test]
fn test_map_from_entries_large_list_scalar() {
    let entries = make_large_list_of_pairs(&[Some(&[("a", 1), ("b", 2)])]);
    let scalar = ScalarValue::try_from_array(&entries, 0).expect("Failed to create scalar");
    // Guard that the input really exercises the LargeList scalar path.
    assert!(matches!(scalar, ScalarValue::LargeList(_)));

    let output = invoke_scalar(&MapFromEntriesUdf::new(), vec![ColumnarValue::Scalar(scalar)]);
    let map_scalar = match output {
        ColumnarValue::Scalar(s) => s,
        _ => panic!("Expected scalar result"),
    };
    assert!(matches!(map_scalar, ScalarValue::Map(_)));
}
/// `WidthBucketArrayUdf` with scalar `x = 15.0` and scalar `LargeList` bins
/// `[10, 20, 30]`: the result is expected to be the `Int64` scalar 1.
#[test]
fn test_width_bucket_large_list_scalar_bins() {
    let bins = make_large_list_f64(&[Some(&[10.0, 20.0, 30.0])]);
    let bins_scalar = ScalarValue::try_from_array(&bins, 0).expect("Failed to create scalar");
    assert!(matches!(bins_scalar, ScalarValue::LargeList(_)));

    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(15.0))),
        ColumnarValue::Scalar(bins_scalar),
    ];
    let bucket = match invoke_scalar(&WidthBucketArrayUdf::new(), call_args) {
        ColumnarValue::Scalar(ScalarValue::Int64(Some(b))) => b,
        _ => panic!("Expected Int64 scalar result"),
    };
    assert_eq!(bucket, 1);
}
/// `WidthBucketArrayUdf` with scalar `x = 15.0` and a two-row columnar
/// `LargeList` of bins: the result is expected to be an `Int64Array` with one
/// bucket per bins row.
#[test]
fn test_width_bucket_large_list_columnar_bins() {
    let bins = make_large_list_f64(&[Some(&[10.0, 20.0, 30.0]), Some(&[5.0, 10.0])]);
    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(15.0))),
        ColumnarValue::Array(bins),
    ];
    let output = match invoke_scalar(&WidthBucketArrayUdf::new(), call_args) {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let buckets = output
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("Expected Int64Array");

    // 15 against [10, 20, 30] -> bucket 1.
    assert_eq!(buckets.value(0), 1);
    // 15 against [5, 10] -> bucket 2.
    assert_eq!(buckets.value(1), 2);
}
/// Builds a `LargeList<Float64>` array (64-bit offsets) whose elements may
/// themselves be null.
///
/// `None` rows become null list entries with zero length; `None` elements
/// inside a `Some` row become null child values.
fn make_large_list_nullable_f64(values: &[Option<&[Option<f64>]>]) -> ArrayRef {
    let mut flat: Vec<Option<f64>> = Vec::new();
    let mut row_ends: Vec<i64> = vec![0];
    let validity: Vec<bool> = values
        .iter()
        .map(|row| {
            if let Some(items) = row {
                flat.extend(items.iter().copied());
            }
            row_ends.push(flat.len() as i64);
            row.is_some()
        })
        .collect();

    let item_field = Arc::new(Field::new("item", DataType::Float64, true));
    let child: ArrayRef = Arc::new(Float64Array::from(flat));
    let list = GenericListArray::<i64>::try_new(
        item_field,
        OffsetBuffer::new(row_ends.into()),
        child,
        Some(datafusion::arrow::buffer::NullBuffer::from(validity)),
    )
    .expect("Failed to create LargeListArray");
    Arc::new(list)
}
/// Builds a `LargeList<UInt32>` array (64-bit offsets) from optional rows.
///
/// `None` rows are recorded as null list entries with zero length.
fn make_large_list_u32(values: &[Option<&[u32]>]) -> ArrayRef {
    let mut child_values: Vec<u32> = Vec::new();
    let mut list_offsets: Vec<i64> = vec![0];
    let mut row_is_valid: Vec<bool> = Vec::new();

    for row in values {
        if let Some(elems) = row {
            child_values.extend_from_slice(elems);
        }
        // Pushed for both valid and null rows so offsets stay aligned with rows.
        list_offsets.push(child_values.len() as i64);
        row_is_valid.push(row.is_some());
    }

    let child: ArrayRef = Arc::new(UInt32Array::from(child_values));
    let item_field = Arc::new(Field::new("item", DataType::UInt32, true));
    let validity = datafusion::arrow::buffer::NullBuffer::from(row_is_valid);
    let list = GenericListArray::<i64>::try_new(
        item_field,
        OffsetBuffer::new(list_offsets.into()),
        child,
        Some(validity),
    )
    .expect("Failed to create LargeListArray");
    Arc::new(list)
}
/// `WidthBucketArrayUdf` with scalar bins containing a null element: the
/// result is expected to be a null `Int64` scalar.
#[test]
fn test_width_bucket_null_in_bins_scalar() {
    let bins = make_large_list_nullable_f64(&[Some(&[Some(10.0), None, Some(30.0)])]);
    let bins_scalar = ScalarValue::try_from_array(&bins, 0).expect("Failed to create scalar");

    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(15.0))),
        ColumnarValue::Scalar(bins_scalar),
    ];
    let bucket = match invoke_scalar(&WidthBucketArrayUdf::new(), call_args) {
        ColumnarValue::Scalar(ScalarValue::Int64(b)) => b,
        _ => panic!("Expected Int64 scalar result"),
    };
    assert_eq!(bucket, None);
}
/// `WidthBucketArrayUdf` with columnar bins where only the first row contains
/// a null element: that row is expected to produce null, while the second row
/// is bucketed normally.
#[test]
fn test_width_bucket_null_in_bins_columnar() {
    let bins = make_large_list_nullable_f64(&[
        Some(&[Some(10.0), None, Some(30.0)]),
        Some(&[Some(5.0), Some(10.0)]),
    ]);
    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(15.0))),
        ColumnarValue::Array(bins),
    ];
    let output = match invoke_scalar(&WidthBucketArrayUdf::new(), call_args) {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let buckets = output
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("Expected Int64Array");

    assert!(buckets.is_null(0));
    // 15 against [5, 10] -> bucket 2.
    assert_eq!(buckets.value(1), 2);
}
/// `WidthBucketArrayUdf` with a NaN among the bins: the call is expected to
/// fail with an error whose message mentions "NaN".
#[test]
fn test_width_bucket_nan_in_bins_error() {
    let bins = make_large_list_f64(&[Some(&[10.0, f64::NAN, 30.0])]);
    let bins_scalar = ScalarValue::try_from_array(&bins, 0).expect("Failed to create scalar");

    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(15.0))),
        ColumnarValue::Scalar(bins_scalar),
    ];
    let err = try_invoke_scalar(&WidthBucketArrayUdf::new(), call_args).unwrap_err();
    let mentions_nan = err.to_string().contains("NaN");
    assert!(mentions_nan, "Expected NaN error, got: {err}");
}
/// `WidthBucketArrayUdf` with `x = NaN` and bins `[10, 20, 30]`: the result
/// is expected to be bucket 3, which equals the number of bins.
#[test]
fn test_width_bucket_nan_x_overflow_bucket() {
    let bins = make_large_list_f64(&[Some(&[10.0, 20.0, 30.0])]);
    let bins_scalar = ScalarValue::try_from_array(&bins, 0).expect("Failed to create scalar");

    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(f64::NAN))),
        ColumnarValue::Scalar(bins_scalar),
    ];
    let bucket = match invoke_scalar(&WidthBucketArrayUdf::new(), call_args) {
        ColumnarValue::Scalar(ScalarValue::Int64(Some(b))) => b,
        _ => panic!("Expected Int64 scalar result"),
    };
    assert_eq!(bucket, 3);
}
/// `WidthBucketArrayUdf` with unsorted bins `[30, 10, 20]`: the call is
/// expected to fail with an error whose message mentions "sorted".
#[test]
fn test_width_bucket_unsorted_bins_error() {
    let bins = make_large_list_f64(&[Some(&[30.0, 10.0, 20.0])]);
    let bins_scalar = ScalarValue::try_from_array(&bins, 0).expect("Failed to create scalar");

    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(15.0))),
        ColumnarValue::Scalar(bins_scalar),
    ];
    let err = try_invoke_scalar(&WidthBucketArrayUdf::new(), call_args).unwrap_err();
    let mentions_sorted = err.to_string().contains("sorted");
    assert!(mentions_sorted, "Expected sorted error, got: {err}");
}
/// `WidthBucketArrayUdf` with duplicated consecutive bin boundaries
/// `[10, 10, 20]` and `x = 10.0`: the call must succeed rather than be
/// rejected as unsorted. Either bucket 1 or bucket 2 is accepted, because the
/// test does not pin which side of the duplicated boundary `x` lands on.
#[test]
fn test_width_bucket_equal_consecutive_bins_ok() {
    let udf = WidthBucketArrayUdf::new();
    let bins_array = make_large_list_f64(&[Some(&[10.0, 10.0, 20.0])]);
    let bins_scalar = ScalarValue::try_from_array(&bins_array, 0).expect("Failed to create scalar");
    let x_scalar = ScalarValue::Float64(Some(10.0));
    let result = invoke_scalar(
        &udf,
        vec![
            ColumnarValue::Scalar(x_scalar),
            ColumnarValue::Scalar(bins_scalar),
        ],
    );
    let ColumnarValue::Scalar(ScalarValue::Int64(Some(bucket))) = result else {
        panic!("Expected Int64 scalar result");
    };
    assert!(
        (1..=2).contains(&bucket),
        "Expected bucket 1 or 2, got: {bucket}"
    );
}
/// `WidthBucketArrayUdf` with columnar `LargeList<UInt32>` bins `[10, 20, 30]`
/// and scalar `x = 15.0`: the result is expected to be an `Int64Array` whose
/// first value is bucket 1.
#[test]
fn test_width_bucket_uint_bins_columnar() {
    let bins = make_large_list_u32(&[Some(&[10, 20, 30])]);
    let call_args = vec![
        ColumnarValue::Scalar(ScalarValue::Float64(Some(15.0))),
        ColumnarValue::Array(bins),
    ];
    let output = match invoke_scalar(&WidthBucketArrayUdf::new(), call_args) {
        ColumnarValue::Array(a) => a,
        _ => panic!("Expected array result"),
    };
    let buckets = output
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("Expected Int64Array");
    assert_eq!(buckets.value(0), 1);
}
/// `WidthBucketArrayUdf` with a `UInt32` array for `x` and scalar `LargeList`
/// bins `[10, 20, 30]`: each `x` is expected to map to its own bucket in an
/// `Int64Array`.
#[test]
fn test_width_bucket_uint_x_array() {
    let udf = WidthBucketArrayUdf::new();
    let x_array: ArrayRef = Arc::new(UInt32Array::from(vec![5u32, 15, 25, 35]));
    let bins_array = make_large_list_f64(&[Some(&[10.0, 20.0, 30.0])]);
    let bins_scalar = ScalarValue::try_from_array(&bins_array, 0).expect("Failed to create scalar");
    let result = invoke_scalar(
        &udf,
        vec![
            ColumnarValue::Array(x_array),
            ColumnarValue::Scalar(bins_scalar),
        ],
    );
    let ColumnarValue::Array(arr) = result else {
        panic!("Expected array result");
    };
    let int_arr = arr
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("Expected Int64Array");
    // Compare as `Option`s: `value(i)` does not check validity, so a null
    // slot could otherwise be mistaken for the expected bucket 0. This also
    // pins the output length to the number of `x` values.
    let buckets: Vec<Option<i64>> = int_arr.iter().collect();
    // 5 -> below first bin, 15 -> bucket 1, 25 -> bucket 2, 35 -> past last bin.
    assert_eq!(buckets, vec![Some(0), Some(1), Some(2), Some(3)]);
}