use std::result::Result as StdResult;
use std::sync::Arc;
use datafusion::arrow::array::types::{Float32Type, Float64Type};
use datafusion::arrow::array::{
Array, ArrayRef, FixedSizeListArray, Float32Array, Float64Array, Int8Array, Int64Array,
ListArray, StructArray,
};
use datafusion::arrow::datatypes::{DataType, Field, FieldRef};
use datafusion::common::ScalarValue;
use datafusion::common::utils::arrays_into_list_array;
use datafusion::logical_expr::ColumnarValue;
use nabled::core::prelude::NabledReal;
use ndarray::{Array1, Array2, Array3, Array4, Axis, Ix1, Ix2, Ix3, Ix4};
use ndarrow::NdarrowElement;
use num_complex::Complex64;
use crate::metadata::{complex_vector_field, vector_field};
use crate::udf::common::invoke_udf;
use crate::udfs;
/// Panics unless `actual` is within an absolute tolerance of 1e-9 of `expected`.
fn assert_close(actual: f64, expected: f64) {
    let delta = (actual - expected).abs();
    // `!(delta < eps)` (rather than `delta >= eps`) keeps NaN deltas failing.
    if !(delta < 1.0e-9) {
        panic!("expected {expected}, got {actual}, delta {delta}");
    }
}
/// Panics unless `actual` is within `epsilon` (absolute) of `expected`.
fn assert_close_eps(actual: f64, expected: f64, epsilon: f64) {
    let delta = (actual - expected).abs();
    // Negated comparison so a NaN delta still fails, matching `assert!` semantics.
    if !(delta < epsilon) {
        panic!("expected {expected}, got {actual}, delta {delta}, epsilon {epsilon}");
    }
}
/// Checks a complex number component-wise (real first, then imaginary) to 1e-9.
fn assert_complex_close(actual: Complex64, expected: Complex64) {
    for (got, want) in [(actual.re, expected.re), (actual.im, expected.im)] {
        assert_close(got, want);
    }
}
/// Builds a `FixedSizeListArray` of `R` non-null rows, each a width-`C` list of f64.
fn fixed_size_list<const R: usize, const C: usize>(rows: [[f64; C]; R]) -> FixedSizeListArray {
    let width = i32::try_from(C).expect("fixed-size-list width should fit i32");
    let row_iter = rows
        .into_iter()
        .map(|row| Some(row.into_iter().map(Some).collect::<Vec<_>>()));
    FixedSizeListArray::from_iter_primitive::<Float64Type, _, _>(row_iter, width)
}
/// f32 counterpart of `fixed_size_list`: `R` non-null rows of width `C`.
fn fixed_size_list_f32<const R: usize, const C: usize>(rows: [[f32; C]; R]) -> FixedSizeListArray {
    let width = i32::try_from(C).expect("fixed-size-list width should fit i32");
    let row_iter = rows
        .into_iter()
        .map(|row| Some(row.into_iter().map(Some).collect::<Vec<_>>()));
    FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(row_iter, width)
}
/// Builds an `R`-row batch of complex vectors of width `C`, returning the
/// matching extension field alongside the encoded fixed-size-list array.
fn complex_vector_batch<const R: usize, const C: usize>(
    name: &str,
    rows: [[Complex64; C]; R],
) -> (FieldRef, FixedSizeListArray) {
    let flat: Vec<Complex64> = rows.into_iter().flatten().collect();
    let matrix = Array2::from_shape_vec((R, C), flat).expect("complex vector batch shape");
    let encoded =
        ndarrow::array2_complex64_to_fixed_size_list(matrix).expect("complex vector batch");
    let field = complex_vector_field(name, C, false).expect("complex vector field");
    (field, encoded)
}
/// Builds a batch of `B` dense `R`×`C` f64 matrices as a fixed-shape tensor.
fn matrix_batch<const B: usize, const R: usize, const C: usize>(
    name: &str,
    values: [[[f64; C]; R]; B],
) -> (FieldRef, FixedSizeListArray) {
    let flat: Vec<f64> = values.into_iter().flatten().flatten().collect();
    let tensor = Array3::from_shape_vec((B, R, C), flat).expect("matrix batch shape");
    let (field, encoded) =
        ndarrow::arrayd_to_fixed_shape_tensor(name, tensor.into_dyn()).expect("matrix batch");
    (Arc::new(field), encoded)
}
/// f32 counterpart of `matrix_batch`: `B` dense `R`×`C` matrices as a tensor.
fn matrix_batch_f32<const B: usize, const R: usize, const C: usize>(
    name: &str,
    values: [[[f32; C]; R]; B],
) -> (FieldRef, FixedSizeListArray) {
    let flat: Vec<f32> = values.into_iter().flatten().flatten().collect();
    let tensor = Array3::from_shape_vec((B, R, C), flat).expect("matrix batch shape");
    let (field, encoded) =
        ndarrow::arrayd_to_fixed_shape_tensor(name, tensor.into_dyn()).expect("matrix batch");
    (Arc::new(field), encoded)
}
/// Builds a batch of `B` complex `R`×`C` matrices as a fixed-shape tensor.
fn complex_matrix_batch<const B: usize, const R: usize, const C: usize>(
    name: &str,
    values: [[[Complex64; C]; R]; B],
) -> (FieldRef, FixedSizeListArray) {
    let flat: Vec<Complex64> = values.into_iter().flatten().flatten().collect();
    let tensor = Array3::from_shape_vec((B, R, C), flat).expect("complex matrix batch shape");
    let (field, encoded) = ndarrow::arrayd_complex64_to_fixed_shape_tensor(name, tensor.into_dyn())
        .expect("complex matrix batch");
    (Arc::new(field), encoded)
}
/// Encodes per-row f64 vectors of differing lengths as a variable-shape tensor.
fn ragged_vectors(name: &str, rows: Vec<Vec<f64>>) -> (FieldRef, StructArray) {
    let tensors: Vec<_> = rows
        .into_iter()
        .map(|row| Array1::from_vec(row).into_dyn())
        .collect();
    let (field, array) =
        ndarrow::arrays_to_variable_shape_tensor(name, tensors, Some(vec![None])).expect("ragged");
    (Arc::new(field), array)
}
/// f32 counterpart of `ragged_vectors`: ragged rows as a variable-shape tensor.
fn ragged_vectors_f32(name: &str, rows: Vec<Vec<f32>>) -> (FieldRef, StructArray) {
    let tensors: Vec<_> = rows
        .into_iter()
        .map(|row| Array1::from_vec(row).into_dyn())
        .collect();
    let (field, array) =
        ndarrow::arrays_to_variable_shape_tensor(name, tensors, Some(vec![None])).expect("ragged");
    (Arc::new(field), array)
}
/// Encodes per-row f64 matrices of differing shapes as a variable-shape tensor.
/// The column count of each matrix is taken from its first row (0 when empty).
fn ragged_matrices(name: &str, rows: Vec<Vec<Vec<f64>>>) -> (FieldRef, StructArray) {
    let tensors: Vec<_> = rows
        .into_iter()
        .map(|matrix| {
            let nrows = matrix.len();
            let ncols = matrix.first().map_or(0, Vec::len);
            let flat: Vec<f64> = matrix.into_iter().flatten().collect();
            Array2::from_shape_vec((nrows, ncols), flat)
                .expect("matrix batch shape")
                .into_dyn()
        })
        .collect();
    let (field, array) =
        ndarrow::arrays_to_variable_shape_tensor(name, tensors, Some(vec![None, None]))
            .expect("ragged matrices");
    (Arc::new(field), array)
}
/// f32 counterpart of `ragged_matrices`: per-row matrices of differing shapes.
fn ragged_matrices_f32(name: &str, rows: Vec<Vec<Vec<f32>>>) -> (FieldRef, StructArray) {
    let tensors: Vec<_> = rows
        .into_iter()
        .map(|matrix| {
            let nrows = matrix.len();
            let ncols = matrix.first().map_or(0, Vec::len);
            let flat: Vec<f32> = matrix.into_iter().flatten().collect();
            Array2::from_shape_vec((nrows, ncols), flat)
                .expect("matrix batch shape")
                .into_dyn()
        })
        .collect();
    let (field, array) =
        ndarrow::arrays_to_variable_shape_tensor(name, tensors, Some(vec![None, None]))
            .expect("ragged matrices");
    (Arc::new(field), array)
}
/// Complex counterpart of `ragged_matrices`; note it passes `None` for the
/// dimension-name hints where the real variants pass `Some(vec![None, None])`.
fn complex_ragged_matrices(name: &str, rows: Vec<Vec<Vec<Complex64>>>) -> (FieldRef, StructArray) {
    let tensors: Vec<_> = rows
        .into_iter()
        .map(|matrix| {
            let nrows = matrix.len();
            let ncols = matrix.first().map_or(0, Vec::len);
            let flat: Vec<Complex64> = matrix.into_iter().flatten().collect();
            Array2::from_shape_vec((nrows, ncols), flat)
                .expect("complex matrix batch shape")
                .into_dyn()
        })
        .collect();
    let (field, array) = ndarrow::arrays_complex64_to_variable_shape_tensor(name, tensors, None)
        .expect("complex ragged matrices");
    (Arc::new(field), array)
}
/// Builds a rank-4 f64 fixed-shape tensor batch of shape `B`×`D0`×`D1`×`D2`.
fn tensor_batch4<const B: usize, const D0: usize, const D1: usize, const D2: usize>(
    name: &str,
    values: [[[[f64; D2]; D1]; D0]; B],
) -> (FieldRef, FixedSizeListArray) {
    let flat: Vec<f64> = values.into_iter().flatten().flatten().flatten().collect();
    let tensor = Array4::from_shape_vec((B, D0, D1, D2), flat).expect("tensor batch shape");
    let (field, encoded) =
        ndarrow::arrayd_to_fixed_shape_tensor(name, tensor.into_dyn()).expect("tensor batch");
    (Arc::new(field), encoded)
}
/// f32 counterpart of `tensor_batch4`: rank-4 batch `B`×`D0`×`D1`×`D2`.
fn tensor_batch4_f32<const B: usize, const D0: usize, const D1: usize, const D2: usize>(
    name: &str,
    values: [[[[f32; D2]; D1]; D0]; B],
) -> (FieldRef, FixedSizeListArray) {
    let flat: Vec<f32> = values.into_iter().flatten().flatten().flatten().collect();
    let tensor = Array4::from_shape_vec((B, D0, D1, D2), flat).expect("tensor batch shape");
    let (field, encoded) =
        ndarrow::arrayd_to_fixed_shape_tensor(name, tensor.into_dyn()).expect("tensor batch");
    (Arc::new(field), encoded)
}
/// Complex counterpart of `tensor_batch4`: rank-4 batch `B`×`D0`×`D1`×`D2`.
fn complex_tensor_batch4<const B: usize, const D0: usize, const D1: usize, const D2: usize>(
    name: &str,
    values: [[[[Complex64; D2]; D1]; D0]; B],
) -> (FieldRef, FixedSizeListArray) {
    let flat: Vec<Complex64> = values.into_iter().flatten().flatten().flatten().collect();
    let tensor =
        Array4::from_shape_vec((B, D0, D1, D2), flat).expect("complex tensor batch shape");
    let (field, encoded) = ndarrow::arrayd_complex64_to_fixed_shape_tensor(name, tensor.into_dyn())
        .expect("complex tensor batch");
    (Arc::new(field), encoded)
}
/// CSR fixture: two f64 matrices (2×3 and 2×2) with three stored values each.
fn sparse_batch(name: &str) -> (FieldRef, StructArray) {
    let shapes = vec![[2, 3], [2, 2]];
    let row_ptrs = vec![vec![0, 2, 3], vec![0, 1, 3]];
    let col_indices = vec![vec![0, 2, 1], vec![0, 0, 1]];
    let values = vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0]];
    let (field, array) =
        ndarrow::csr_batch_to_extension_array(name, shapes, row_ptrs, col_indices, values)
            .expect("sparse batch");
    (Arc::new(field), array)
}
/// f32 counterpart of `sparse_batch`: same structure, f32 values.
fn sparse_batch_f32(name: &str) -> (FieldRef, StructArray) {
    let shapes = vec![[2, 3], [2, 2]];
    let row_ptrs = vec![vec![0, 2, 3], vec![0, 1, 3]];
    let col_indices = vec![vec![0, 2, 1], vec![0, 0, 1]];
    let values = vec![vec![1.0_f32, 2.0, 3.0], vec![4.0_f32, 5.0, 6.0]];
    let (field, array) =
        ndarrow::csr_batch_to_extension_array(name, shapes, row_ptrs, col_indices, values)
            .expect("sparse batch");
    (Arc::new(field), array)
}
/// CSR right-hand-side fixture: a 3×2 and a 2×2 f64 matrix of all-one entries.
fn sparse_batch_rhs(name: &str) -> (FieldRef, StructArray) {
    let shapes = vec![[3, 2], [2, 2]];
    let row_ptrs = vec![vec![0, 1, 2, 4], vec![0, 1, 2]];
    let col_indices = vec![vec![0, 1, 0, 1], vec![0, 1]];
    let values = vec![vec![1.0, 1.0, 1.0, 1.0], vec![1.0, 1.0]];
    let (field, array) =
        ndarrow::csr_batch_to_extension_array(name, shapes, row_ptrs, col_indices, values)
            .expect("sparse rhs batch");
    (Arc::new(field), array)
}
/// f32 counterpart of `sparse_batch_rhs`.
fn sparse_batch_rhs_f32(name: &str) -> (FieldRef, StructArray) {
    let shapes = vec![[3, 2], [2, 2]];
    let row_ptrs = vec![vec![0, 1, 2, 4], vec![0, 1, 2]];
    let col_indices = vec![vec![0, 1, 0, 1], vec![0, 1]];
    let values = vec![vec![1.0_f32, 1.0, 1.0, 1.0], vec![1.0_f32, 1.0]];
    let (field, array) =
        ndarrow::csr_batch_to_extension_array(name, shapes, row_ptrs, col_indices, values)
            .expect("sparse rhs batch");
    (Arc::new(field), array)
}
/// Builds a non-null `ListArray` of f64 rows with arbitrary per-row lengths.
fn float64_list_array(rows: Vec<Vec<f64>>) -> ListArray {
    let row_iter = rows
        .into_iter()
        .map(|row| Some(row.into_iter().map(Some).collect::<Vec<_>>()));
    ListArray::from_iter_primitive::<Float64Type, _, _>(row_iter)
}
/// f32 counterpart of `float64_list_array`.
fn float32_list_array(rows: Vec<Vec<f32>>) -> ListArray {
    let row_iter = rows
        .into_iter()
        .map(|row| Some(row.into_iter().map(Some).collect::<Vec<_>>()));
    ListArray::from_iter_primitive::<Float32Type, _, _>(row_iter)
}
/// Builds a non-null `ListArray` of i32 rows.
fn int32_list_array(rows: Vec<Vec<i32>>) -> ListArray {
    let row_iter = rows
        .into_iter()
        .map(|row| Some(row.into_iter().map(Some).collect::<Vec<_>>()));
    ListArray::from_iter_primitive::<datafusion::arrow::array::types::Int32Type, _, _>(row_iter)
}
/// Builds a non-null `ListArray` of u32 rows.
fn u32_list_array(rows: Vec<Vec<u32>>) -> ListArray {
    let row_iter = rows
        .into_iter()
        .map(|row| Some(row.into_iter().map(Some).collect::<Vec<_>>()));
    ListArray::from_iter_primitive::<datafusion::arrow::array::types::UInt32Type, _, _>(row_iter)
}
/// Wraps a single f64 row as a `ScalarValue::List`.
fn scalar_float64_list(values: Vec<f64>) -> ScalarValue {
    let list = float64_list_array(vec![values]);
    ScalarValue::List(Arc::new(list))
}
/// Wraps a single f32 row as a `ScalarValue::List`.
fn scalar_float32_list(values: Vec<f32>) -> ScalarValue {
    let list = float32_list_array(vec![values]);
    ScalarValue::List(Arc::new(list))
}
/// Wraps a single i32 row as a `ScalarValue::List`.
fn scalar_int32_list(values: Vec<i32>) -> ScalarValue {
    let list = int32_list_array(vec![values]);
    ScalarValue::List(Arc::new(list))
}
/// Wraps nested f64 rows as a single-element list-of-lists `ScalarValue`.
fn scalar_nested_float64_list(rows: Vec<Vec<f64>>) -> ScalarValue {
    let inner: ArrayRef = Arc::new(float64_list_array(rows));
    let outer = arrays_into_list_array([inner]).expect("single nested list scalar");
    ScalarValue::List(Arc::new(outer))
}
/// f32 counterpart of `scalar_nested_float64_list`.
fn scalar_nested_float32_list(rows: Vec<Vec<f32>>) -> ScalarValue {
    let inner: ArrayRef = Arc::new(float32_list_array(rows));
    let outer = arrays_into_list_array([inner]).expect("single nested list scalar");
    ScalarValue::List(Arc::new(outer))
}
/// Downcasts a columnar UDF output to `&Float64Array`, panicking otherwise.
fn f64_array(values: &ColumnarValue) -> &Float64Array {
    match values {
        ColumnarValue::Array(array) => {
            array.as_any().downcast_ref::<Float64Array>().expect("expected Float64Array")
        }
        _ => panic!("expected array output"),
    }
}
/// Downcasts a columnar UDF output to `&Float32Array`, panicking otherwise.
fn f32_array(values: &ColumnarValue) -> &Float32Array {
    match values {
        ColumnarValue::Array(array) => {
            array.as_any().downcast_ref::<Float32Array>().expect("expected Float32Array")
        }
        _ => panic!("expected array output"),
    }
}
/// Downcasts a columnar UDF output to `&FixedSizeListArray`, panicking otherwise.
fn fixed_size_list_array(values: &ColumnarValue) -> &FixedSizeListArray {
    match values {
        ColumnarValue::Array(array) => array
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("expected FixedSizeListArray"),
        _ => panic!("expected array output"),
    }
}
/// Downcasts a columnar UDF output to `&StructArray`, panicking otherwise.
fn struct_array(values: &ColumnarValue) -> &StructArray {
    match values {
        ColumnarValue::Array(array) => {
            array.as_any().downcast_ref::<StructArray>().expect("expected StructArray")
        }
        _ => panic!("expected array output"),
    }
}
/// Returns the data type of an array-valued UDF output, panicking on scalars.
fn array_data_type(values: &ColumnarValue) -> &DataType {
    match values {
        ColumnarValue::Array(array) => array.data_type(),
        _ => panic!("expected array output"),
    }
}
/// Views a fixed-shape f64 tensor output as a rank-3 ndarray view.
fn fixed_shape_view3<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarray::ArrayView3<'a, f64> {
    let dynamic = fixed_shape_viewd(field, values);
    dynamic.into_dimensionality::<Ix3>().expect("rank-3 tensor")
}
/// Views a fixed-shape f64 tensor output as a dynamic-rank ndarray view.
fn fixed_shape_viewd<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarray::ArrayViewD<'a, f64> {
    let list = fixed_size_list_array(values);
    ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(field.as_ref(), list)
        .expect("fixed-shape tensor")
}
/// f32 counterpart of `fixed_shape_viewd`.
fn fixed_shape_viewd_f32<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarray::ArrayViewD<'a, f32> {
    let list = fixed_size_list_array(values);
    ndarrow::fixed_shape_tensor_as_array_viewd::<Float32Type>(field.as_ref(), list)
        .expect("fixed-shape tensor")
}
/// Views a fixed-shape complex tensor output as a rank-3 ndarray view.
fn complex_fixed_shape_view3<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarray::ArrayView3<'a, Complex64> {
    let list = fixed_size_list_array(values);
    let dynamic = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(field.as_ref(), list)
        .expect("complex fixed-shape tensor");
    dynamic.into_dimensionality::<Ix3>().expect("rank-3 complex tensor")
}
/// Views a fixed-shape complex tensor output as a dynamic-rank ndarray view.
fn complex_fixed_shape_viewd<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarray::ArrayViewD<'a, Complex64> {
    let list = fixed_size_list_array(values);
    ndarrow::complex64_fixed_shape_tensor_as_array_viewd(field.as_ref(), list)
        .expect("complex fixed-shape tensor")
}
/// Views a fixed-shape f64 tensor output as a rank-4 ndarray view.
fn fixed_shape_view4<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarray::ArrayView4<'a, f64> {
    let dynamic = fixed_shape_viewd(field, values);
    dynamic.into_dimensionality::<Ix4>().expect("rank-4 tensor")
}
/// f32 counterpart of `fixed_shape_view4`.
fn fixed_shape_view4_f32<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarray::ArrayView4<'a, f32> {
    let dynamic = fixed_shape_viewd_f32(field, values);
    dynamic.into_dimensionality::<Ix4>().expect("rank-4 tensor")
}
/// Iterates per-row tensors of a variable-shape f64 tensor output.
fn variable_shape_rows<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarrow::VariableShapeTensorIter<'a, Float64Type> {
    let rows = struct_array(values);
    ndarrow::variable_shape_tensor_iter::<Float64Type>(field.as_ref(), rows)
        .expect("variable-shape tensor")
}
/// f32 counterpart of `variable_shape_rows`.
fn variable_shape_rows_f32<'a>(
    field: &'a FieldRef,
    values: &'a ColumnarValue,
) -> ndarrow::VariableShapeTensorIter<'a, Float32Type> {
    let rows = struct_array(values);
    ndarrow::variable_shape_tensor_iter::<Float32Type>(field.as_ref(), rows)
        .expect("variable-shape tensor")
}
/// Materializes a CSR view into a dense `nrows`×`ncols` matrix; positions with
/// no stored entry stay 0.0.
fn csr_to_dense(view: &ndarrow::CsrView<'_, f64>) -> Array2<f64> {
    let mut dense = Array2::<f64>::zeros((view.nrows, view.ncols));
    let mut row = 0;
    while row < view.nrows {
        // row_ptrs[row]..row_ptrs[row + 1] is the slice of stored entries for `row`.
        let start = usize::try_from(view.row_ptrs[row]).expect("row ptr start");
        let end = usize::try_from(view.row_ptrs[row + 1]).expect("row ptr end");
        let mut offset = start;
        while offset < end {
            let col = usize::try_from(view.col_indices[offset]).expect("column index");
            dense[[row, col]] = view.values[offset];
            offset += 1;
        }
        row += 1;
    }
    dense
}
/// Invokes a UDF that is expected to fail and returns the error's display text.
/// Panics if the invocation unexpectedly succeeds.
fn invoke_udf_error(
    udf: &Arc<datafusion::logical_expr::ScalarUDF>,
    args: Vec<ColumnarValue>,
    arg_fields: Vec<FieldRef>,
    scalar_arguments: &[Option<ScalarValue>],
    number_rows: usize,
) -> String {
    let outcome = invoke_udf(udf, args, arg_fields, scalar_arguments, number_rows);
    let error = outcome.expect_err("expected UDF invocation to fail");
    error.to_string()
}
/// Checks an eigen-decomposition struct output: column 0 holds eigenvalues
/// (fixed-size list per row) and column 1 holds eigenvectors (rank-3 tensor).
/// Asserts row 0's min/max eigenvalue and that the eigenvectors' absolute
/// entries sum to 2.0 (the expected total for the 2x2 fixtures used here).
fn assert_eigen_struct_output(return_field: &FieldRef, output: &ColumnarValue, min: f64, max: f64) {
    let DataType::Struct(fields) = return_field.data_type() else {
        panic!("expected eigen struct return");
    };
    let output = struct_array(output);
    let eigenvalues =
        output.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("eigenvalues");
    let eigenvalues =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(eigenvalues).expect("eigenvalue tensor");
    let eigenvectors =
        output.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("eigenvectors");
    // The eigenvector column's extension metadata lives on the struct's second field.
    let eigenvectors =
        ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&fields[1], eigenvectors)
            .expect("eigenvector tensor")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 eigenvectors");
    assert_close(eigenvalues.row(0).iter().copied().fold(f64::INFINITY, f64::min), min);
    assert_close(eigenvalues.row(0).iter().copied().fold(f64::NEG_INFINITY, f64::max), max);
    assert_close(eigenvectors.iter().map(|value| value.abs()).sum::<f64>(), 2.0);
}
/// Checks a struct output of two rank-3 tensors: the first tensor's absolute
/// entries must sum to 2.0; the second must be diagonal (off-diagonals 0.0)
/// with its 2x2 diagonal spanning exactly [min, max].
fn assert_two_tensor_struct_output(
    return_field: &FieldRef,
    output: &ColumnarValue,
    min: f64,
    max: f64,
) {
    let DataType::Struct(fields) = return_field.data_type() else {
        panic!("expected two-tensor struct return");
    };
    let output = struct_array(output);
    let first =
        output.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("first tensor");
    let first = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&fields[0], first)
        .expect("first tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 first tensor");
    let second =
        output.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("second tensor");
    let second = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&fields[1], second)
        .expect("second tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 second tensor");
    // Diagonal of the first batch element of the second tensor.
    let diagonal = [second[[0, 0, 0]], second[[0, 1, 1]]];
    assert_close(first.iter().map(|value| value.abs()).sum::<f64>(), 2.0);
    assert_close(second[[0, 0, 1]], 0.0);
    assert_close(second[[0, 1, 0]], 0.0);
    assert_close(diagonal.into_iter().fold(f64::INFINITY, f64::min), min);
    assert_close(diagonal.into_iter().fold(f64::NEG_INFINITY, f64::max), max);
}
/// Checks that the first batch element of a rank-3 tensor output has unit
/// entries on its leading 2x2 diagonal.
fn assert_orthogonal_matrix_output(return_field: &FieldRef, output: &ColumnarValue) {
    let matrix = fixed_shape_view3(return_field, output);
    for index in 0..2 {
        assert_close(matrix[[0, index, index]], 1.0);
    }
}
/// Checks a (tensor, vector) struct output: the rank-3 "balanced" tensor's
/// leading diagonal must match `diagonal`, and the scaling vector's first two
/// entries must both equal 1.0.
fn assert_tensor_vector_struct_output(
    return_field: &FieldRef,
    output: &ColumnarValue,
    diagonal: [f64; 2],
) {
    let DataType::Struct(fields) = return_field.data_type() else {
        panic!("expected tensor-vector struct return");
    };
    let output = struct_array(output);
    let balanced =
        output.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("balanced tensor");
    let balanced = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&fields[0], balanced)
        .expect("balanced tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 balanced tensor");
    let scaling =
        output.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("diagonal vector");
    let scaling =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(scaling).expect("diagonal vector");
    assert_close(balanced[[0, 0, 0]], diagonal[0]);
    assert_close(balanced[[0, 1, 1]], diagonal[1]);
    assert_close(scaling[[0, 0]], 1.0);
    assert_close(scaling[[0, 1]], 1.0);
}
/// Extracts the rank-3 f64 scores tensor (column 4) from a PCA struct output.
/// NOTE(review): despite the `assert_` prefix this only extracts and performs
/// no value assertions beyond shape/type checks; callers assert on the view.
fn assert_pca_scores_from_struct<'a>(
    return_field: &'a FieldRef,
    output: &'a ColumnarValue,
) -> ndarray::ArrayView3<'a, f64> {
    let fields = match return_field.data_type() {
        DataType::Struct(fields) => fields,
        _ => panic!("expected PCA struct return"),
    };
    let columns = struct_array(output);
    let scores =
        columns.column(4).as_any().downcast_ref::<FixedSizeListArray>().expect("scores");
    ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&fields[4], scores)
        .expect("scores")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 scores")
}
/// Complex counterpart of `assert_pca_scores_from_struct`: extracts the rank-3
/// complex scores tensor (column 4) from a complex PCA struct output.
fn assert_complex_pca_scores_from_struct<'a>(
    return_field: &'a FieldRef,
    output: &'a ColumnarValue,
) -> ndarray::ArrayView3<'a, Complex64> {
    let fields = match return_field.data_type() {
        DataType::Struct(fields) => fields,
        _ => panic!("expected complex PCA struct return"),
    };
    let columns = struct_array(output);
    let scores =
        columns.column(4).as_any().downcast_ref::<FixedSizeListArray>().expect("scores");
    ndarrow::complex64_fixed_shape_tensor_as_array_viewd(&fields[4], scores)
        .expect("complex scores")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 complex scores")
}
#[test]
// Verifies the make_vector / make_matrix / make_tensor constructor UDFs:
// each is invoked through invoke_udf and its output decoded back into an
// ndarray view whose entries must round-trip the input values.
fn constructor_udfs_build_fixed_shape_contracts() {
    let make_vector_udf = udfs::make_vector_udf();
    let make_matrix_udf = udfs::make_matrix_udf();
    let make_tensor_udf = udfs::make_tensor_udf();
    // make_vector: scalar list [3.0, 4.0] with dim=2 -> fixed-size list of width 2.
    let vector_values = scalar_float64_list(vec![3.0, 4.0]);
    let vector_field =
        Arc::new(Field::new("values", DataType::new_list(DataType::Float64, false), false));
    let (vector_return_field, vector_output) = invoke_udf(
        &make_vector_udf,
        vec![
            ColumnarValue::Scalar(vector_values.clone()),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![Arc::clone(&vector_field), Arc::new(Field::new("dim", DataType::Int64, false))],
        &[Some(vector_values), Some(ScalarValue::Int64(Some(2)))],
        1,
    )
    .expect("make_vector");
    assert_eq!(
        vector_return_field.data_type(),
        &DataType::new_fixed_size_list(DataType::Float64, 2, false)
    );
    let vector_output =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(fixed_size_list_array(&vector_output))
            .expect("vector output");
    assert_close(vector_output[[0, 0]], 3.0);
    assert_close(vector_output[[0, 1]], 4.0);
    // make_matrix: scalar nested list [[1,2],[3,4]] with rows=2, cols=2.
    let matrix_values = scalar_nested_float64_list(vec![vec![1.0, 2.0], vec![3.0, 4.0]]);
    let matrix_field = Arc::new(Field::new(
        "matrix_values",
        DataType::new_list(DataType::new_list(DataType::Float64, false), false),
        false,
    ));
    let (matrix_return_field, matrix_output) = invoke_udf(
        &make_matrix_udf,
        vec![
            ColumnarValue::Scalar(matrix_values.clone()),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::clone(&matrix_field),
            Arc::new(Field::new("rows", DataType::Int64, false)),
            Arc::new(Field::new("cols", DataType::Int64, false)),
        ],
        &[
            Some(matrix_values),
            Some(ScalarValue::Int64(Some(2))),
            Some(ScalarValue::Int64(Some(2))),
        ],
        1,
    )
    .expect("make_matrix");
    let matrix_output = fixed_shape_view3(&matrix_return_field, &matrix_output);
    assert_close(matrix_output[[0, 0, 0]], 1.0);
    assert_close(matrix_output[[0, 0, 1]], 2.0);
    assert_close(matrix_output[[0, 1, 0]], 3.0);
    assert_close(matrix_output[[0, 1, 1]], 4.0);
    // make_tensor: two array rows of 4 values, each reshaped to 2x2 (dim0=dim1=2).
    let tensor_values =
        float64_list_array(vec![vec![1.0, 2.0, 3.0, 4.0], vec![5.0, 6.0, 7.0, 8.0]]);
    let (tensor_return_field, tensor_output) = invoke_udf(
        &make_tensor_udf,
        vec![
            ColumnarValue::Array(Arc::new(tensor_values)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::new(Field::new(
                "tensor_values",
                DataType::new_list(DataType::Float64, false),
                false,
            )),
            Arc::new(Field::new("dim0", DataType::Int64, false)),
            Arc::new(Field::new("dim1", DataType::Int64, false)),
        ],
        &[None, Some(ScalarValue::Int64(Some(2))), Some(ScalarValue::Int64(Some(2)))],
        2,
    )
    .expect("make_tensor");
    let tensor_output = fixed_shape_view3(&tensor_return_field, &tensor_output);
    assert_close(tensor_output[[0, 0, 0]], 1.0);
    assert_close(tensor_output[[0, 1, 1]], 4.0);
    assert_close(tensor_output[[1, 0, 0]], 5.0);
    assert_close(tensor_output[[1, 1, 1]], 8.0);
}
#[test]
// Verifies the make_variable_tensor and make_csr_matrix_batch constructor UDFs:
// ragged rows round-trip through the variable-shape tensor iterator, and the
// CSR scalar inputs round-trip through the CSR batch iterator / dense check.
fn constructor_udfs_build_variable_and_sparse_contracts() {
    let make_variable_tensor_udf = udfs::make_variable_tensor_udf();
    let make_csr_matrix_batch_udf = udfs::make_csr_matrix_batch_udf();
    // Row 0: 4 values shaped 2x2; row 1: 3 values shaped 1x3 (rank fixed at 2).
    let variable_data = float64_list_array(vec![vec![1.0, 2.0, 3.0, 4.0], vec![5.0, 6.0, 7.0]]);
    let variable_shape = int32_list_array(vec![vec![2, 2], vec![1, 3]]);
    let (variable_return_field, variable_output) = invoke_udf(
        &make_variable_tensor_udf,
        vec![
            ColumnarValue::Array(Arc::new(variable_data)),
            ColumnarValue::Array(Arc::new(variable_shape)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::new(Field::new("data", DataType::new_list(DataType::Float64, false), false)),
            Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("rank", DataType::Int64, false)),
        ],
        &[None, None, Some(ScalarValue::Int64(Some(2)))],
        2,
    )
    .expect("make_variable_tensor");
    let mut variable_rows = variable_shape_rows(&variable_return_field, &variable_output);
    let (_, tensor0) = variable_rows.next().expect("row0").expect("row0 tensor");
    let tensor0 = tensor0.into_dimensionality::<Ix2>().expect("rank-2 tensor");
    let (_, tensor1) = variable_rows.next().expect("row1").expect("row1 tensor");
    let tensor1 = tensor1.into_dimensionality::<Ix2>().expect("rank-2 tensor");
    assert_close(tensor0[[0, 0]], 1.0);
    assert_close(tensor0[[1, 1]], 4.0);
    assert_close(tensor1[[0, 0]], 5.0);
    assert_close(tensor1[[0, 2]], 7.0);
    // CSR inputs for one 2x3 matrix with stored entries (0,0)=1, (0,2)=2, (1,1)=3.
    let csr_shape = scalar_int32_list(vec![2, 3]);
    let csr_row_ptrs = scalar_int32_list(vec![0, 2, 3]);
    let csr_col_indices = ScalarValue::List(Arc::new(u32_list_array(vec![vec![0, 2, 1]])));
    let csr_values = scalar_float64_list(vec![1.0, 2.0, 3.0]);
    let (csr_return_field, csr_output) = invoke_udf(
        &make_csr_matrix_batch_udf,
        vec![
            ColumnarValue::Scalar(csr_shape.clone()),
            ColumnarValue::Scalar(csr_row_ptrs.clone()),
            ColumnarValue::Scalar(csr_col_indices.clone()),
            ColumnarValue::Scalar(csr_values.clone()),
        ],
        vec![
            Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("row_ptrs", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("col_indices", DataType::new_list(DataType::UInt32, false), false)),
            Arc::new(Field::new("values", DataType::new_list(DataType::Float64, false), false)),
        ],
        &[Some(csr_shape), Some(csr_row_ptrs), Some(csr_col_indices), Some(csr_values)],
        1,
    )
    .expect("make_csr_matrix_batch");
    let mut csr_rows = ndarrow::csr_matrix_batch_iter::<Float64Type>(
        csr_return_field.as_ref(),
        struct_array(&csr_output),
    )
    .expect("csr batch output");
    let (_, csr0) = csr_rows.next().expect("row0").expect("row0 csr");
    let csr0 = csr_to_dense(&csr0);
    assert_eq!(csr0.shape(), &[2, 3]);
    assert_close(csr0[[0, 0]], 1.0);
    assert_close(csr0[[0, 2]], 2.0);
    assert_close(csr0[[1, 1]], 3.0);
}
#[test]
// Verifies constructor UDFs fail with descriptive errors on malformed input:
// wrong vector length, ragged nested matrix rows, negative dimensions, and
// inconsistent CSR row pointers. Only checks for an error-message substring.
fn constructor_udfs_reject_invalid_shapes() {
    let make_vector_udf = udfs::make_vector_udf();
    let make_matrix_udf = udfs::make_matrix_udf();
    let make_variable_tensor_udf = udfs::make_variable_tensor_udf();
    let make_csr_matrix_batch_udf = udfs::make_csr_matrix_batch_udf();
    // 3-element list vs. declared dim=2.
    let vector_error = invoke_udf_error(
        &make_vector_udf,
        vec![
            ColumnarValue::Array(Arc::new(float64_list_array(vec![vec![1.0, 2.0, 3.0]]))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::new(Field::new("values", DataType::new_list(DataType::Float64, false), false)),
            Arc::new(Field::new("dim", DataType::Int64, false)),
        ],
        &[None, Some(ScalarValue::Int64(Some(2)))],
        1,
    );
    assert!(vector_error.contains("expected length 2"));
    // Ragged nested rows ([1] vs [2, 3]) against a 2x2 matrix declaration.
    let matrix_error = invoke_udf_error(
        &make_matrix_udf,
        vec![
            ColumnarValue::Scalar(scalar_nested_float64_list(vec![vec![1.0], vec![2.0, 3.0]])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::new(Field::new(
                "matrix_values",
                DataType::new_list(DataType::new_list(DataType::Float64, false), false),
                false,
            )),
            Arc::new(Field::new("rows", DataType::Int64, false)),
            Arc::new(Field::new("cols", DataType::Int64, false)),
        ],
        &[
            Some(scalar_nested_float64_list(vec![vec![1.0], vec![2.0, 3.0]])),
            Some(ScalarValue::Int64(Some(2))),
            Some(ScalarValue::Int64(Some(2))),
        ],
        1,
    );
    assert!(matrix_error.contains("nested row 0 expected width 2"));
    // Shape entry of -1 must be rejected.
    let variable_error = invoke_udf_error(
        &make_variable_tensor_udf,
        vec![
            ColumnarValue::Array(Arc::new(float64_list_array(vec![vec![1.0, 2.0]]))),
            ColumnarValue::Array(Arc::new(int32_list_array(vec![vec![-1, 2]]))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::new(Field::new("data", DataType::new_list(DataType::Float64, false), false)),
            Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("rank", DataType::Int64, false)),
        ],
        &[None, None, Some(ScalarValue::Int64(Some(2)))],
        1,
    );
    assert!(variable_error.contains("negative dimension"));
    // row_ptrs [0, 1] is inconsistent with a 2-row shape and 3 stored values.
    let csr_error = invoke_udf_error(
        &make_csr_matrix_batch_udf,
        vec![
            ColumnarValue::Array(Arc::new(int32_list_array(vec![vec![2, 3]]))),
            ColumnarValue::Array(Arc::new(int32_list_array(vec![vec![0, 1]]))),
            ColumnarValue::Array(Arc::new(u32_list_array(vec![vec![0, 1, 2]]))),
            ColumnarValue::Array(Arc::new(float64_list_array(vec![vec![1.0, 2.0, 3.0]]))),
        ],
        vec![
            Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("row_ptrs", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("col_indices", DataType::new_list(DataType::UInt32, false), false)),
            Arc::new(Field::new("values", DataType::new_list(DataType::Float64, false), false)),
        ],
        &[None, None, None, None],
        1,
    );
    assert!(csr_error.contains("row_ptrs"));
}
#[test]
// Verifies that zero-row input batches produce empty outputs that still carry
// the expected extension type names on the returned fields.
fn constructor_udfs_support_empty_batches() {
    let make_variable_tensor_udf = udfs::make_variable_tensor_udf();
    let make_csr_matrix_batch_udf = udfs::make_csr_matrix_batch_udf();
    let (variable_return_field, variable_output) = invoke_udf(
        &make_variable_tensor_udf,
        vec![
            ColumnarValue::Array(Arc::new(float64_list_array(Vec::new()))),
            ColumnarValue::Array(Arc::new(int32_list_array(Vec::new()))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::new(Field::new("data", DataType::new_list(DataType::Float64, false), false)),
            Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("rank", DataType::Int64, false)),
        ],
        &[None, None, Some(ScalarValue::Int64(Some(2)))],
        0,
    )
    .expect("empty variable tensor batch");
    assert_eq!(variable_return_field.extension_type_name(), Some("arrow.variable_shape_tensor"));
    assert_eq!(struct_array(&variable_output).len(), 0);
    let (csr_return_field, csr_output) = invoke_udf(
        &make_csr_matrix_batch_udf,
        vec![
            ColumnarValue::Array(Arc::new(int32_list_array(Vec::new()))),
            ColumnarValue::Array(Arc::new(int32_list_array(Vec::new()))),
            ColumnarValue::Array(Arc::new(u32_list_array(Vec::new()))),
            ColumnarValue::Array(Arc::new(float64_list_array(Vec::new()))),
        ],
        vec![
            Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("row_ptrs", DataType::new_list(DataType::Int32, false), false)),
            Arc::new(Field::new("col_indices", DataType::new_list(DataType::UInt32, false), false)),
            Arc::new(Field::new("values", DataType::new_list(DataType::Float64, false), false)),
        ],
        &[None, None, None, None],
        0,
    )
    .expect("empty csr batch");
    assert_eq!(csr_return_field.extension_type_name(), Some("ndarrow.csr_matrix_batch"));
    assert_eq!(struct_array(&csr_output).len(), 0);
}
#[test]
// Verifies the real-valued vector UDFs (l2_norm, dot, cosine similarity /
// distance, normalize) on a two-row batch of 3-d vectors; expected values
// are hand-computed from the fixture rows below.
fn vector_udfs_cover_real_batch_ops() {
    // Row 0: (3,4,0) vs (4,0,3); row 1: (1,2,2) vs (2,2,1).
    let left = fixed_size_list([[3.0, 4.0, 0.0], [1.0, 2.0, 2.0]]);
    let right = fixed_size_list([[4.0, 0.0, 3.0], [2.0, 2.0, 1.0]]);
    let left_field = vector_field("left", &DataType::Float64, 3, false).expect("vector field");
    let right_field = vector_field("right", &DataType::Float64, 3, false).expect("vector field");
    let l2_norm_udf = udfs::vector_l2_norm_udf();
    let dot_udf = udfs::vector_dot_udf();
    let cosine_similarity_udf = udfs::vector_cosine_similarity_udf();
    let cosine_distance_udf = udfs::vector_cosine_distance_udf();
    let normalize_udf = udfs::vector_normalize_udf();
    // |(3,4,0)| = 5, |(1,2,2)| = 3.
    let (_, norms) = invoke_udf(
        &l2_norm_udf,
        vec![ColumnarValue::Array(Arc::new(left.clone()))],
        vec![Arc::clone(&left_field)],
        &[None],
        2,
    )
    .expect("vector_l2_norm");
    assert_close(f64_array(&norms).value(0), 5.0);
    assert_close(f64_array(&norms).value(1), 3.0);
    let (_, dots) = invoke_udf(
        &dot_udf,
        vec![
            ColumnarValue::Array(Arc::new(left.clone())),
            ColumnarValue::Array(Arc::new(right.clone())),
        ],
        vec![Arc::clone(&left_field), Arc::clone(&right_field)],
        &[None, None],
        2,
    )
    .expect("vector_dot");
    assert_close(f64_array(&dots).value(0), 12.0);
    assert_close(f64_array(&dots).value(1), 8.0);
    // cos = dot / (|l||r|): 12/(5*5) = 0.48 and 8/(3*3) = 8/9.
    let (_, similarities) = invoke_udf(
        &cosine_similarity_udf,
        vec![
            ColumnarValue::Array(Arc::new(left.clone())),
            ColumnarValue::Array(Arc::new(right.clone())),
        ],
        vec![Arc::clone(&left_field), Arc::clone(&right_field)],
        &[None, None],
        2,
    )
    .expect("vector_cosine_similarity");
    assert_close(f64_array(&similarities).value(0), 0.48);
    assert_close(f64_array(&similarities).value(1), 8.0 / 9.0);
    // distance = 1 - similarity.
    let (_, distances) = invoke_udf(
        &cosine_distance_udf,
        vec![
            ColumnarValue::Array(Arc::new(left.clone())),
            ColumnarValue::Array(Arc::new(right.clone())),
        ],
        vec![Arc::clone(&left_field), Arc::clone(&right_field)],
        &[None, None],
        2,
    )
    .expect("vector_cosine_distance");
    assert_close(f64_array(&distances).value(0), 0.52);
    assert_close(f64_array(&distances).value(1), 1.0 / 9.0);
    let (_, normalized) = invoke_udf(
        &normalize_udf,
        vec![ColumnarValue::Array(Arc::new(left))],
        vec![left_field],
        &[None],
        2,
    )
    .expect("vector_normalize");
    let normalized = fixed_size_list_array(&normalized);
    let normalized =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(normalized).expect("normalized vectors");
    assert_close(normalized[[0, 0]], 0.6);
    assert_close(normalized[[0, 1]], 0.8);
    assert_close(normalized[[1, 0]], 1.0 / 3.0);
    assert_close(normalized[[1, 1]], 2.0 / 3.0);
    assert_close(normalized[[1, 2]], 2.0 / 3.0);
}
#[test]
fn vector_udfs_cover_complex_batch_ops() {
    // End-to-end checks of the complex vector UDFs (hermitian dot, L2 norm,
    // cosine similarity, normalize) on a two-row batch of 2-element vectors.
    let sqrt_two = 2.0_f64.sqrt();
    // Row 0: (1+i, 0); row 1: (0, 2) — chosen so every expected value below is exact.
    let (left_field, left) = complex_vector_batch("left_complex", [
        [Complex64::new(1.0, 1.0), Complex64::new(0.0, 0.0)],
        [Complex64::new(0.0, 0.0), Complex64::new(2.0, 0.0)],
    ]);
    let (right_field, right) = complex_vector_batch("right_complex", [
        [Complex64::new(1.0, 0.0), Complex64::new(0.0, 1.0)],
        [Complex64::new(2.0, 0.0), Complex64::new(0.0, 0.0)],
    ]);
    let dot_udf = udfs::vector_dot_hermitian_udf();
    let norm_udf = udfs::vector_l2_norm_complex_udf();
    let similarity_udf = udfs::vector_cosine_similarity_complex_udf();
    let normalize_udf = udfs::vector_normalize_complex_udf();
    // Hermitian dot: row 0 gives conj(1+i)*1 + 0 = 1 - i; row 1 is orthogonal.
    let (dot_field, dots) = invoke_udf(
        &dot_udf,
        vec![
            ColumnarValue::Array(Arc::new(left.clone())),
            ColumnarValue::Array(Arc::new(right.clone())),
        ],
        vec![Arc::clone(&left_field), Arc::clone(&right_field)],
        &[None, None],
        2,
    )
    .expect("vector_dot_hermitian");
    let dots = ndarrow::complex64_as_array_view1(dot_field.as_ref(), fixed_size_list_array(&dots))
        .expect("complex scalar output");
    assert_complex_close(dots[0], Complex64::new(1.0, -1.0));
    assert_complex_close(dots[1], Complex64::new(0.0, 0.0));
    // L2 norms: |(1+i, 0)| = sqrt(2); |(0, 2)| = 2.
    let (_, norms) = invoke_udf(
        &norm_udf,
        vec![ColumnarValue::Array(Arc::new(left.clone()))],
        vec![Arc::clone(&left_field)],
        &[None],
        2,
    )
    .expect("vector_l2_norm_complex");
    assert_close(f64_array(&norms).value(0), sqrt_two);
    assert_close(f64_array(&norms).value(1), 2.0);
    // Cosine similarity row 0: (1-i) / (sqrt(2)*sqrt(2)) = 0.5 - 0.5i.
    let (similarity_field, similarities) = invoke_udf(
        &similarity_udf,
        vec![ColumnarValue::Array(Arc::new(left.clone())), ColumnarValue::Array(Arc::new(right))],
        vec![Arc::clone(&left_field), right_field],
        &[None, None],
        2,
    )
    .expect("vector_cosine_similarity_complex");
    let similarities = ndarrow::complex64_as_array_view1(
        similarity_field.as_ref(),
        fixed_size_list_array(&similarities),
    )
    .expect("complex cosine similarity output");
    assert_complex_close(similarities[0], Complex64::new(0.5, -0.5));
    assert_complex_close(similarities[1], Complex64::new(0.0, 0.0));
    // Normalize consumes `left`; also check the output keeps the complex extension type.
    let (normalized_field, normalized) = invoke_udf(
        &normalize_udf,
        vec![ColumnarValue::Array(Arc::new(left))],
        vec![left_field],
        &[None],
        2,
    )
    .expect("vector_normalize_complex");
    let normalized =
        ndarrow::complex64_as_array_view2(fixed_size_list_array(&normalized)).expect("normalized");
    let DataType::FixedSizeList(item_field, _) = normalized_field.data_type() else {
        panic!("expected complex vector field");
    };
    assert_eq!(item_field.extension_type_name(), Some("ndarrow.complex64"));
    assert_complex_close(normalized[[0, 0]], Complex64::new(1.0 / sqrt_two, 1.0 / sqrt_two));
    assert_complex_close(normalized[[0, 1]], Complex64::new(0.0, 0.0));
    assert_complex_close(normalized[[1, 0]], Complex64::new(0.0, 0.0));
    assert_complex_close(normalized[[1, 1]], Complex64::new(1.0, 0.0));
}
/// Checks the complex matvec and matmat UDFs against `nabled`'s reference
/// implementations, row by row over the batch.
///
/// `vector_field` is taken by value because the matvec invocation consumes it;
/// the other fields are borrowed so the caller can keep using them.
fn assert_complex_matrix_products(
    matrix_field: &FieldRef,
    matrices: &FixedSizeListArray,
    vector_field: FieldRef,
    vectors: &FixedSizeListArray,
    right_field: &FieldRef,
    right: &FixedSizeListArray,
) {
    // Decode all three inputs up front so expected values can be computed per row.
    let matrix_view =
        ndarrow::complex64_fixed_shape_tensor_as_array_viewd(matrix_field.as_ref(), matrices)
            .expect("matrix tensor")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 complex matrix batch");
    let vector_view = ndarrow::complex64_as_array_view2(vectors).expect("complex rhs");
    let right_view =
        ndarrow::complex64_fixed_shape_tensor_as_array_viewd(right_field.as_ref(), right)
            .expect("right tensor")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 right matrix batch");
    let (matvec_field, matvec) = invoke_udf(
        &udfs::matrix_matvec_complex_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(vectors.clone())),
        ],
        vec![Arc::clone(matrix_field), vector_field],
        &[None, None],
        2,
    )
    .expect("matrix_matvec_complex");
    let matvec =
        ndarrow::complex64_as_array_view2(fixed_size_list_array(&matvec)).expect("complex matvec");
    // The matvec output must advertise the complex64 extension type.
    let DataType::FixedSizeList(item_field, _) = matvec_field.data_type() else {
        panic!("expected complex vector output");
    };
    assert_eq!(item_field.extension_type_name(), Some("ndarrow.complex64"));
    // Compare each batch row against the reference matvec.
    for row in 0..matrix_view.len_of(Axis(0)) {
        let expected = nabled::linalg::matrix::matvec_complex_view(
            &matrix_view.index_axis(Axis(0), row),
            &vector_view.index_axis(Axis(0), row),
        )
        .expect("expected complex matvec");
        for col in 0..expected.len() {
            assert_complex_close(matvec[[row, col]], expected[col]);
        }
    }
    let (matmat_field, matmat) = invoke_udf(
        &udfs::matrix_matmat_complex_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(right.clone())),
        ],
        vec![Arc::clone(matrix_field), Arc::clone(right_field)],
        &[None, None],
        2,
    )
    .expect("matrix_matmat_complex");
    let matmat = complex_fixed_shape_view3(&matmat_field, &matmat);
    // Compare each batch row against the reference matmat.
    for row in 0..matrix_view.len_of(Axis(0)) {
        let expected = nabled::linalg::matrix::matmat_complex_view(
            &matrix_view.index_axis(Axis(0), row),
            &right_view.index_axis(Axis(0), row),
        )
        .expect("expected complex matmat");
        for i in 0..expected.nrows() {
            for j in 0..expected.ncols() {
                assert_complex_close(matmat[[row, i, j]], expected[[i, j]]);
            }
        }
    }
}
fn assert_complex_matrix_stats(matrix_field: &FieldRef, matrices: &FixedSizeListArray) {
let matrix_view =
ndarrow::complex64_fixed_shape_tensor_as_array_viewd(matrix_field.as_ref(), matrices)
.expect("matrix tensor")
.into_dimensionality::<Ix3>()
.expect("rank-3 complex matrix batch");
let (_, means) = invoke_udf(
&udfs::matrix_column_means_complex_udf(),
vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
vec![Arc::clone(matrix_field)],
&[None],
2,
)
.expect("matrix_column_means_complex");
let means =
ndarrow::complex64_as_array_view2(fixed_size_list_array(&means)).expect("complex means");
for row in 0..matrix_view.len_of(Axis(0)) {
let expected =
nabled::ml::stats::column_means_complex_view(&matrix_view.index_axis(Axis(0), row));
for col in 0..expected.len() {
assert_complex_close(means[[row, col]], expected[col]);
}
}
let (center_field, centered) = invoke_udf(
&udfs::matrix_center_columns_complex_udf(),
vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
vec![Arc::clone(matrix_field)],
&[None],
2,
)
.expect("matrix_center_columns_complex");
let centered = complex_fixed_shape_view3(¢er_field, ¢ered);
for row in 0..matrix_view.len_of(Axis(0)) {
let expected =
nabled::ml::stats::center_columns_complex_view(&matrix_view.index_axis(Axis(0), row));
for i in 0..expected.nrows() {
for j in 0..expected.ncols() {
assert_complex_close(centered[[row, i, j]], expected[[i, j]]);
}
}
}
let (covariance_field, covariance) = invoke_udf(
&udfs::matrix_covariance_complex_udf(),
vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
vec![Arc::clone(matrix_field)],
&[None],
2,
)
.expect("matrix_covariance_complex");
let covariance = complex_fixed_shape_view3(&covariance_field, &covariance);
for row in 0..matrix_view.len_of(Axis(0)) {
let expected = nabled::ml::stats::covariance_matrix_complex_view(
&matrix_view.index_axis(Axis(0), row),
)
.expect("expected complex covariance");
for i in 0..expected.nrows() {
for j in 0..expected.ncols() {
assert_complex_close(covariance[[row, i, j]], expected[[i, j]]);
}
}
}
let (correlation_field, correlation) = invoke_udf(
&udfs::matrix_correlation_complex_udf(),
vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
vec![Arc::clone(matrix_field)],
&[None],
2,
)
.expect("matrix_correlation_complex");
let correlation = complex_fixed_shape_view3(&correlation_field, &correlation);
for row in 0..matrix_view.len_of(Axis(0)) {
let expected = nabled::ml::stats::correlation_matrix_complex_view(
&matrix_view.index_axis(Axis(0), row),
)
.expect("expected complex correlation");
for i in 0..expected.nrows() {
for j in 0..expected.ncols() {
assert_complex_close(correlation[[row, i, j]], expected[[i, j]]);
}
}
}
}
/// Decodes a complex fixed-shape tensor column into a rank-3 view
/// (batch, rows, cols), panicking on any decode or rank mismatch.
fn complex_matrix_view<'a>(
    matrix_field: &'a FieldRef,
    matrices: &'a FixedSizeListArray,
) -> ndarray::ArrayView3<'a, Complex64> {
    let dynamic =
        ndarrow::complex64_fixed_shape_tensor_as_array_viewd(matrix_field.as_ref(), matrices)
            .expect("complex matrix tensor");
    dynamic.into_dimensionality::<Ix3>().expect("rank-3 complex matrix batch")
}
/// Asserts that a complex rank-3 tensor UDF output matches `op` applied to
/// each batch row of `matrix_view`.
fn assert_complex_matrix_tensor_output<E>(
    output_field: &FieldRef,
    output: &ColumnarValue,
    matrix_view: &ndarray::ArrayView3<'_, Complex64>,
    op: impl Fn(&ndarray::ArrayView2<'_, Complex64>) -> StdResult<Array2<Complex64>, E>,
) where
    E: std::fmt::Display,
{
    let actual = complex_fixed_shape_view3(output_field, output);
    let batch_len = matrix_view.len_of(Axis(0));
    for row in 0..batch_len {
        let input = matrix_view.index_axis(Axis(0), row);
        let expected = op(&input).unwrap_or_else(|error| panic!("expected output: {error}"));
        for (i, expected_row) in expected.outer_iter().enumerate() {
            for (j, value) in expected_row.iter().enumerate() {
                assert_complex_close(actual[[row, i, j]], *value);
            }
        }
    }
}
/// Asserts that a struct-valued UDF output holding two complex tensor batches
/// (e.g. Schur's (Q, T) or polar's (U, P)) matches `op` applied per batch row.
///
/// `op` returns the expected (first, second) matrix pair for one input matrix.
fn assert_complex_two_tensor_struct_output<E>(
    output_field: &FieldRef,
    output: &ColumnarValue,
    matrix_view: &ndarray::ArrayView3<'_, Complex64>,
    op: impl Fn(
        &ndarray::ArrayView2<'_, Complex64>,
    ) -> StdResult<(Array2<Complex64>, Array2<Complex64>), E>,
) where
    E: std::fmt::Display,
{
    let DataType::Struct(fields) = output_field.data_type() else {
        panic!("expected struct output");
    };
    let output = struct_array(output);
    // Struct column 0: first tensor batch, decoded to rank 3 (batch, rows, cols).
    let first = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(
        &fields[0],
        output
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("first complex tensor"),
    )
    .expect("first complex tensor")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 first complex tensor");
    // Struct column 1: second tensor batch.
    let second = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(
        &fields[1],
        output
            .column(1)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("second complex tensor"),
    )
    .expect("second complex tensor")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 second complex tensor");
    // Compare both tensors against `op` for every batch row. Note the loop
    // bounds come from `expected_first`; both expected matrices are assumed to
    // share a shape here.
    for row in 0..matrix_view.len_of(Axis(0)) {
        let (expected_first, expected_second) = op(&matrix_view.index_axis(Axis(0), row))
            .unwrap_or_else(|error| panic!("expected output: {error}"));
        for i in 0..expected_first.nrows() {
            for j in 0..expected_first.ncols() {
                assert_complex_close(first[[row, i, j]], expected_first[[i, j]]);
                assert_complex_close(second[[row, i, j]], expected_second[[i, j]]);
            }
        }
    }
}
/// Asserts that a complex nonsymmetric-eigen UDF's struct output (column 0:
/// eigenvalue vectors, column 1: Schur-vector tensor batch) matches `op`
/// applied per batch row.
fn assert_complex_eigen_struct_output<E>(
    output_field: &FieldRef,
    output: &ColumnarValue,
    matrix_view: &ndarray::ArrayView3<'_, Complex64>,
    op: impl Fn(
        &ndarray::ArrayView2<'_, Complex64>,
    ) -> StdResult<nabled::linalg::eigen::NdarrayNonsymmetricEigenResult<f64>, E>,
) where
    E: std::fmt::Display,
{
    let DataType::Struct(fields) = output_field.data_type() else {
        panic!("expected struct output");
    };
    let output = struct_array(output);
    // Column 0: per-row eigenvalue vectors as a (batch, n) complex view.
    let eigenvalues = ndarrow::complex64_as_array_view2(
        output
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("complex eigenvalue vectors"),
    )
    .expect("complex eigenvalue vectors");
    // Column 1: Schur-vector matrices as a rank-3 (batch, rows, cols) view.
    let schur_vectors = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(
        &fields[1],
        output
            .column(1)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("complex schur vectors"),
    )
    .expect("complex schur vectors")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 complex schur vectors");
    for row in 0..matrix_view.len_of(Axis(0)) {
        let expected = op(&matrix_view.index_axis(Axis(0), row))
            .unwrap_or_else(|error| panic!("expected output: {error}"));
        for index in 0..expected.eigenvalues.len() {
            assert_complex_close(eigenvalues[[row, index]], expected.eigenvalues[index]);
        }
        for i in 0..expected.schur_vectors.nrows() {
            for j in 0..expected.schur_vectors.ncols() {
                assert_complex_close(schur_vectors[[row, i, j]], expected.schur_vectors[[i, j]]);
            }
        }
    }
}
/// Like `assert_complex_eigen_struct_output`, but for real-input (f32/f64)
/// eigen UDFs: the reference result carries `T::Native` real/imag parts which
/// are widened to `f64` via `Into` before comparing against the complex64
/// output columns.
fn assert_real_input_complex_eigen_struct_output<T, E>(
    output_field: &FieldRef,
    output: &ColumnarValue,
    matrix_view: &ndarray::ArrayView3<'_, T::Native>,
    op: impl Fn(
        &ndarray::ArrayView2<'_, T::Native>,
    ) -> StdResult<nabled::linalg::eigen::NdarrayNonsymmetricEigenResult<T::Native>, E>,
) where
    T: datafusion::arrow::array::types::ArrowPrimitiveType,
    T::Native: NabledReal + NdarrowElement + Into<f64>,
    E: std::fmt::Display,
{
    let DataType::Struct(fields) = output_field.data_type() else {
        panic!("expected struct output");
    };
    let output = struct_array(output);
    // Column 0: per-row eigenvalue vectors as a (batch, n) complex view.
    let eigenvalues = ndarrow::complex64_as_array_view2(
        output
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("complex eigenvalue vectors"),
    )
    .expect("complex eigenvalue vectors");
    // Column 1: Schur-vector matrices as a rank-3 (batch, rows, cols) view.
    let schur_vectors = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(
        &fields[1],
        output
            .column(1)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("complex schur vectors"),
    )
    .expect("complex schur vectors")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 complex schur vectors");
    for row in 0..matrix_view.len_of(Axis(0)) {
        let expected = op(&matrix_view.index_axis(Axis(0), row))
            .unwrap_or_else(|error| panic!("expected output: {error}"));
        for index in 0..expected.eigenvalues.len() {
            // Widen the T::Native real/imag parts to f64 for comparison.
            assert_complex_close(
                eigenvalues[[row, index]],
                Complex64::new(
                    expected.eigenvalues[index].re.into(),
                    expected.eigenvalues[index].im.into(),
                ),
            );
        }
        for i in 0..expected.schur_vectors.nrows() {
            for j in 0..expected.schur_vectors.ncols() {
                assert_complex_close(
                    schur_vectors[[row, i, j]],
                    Complex64::new(
                        expected.schur_vectors[[i, j]].re.into(),
                        expected.schur_vectors[[i, j]].im.into(),
                    ),
                );
            }
        }
    }
}
/// Verifies the bi-eigen (two-sided) real-input UDF's five-column struct
/// output against `op` applied per batch row:
///   column 0: complex eigenvalue vectors,
///   column 1: complex right eigenvectors (rank-3 tensor batch),
///   column 2: complex left eigenvectors (rank-3 tensor batch),
///   column 3: real balancing diagonal vectors,
///   column 4: real balanced matrices (rank-3 tensor batch).
/// Real `T::Native` values are widened to `f64` via `Into` before comparing.
fn assert_real_input_complex_bi_eigen_struct_output<T, E>(
    output_field: &FieldRef,
    output: &ColumnarValue,
    matrix_view: &ndarray::ArrayView3<'_, T::Native>,
    op: impl Fn(
        &ndarray::ArrayView2<'_, T::Native>,
    ) -> StdResult<nabled::linalg::eigen::NdarrayNonsymmetricBiEigenResult<T::Native>, E>,
) where
    T: datafusion::arrow::array::types::ArrowPrimitiveType,
    T::Native: NabledReal + NdarrowElement + Into<f64>,
    E: std::fmt::Display,
{
    let DataType::Struct(fields) = output_field.data_type() else {
        panic!("expected struct output");
    };
    let output = struct_array(output);
    let eigenvalues = ndarrow::complex64_as_array_view2(
        output
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("complex eigenvalue vectors"),
    )
    .expect("complex eigenvalue vectors");
    let right = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(
        &fields[1],
        output
            .column(1)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("complex right eigenvectors"),
    )
    .expect("complex right eigenvectors")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 complex right eigenvectors");
    let left = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(
        &fields[2],
        output
            .column(2)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("complex left eigenvectors"),
    )
    .expect("complex left eigenvectors")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 complex left eigenvectors");
    // Columns 3 and 4 stay in the real element type T.
    let diagonal = ndarrow::fixed_size_list_as_array2::<T>(
        output
            .column(3)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("balancing diagonal"),
    )
    .expect("balancing diagonal");
    let balanced = ndarrow::fixed_shape_tensor_as_array_viewd::<T>(
        &fields[4],
        output.column(4).as_any().downcast_ref::<FixedSizeListArray>().expect("balanced matrix"),
    )
    .expect("balanced matrix")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 balanced matrix");
    for row in 0..matrix_view.len_of(Axis(0)) {
        let expected = op(&matrix_view.index_axis(Axis(0), row))
            .unwrap_or_else(|error| panic!("expected output: {error}"));
        // Eigenvalues and the balancing diagonal share the per-row index range.
        for index in 0..expected.eigenvalues.len() {
            assert_complex_close(
                eigenvalues[[row, index]],
                Complex64::new(
                    expected.eigenvalues[index].re.into(),
                    expected.eigenvalues[index].im.into(),
                ),
            );
            assert_close(diagonal[[row, index]].into(), expected.balancing_diagonal[index].into());
        }
        // Right/left eigenvectors and the balanced matrix share matrix shape;
        // loop bounds are taken from the right-eigenvector matrix.
        for i in 0..expected.right_eigenvectors.nrows() {
            for j in 0..expected.right_eigenvectors.ncols() {
                assert_complex_close(
                    right[[row, i, j]],
                    Complex64::new(
                        expected.right_eigenvectors[[i, j]].re.into(),
                        expected.right_eigenvectors[[i, j]].im.into(),
                    ),
                );
                assert_complex_close(
                    left[[row, i, j]],
                    Complex64::new(
                        expected.left_eigenvectors[[i, j]].re.into(),
                        expected.left_eigenvectors[[i, j]].im.into(),
                    ),
                );
                assert_close(balanced[[row, i, j]].into(), expected.balanced_matrix[[i, j]].into());
            }
        }
    }
}
#[test]
fn complex_matrix_udfs_cover_products_and_stats() {
    // Batch of two 2x2 complex matrices; the second is Hermitian.
    let (matrix_field, matrices) = complex_matrix_batch("complex_matrices", [
        [[Complex64::new(1.0, 1.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(2.0, 0.0),
        ]],
        [[Complex64::new(2.0, 0.0), Complex64::new(1.0, -1.0)], [
            Complex64::new(1.0, 1.0),
            Complex64::new(3.0, 0.0),
        ]],
    ]);
    // Right-hand-side vectors for the matvec check.
    let (vector_field, vectors) = complex_vector_batch("complex_rhs", [
        [Complex64::new(1.0, 0.0), Complex64::new(0.0, 1.0)],
        [Complex64::new(1.0, -1.0), Complex64::new(2.0, 0.0)],
    ]);
    // Right-hand-side matrices for the matmat check.
    let (right_field, right) = complex_matrix_batch("complex_right", [
        [[Complex64::new(1.0, 0.0), Complex64::new(0.0, 1.0)], [
            Complex64::new(2.0, 0.0),
            Complex64::new(1.0, 0.0),
        ]],
        [[Complex64::new(0.0, 1.0), Complex64::new(1.0, 0.0)], [
            Complex64::new(2.0, 0.0),
            Complex64::new(0.0, 0.0),
        ]],
    ]);
    // Delegates all assertions to the shared helpers above.
    assert_complex_matrix_products(
        &matrix_field,
        &matrices,
        vector_field,
        &vectors,
        &right_field,
        &right,
    );
    assert_complex_matrix_stats(&matrix_field, &matrices);
}
#[test]
fn complex_decomposition_udfs_cover_schur_and_polar() {
    // Diagonal real-valued complex matrices keep the reference decompositions stable.
    let (matrix_field, matrices) = complex_matrix_batch("complex_spectral", [
        [[Complex64::new(4.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(9.0, 0.0),
        ]],
        [[Complex64::new(1.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(16.0, 0.0),
        ]],
    ]);
    let matrix_view = complex_matrix_view(&matrix_field, &matrices);
    // Schur: struct of (Q, T) tensor batches, compared against nabled's reference.
    let (schur_field, schur) = invoke_udf(
        &udfs::matrix_schur_complex_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&matrix_field)],
        &[None],
        2,
    )
    .expect("matrix_schur_complex");
    assert_complex_two_tensor_struct_output(&schur_field, &schur, &matrix_view, |view| {
        nabled::linalg::schur::compute_schur_complex_view(view).map(|result| (result.q, result.t))
    });
    // Polar: struct of (U, P) tensor batches.
    let (polar_field, polar) = invoke_udf(
        &udfs::matrix_polar_complex_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&matrix_field)],
        &[None],
        2,
    )
    .expect("matrix_polar_complex");
    assert_complex_two_tensor_struct_output(&polar_field, &polar, &matrix_view, |view| {
        nabled::linalg::polar::compute_polar_complex_view(view).map(|result| (result.u, result.p))
    });
}
#[test]
fn complex_eigen_udf_covers_nonsymmetric_contract() {
    // Two upper-triangular complex matrices: nonsymmetric but with an easy spectrum.
    let (field, batch) = complex_matrix_batch("complex_nonsymmetric_spectral", [
        [[Complex64::new(1.0, 0.0), Complex64::new(2.0, 1.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(3.0, 0.0),
        ]],
        [[Complex64::new(2.0, 0.0), Complex64::new(0.0, -1.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(5.0, 0.0),
        ]],
    ]);
    let view = complex_matrix_view(&field, &batch);
    let udf = udfs::matrix_eigen_nonsymmetric_complex_udf();
    let (output_field, output) = invoke_udf(
        &udf,
        vec![ColumnarValue::Array(Arc::new(batch.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_eigen_nonsymmetric_complex");
    // Compare against the reference implementation directly by function path.
    assert_complex_eigen_struct_output(
        &output_field,
        &output,
        &view,
        nabled::linalg::eigen::nonsymmetric_complex_view,
    );
}
#[test]
fn real_nonsymmetric_eigen_udfs_cover_complex_contracts() {
    // f32 path: single-row batch through the one-sided nonsymmetric eigen UDF.
    let (matrix_field_f32, matrices_f32) =
        matrix_batch_f32("real_nonsymmetric_f32", [[[1.0_f32, 2.0_f32], [0.0_f32, 3.0_f32]]]);
    let matrix_view_f32 = ndarrow::fixed_shape_tensor_as_array_viewd::<Float32Type>(
        matrix_field_f32.as_ref(),
        &matrices_f32,
    )
    .expect("f32 matrix view")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 f32 matrix batch");
    let (field_f32, output_f32) = invoke_udf(
        &udfs::matrix_eigen_nonsymmetric_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices_f32.clone()))],
        vec![Arc::clone(&matrix_field_f32)],
        &[None],
        1,
    )
    .expect("matrix_eigen_nonsymmetric");
    assert_real_input_complex_eigen_struct_output::<Float32Type, _>(
        &field_f32,
        &output_f32,
        &matrix_view_f32,
        nabled::linalg::eigen::nonsymmetric_view,
    );
    // f64 path: single-row batch through the two-sided (bi) eigen UDF, which
    // additionally returns left eigenvectors and balancing data.
    let (matrix_field_f64, matrices_f64) =
        matrix_batch("real_nonsymmetric_f64", [[[2.0_f64, -1.0_f64], [1.0_f64, 4.0_f64]]]);
    let matrix_view_f64 = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(
        matrix_field_f64.as_ref(),
        &matrices_f64,
    )
    .expect("f64 matrix view")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 f64 matrix batch");
    let (field_f64, output_f64) = invoke_udf(
        &udfs::matrix_eigen_nonsymmetric_bi_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices_f64.clone()))],
        vec![Arc::clone(&matrix_field_f64)],
        &[None],
        1,
    )
    .expect("matrix_eigen_nonsymmetric_bi");
    assert_real_input_complex_bi_eigen_struct_output::<Float64Type, _>(
        &field_f64,
        &output_f64,
        &matrix_view_f64,
        |view| {
            // Reference path uses the default bi-eigen configuration, matching
            // the UDF invocation above, which passes no config arguments.
            let config = nabled::linalg::eigen::NonsymmetricEigenConfig::<f64>::default();
            nabled::linalg::eigen::nonsymmetric_bi_view(view, &config)
        },
    );
}
#[test]
fn complex_matrix_function_udfs_cover_exp_and_logs() {
    // Diagonal matrices with positive entries so exp/log are well-defined.
    let (matrix_field, matrices) = complex_matrix_batch("complex_matrix_functions", [
        [[Complex64::new(4.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(9.0, 0.0),
        ]],
        [[Complex64::new(1.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(16.0, 0.0),
        ]],
    ]);
    let matrix_view = complex_matrix_view(&matrix_field, &matrices);
    // Scalar arguments for the series-based exp; the reference call below must
    // use the same values (32 terms, 1e-12 tolerance).
    let max_terms = ScalarValue::Int64(Some(32));
    let tolerance = ScalarValue::Float64(Some(1.0e-12));
    let scalar_fields = vec![
        Arc::clone(&matrix_field),
        Arc::new(Field::new("max_terms", DataType::Int64, false)),
        Arc::new(Field::new("tolerance", DataType::Float64, false)),
    ];
    let (exp_field, exp_output) = invoke_udf(
        &udfs::matrix_exp_complex_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Scalar(max_terms.clone()),
            ColumnarValue::Scalar(tolerance.clone()),
        ],
        scalar_fields,
        &[None, Some(max_terms), Some(tolerance)],
        2,
    )
    .expect("matrix_exp_complex");
    assert_complex_matrix_tensor_output(&exp_field, &exp_output, &matrix_view, |view| {
        nabled::linalg::matrix_functions::matrix_exp_complex_view(view, 32, 1.0e-12)
    });
    // Eigen-decomposition-based exp takes no scalar arguments.
    let (exp_eigen_field, exp_eigen_output) = invoke_udf(
        &udfs::matrix_exp_eigen_complex_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&matrix_field)],
        &[None],
        2,
    )
    .expect("matrix_exp_eigen_complex");
    assert_complex_matrix_tensor_output(
        &exp_eigen_field,
        &exp_eigen_output,
        &matrix_view,
        nabled::linalg::matrix_functions::matrix_exp_eigen_complex_view,
    );
    // Matrix log via eigendecomposition.
    let (log_eigen_field, log_eigen_output) = invoke_udf(
        &udfs::matrix_log_eigen_complex_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&matrix_field)],
        &[None],
        2,
    )
    .expect("matrix_log_eigen_complex");
    assert_complex_matrix_tensor_output(
        &log_eigen_field,
        &log_eigen_output,
        &matrix_view,
        nabled::linalg::matrix_functions::matrix_log_eigen_complex_view,
    );
    // Matrix log via SVD.
    let (log_svd_field, log_svd_output) = invoke_udf(
        &udfs::matrix_log_svd_complex_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&matrix_field)],
        &[None],
        2,
    )
    .expect("matrix_log_svd_complex");
    assert_complex_matrix_tensor_output(
        &log_svd_field,
        &log_svd_output,
        &matrix_view,
        nabled::linalg::matrix_functions::matrix_log_svd_complex_view,
    );
}
#[test]
fn complex_matrix_function_udfs_cover_power_and_sign() {
    // Same diagonal positive matrices as the exp/log test; power 0.5 is a square root.
    let (matrix_field, matrices) = complex_matrix_batch("complex_matrix_functions", [
        [[Complex64::new(4.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(9.0, 0.0),
        ]],
        [[Complex64::new(1.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(16.0, 0.0),
        ]],
    ]);
    let matrix_view = complex_matrix_view(&matrix_field, &matrices);
    // The power exponent must match the 0.5 used in the reference call below.
    let power = ScalarValue::Float64(Some(0.5));
    let scalar_fields =
        vec![Arc::clone(&matrix_field), Arc::new(Field::new("power", DataType::Float64, false))];
    let (power_field, power_output) = invoke_udf(
        &udfs::matrix_power_complex_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Scalar(power.clone()),
        ],
        scalar_fields,
        &[None, Some(power)],
        2,
    )
    .expect("matrix_power_complex");
    assert_complex_matrix_tensor_output(&power_field, &power_output, &matrix_view, |view| {
        nabled::linalg::matrix_functions::matrix_power_complex_view(view, 0.5)
    });
    // Matrix sign function takes no scalar arguments.
    let (sign_field, sign_output) = invoke_udf(
        &udfs::matrix_sign_complex_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&matrix_field)],
        &[None],
        2,
    )
    .expect("matrix_sign_complex");
    assert_complex_matrix_tensor_output(
        &sign_field,
        &sign_output,
        &matrix_view,
        nabled::linalg::matrix_functions::matrix_sign_complex_view,
    );
}
#[test]
fn complex_iterative_udfs_cover_outputs() {
    // Diagonal positive systems: A = diag(4, 9) with b = (8, 18) and
    // A = diag(2, 5) with b = (4, 10), so every solution component is exactly 2.
    let (matrix_field, matrices) = complex_matrix_batch("complex_systems", [
        [[Complex64::new(4.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(9.0, 0.0),
        ]],
        [[Complex64::new(2.0, 0.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(5.0, 0.0),
        ]],
    ]);
    let (rhs_field, rhs) = complex_vector_batch("complex_rhs", [
        [Complex64::new(8.0, 0.0), Complex64::new(18.0, 0.0)],
        [Complex64::new(4.0, 0.0), Complex64::new(10.0, 0.0)],
    ]);
    let cg_udf = udfs::matrix_conjugate_gradient_complex_udf();
    let gmres_udf = udfs::matrix_gmres_complex_udf();
    // Solver scalar arguments shared by both invocations.
    let tolerance = ScalarValue::Float64(Some(1.0e-10));
    let max_iterations = ScalarValue::Int64(Some(16));
    let scalar_fields = vec![
        Arc::clone(&matrix_field),
        Arc::clone(&rhs_field),
        Arc::new(Field::new("tolerance", DataType::Float64, false)),
        Arc::new(Field::new("max_iterations", DataType::Int64, false)),
    ];
    // Conjugate gradient solve.
    let (_, cg) = invoke_udf(
        &cg_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
            ColumnarValue::Scalar(tolerance.clone()),
            ColumnarValue::Scalar(max_iterations.clone()),
        ],
        scalar_fields.clone(),
        &[None, None, Some(tolerance.clone()), Some(max_iterations.clone())],
        2,
    )
    .expect("matrix_conjugate_gradient_complex");
    let cg = ndarrow::complex64_as_array_view2(fixed_size_list_array(&cg)).expect("cg output");
    assert_complex_close(cg[[0, 0]], Complex64::new(2.0, 0.0));
    assert_complex_close(cg[[0, 1]], Complex64::new(2.0, 0.0));
    assert_complex_close(cg[[1, 0]], Complex64::new(2.0, 0.0));
    assert_complex_close(cg[[1, 1]], Complex64::new(2.0, 0.0));
    // GMRES solve of the same systems; inputs and scalars are moved here.
    let (_, gmres) = invoke_udf(
        &gmres_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices)),
            ColumnarValue::Array(Arc::new(rhs)),
            ColumnarValue::Scalar(tolerance.clone()),
            ColumnarValue::Scalar(max_iterations.clone()),
        ],
        scalar_fields,
        &[None, None, Some(tolerance), Some(max_iterations)],
        2,
    )
    .expect("matrix_gmres_complex");
    let gmres =
        ndarrow::complex64_as_array_view2(fixed_size_list_array(&gmres)).expect("gmres output");
    assert_complex_close(gmres[[0, 0]], Complex64::new(2.0, 0.0));
    assert_complex_close(gmres[[0, 1]], Complex64::new(2.0, 0.0));
    assert_complex_close(gmres[[1, 0]], Complex64::new(2.0, 0.0));
    assert_complex_close(gmres[[1, 1]], Complex64::new(2.0, 0.0));
}
#[test]
fn complex_tensor_udfs_cover_last_axis_outputs() {
    // Fixed-shape rank-4 batch; entries are Pythagorean pairs so the last-axis
    // L2 norms (5, 13, 17, 25) are exact.
    let (tensor_field, tensor) = complex_tensor_batch4("complex_tensor", [[
        [[Complex64::new(3.0, 4.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(0.0, 0.0),
            Complex64::new(5.0, 12.0),
        ]],
        [[Complex64::new(8.0, 15.0), Complex64::new(0.0, 0.0)], [
            Complex64::new(7.0, 24.0),
            Complex64::new(0.0, 0.0),
        ]],
    ]]);
    // Variable-shape (ragged) counterpart: rows have different matrix sizes.
    let (variable_field, variable) = complex_ragged_matrices("complex_ragged", vec![
        vec![vec![Complex64::new(3.0, 4.0), Complex64::new(0.0, 0.0)], vec![
            Complex64::new(5.0, 12.0),
            Complex64::new(0.0, 0.0),
        ]],
        vec![vec![Complex64::new(8.0, 15.0), Complex64::new(0.0, 0.0)]],
    ]);
    let norm_udf = udfs::tensor_l2_norm_last_axis_complex_udf();
    let normalize_udf = udfs::tensor_normalize_last_axis_complex_udf();
    let variable_norm_udf = udfs::tensor_variable_l2_norm_last_axis_complex_udf();
    let variable_normalize_udf = udfs::tensor_variable_normalize_last_axis_complex_udf();
    // Fixed-shape norms: reducing the last axis drops the output to rank 3.
    let (norm_field, norms) = invoke_udf(
        &norm_udf,
        vec![ColumnarValue::Array(Arc::new(tensor.clone()))],
        vec![Arc::clone(&tensor_field)],
        &[None],
        1,
    )
    .expect("tensor_l2_norm_last_axis_complex");
    let norms = fixed_shape_view3(&norm_field, &norms);
    assert_close(norms[[0, 0, 0]], 5.0);
    assert_close(norms[[0, 0, 1]], 13.0);
    assert_close(norms[[0, 1, 0]], 17.0);
    assert_close(norms[[0, 1, 1]], 25.0);
    // Fixed-shape normalize keeps rank 4 and scales each last-axis slice to unit norm.
    let (normalize_field, normalized) = invoke_udf(
        &normalize_udf,
        vec![ColumnarValue::Array(Arc::new(tensor))],
        vec![Arc::clone(&tensor_field)],
        &[None],
        1,
    )
    .expect("tensor_normalize_last_axis_complex");
    let normalized = complex_fixed_shape_viewd(&normalize_field, &normalized)
        .into_dimensionality::<Ix4>()
        .expect("rank-4 complex tensor");
    assert_complex_close(normalized[[0, 0, 0, 0]], Complex64::new(0.6, 0.8));
    assert_complex_close(normalized[[0, 0, 0, 1]], Complex64::new(0.0, 0.0));
    assert_complex_close(normalized[[0, 0, 1, 0]], Complex64::new(0.0, 0.0));
    assert_complex_close(normalized[[0, 0, 1, 1]], Complex64::new(5.0 / 13.0, 12.0 / 13.0));
    // Variable-shape norms come back as a real (f64) variable-shape tensor batch.
    let (variable_norm_field, variable_norms) = invoke_udf(
        &variable_norm_udf,
        vec![ColumnarValue::Array(Arc::new(variable.clone()))],
        vec![Arc::clone(&variable_field)],
        &[None],
        2,
    )
    .expect("tensor_variable_l2_norm_last_axis_complex");
    let mut norm_iter = ndarrow::variable_shape_tensor_iter::<Float64Type>(
        variable_norm_field.as_ref(),
        struct_array(&variable_norms),
    )
    .expect("variable norm output");
    let (_, first_norm) = norm_iter.next().expect("first norm row").expect("first norm view");
    let (_, second_norm) = norm_iter.next().expect("second norm row").expect("second norm view");
    assert_close(first_norm[[0]], 5.0);
    assert_close(first_norm[[1]], 13.0);
    assert_close(second_norm[[0]], 17.0);
    // Variable-shape normalize keeps the complex element type and ragged shapes.
    let (variable_normalize_field, variable_normalized) = invoke_udf(
        &variable_normalize_udf,
        vec![ColumnarValue::Array(Arc::new(variable))],
        vec![variable_field],
        &[None],
        2,
    )
    .expect("tensor_variable_normalize_last_axis_complex");
    let mut normalized_iter = ndarrow::complex64_variable_shape_tensor_iter(
        variable_normalize_field.as_ref(),
        struct_array(&variable_normalized),
    )
    .expect("variable normalize output");
    let (_, first) =
        normalized_iter.next().expect("first normalized row").expect("first normalized view");
    let (_, second) =
        normalized_iter.next().expect("second normalized row").expect("second normalized view");
    assert_complex_close(first[[0, 0]], Complex64::new(0.6, 0.8));
    assert_complex_close(first[[0, 1]], Complex64::new(0.0, 0.0));
    assert_complex_close(first[[1, 0]], Complex64::new(5.0 / 13.0, 12.0 / 13.0));
    assert_complex_close(first[[1, 1]], Complex64::new(0.0, 0.0));
    assert_complex_close(second[[0, 0]], Complex64::new(8.0 / 17.0, 15.0 / 17.0));
    assert_complex_close(second[[0, 1]], Complex64::new(0.0, 0.0));
}
#[test]
fn matrix_udfs_cover_matmul_and_lu_solve() {
    // Two 2x2 real matrix pairs for matmul, plus right-hand sides for LU solve.
    let (left_field, left) =
        matrix_batch("left_matrix", [[[1.0, 2.0], [3.0, 4.0]], [[2.0, 0.0], [1.0, 2.0]]]);
    let (right_field, right) =
        matrix_batch("right_matrix", [[[2.0, 0.0], [1.0, 2.0]], [[1.0, 1.0], [0.0, 1.0]]]);
    // rhs chosen so left * x = rhs has the exact solutions asserted below.
    let rhs = fixed_size_list([[5.0, 11.0], [4.0, 5.0]]);
    let rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    let matmul_udf = udfs::matrix_matmul_udf();
    let lu_solve_udf = udfs::matrix_lu_solve_udf();
    let (product_field, product) = invoke_udf(
        &matmul_udf,
        vec![ColumnarValue::Array(Arc::new(left.clone())), ColumnarValue::Array(Arc::new(right))],
        vec![Arc::clone(&left_field), right_field],
        &[None, None],
        2,
    )
    .expect("matrix_matmul");
    let product = fixed_size_list_array(&product);
    let product =
        ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(product_field.as_ref(), product)
            .expect("product tensor")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 tensor");
    // Hand-computed products for both batch rows.
    assert_close(product[[0, 0, 0]], 4.0);
    assert_close(product[[0, 0, 1]], 4.0);
    assert_close(product[[0, 1, 0]], 10.0);
    assert_close(product[[0, 1, 1]], 8.0);
    assert_close(product[[1, 0, 0]], 2.0);
    assert_close(product[[1, 0, 1]], 2.0);
    assert_close(product[[1, 1, 0]], 1.0);
    assert_close(product[[1, 1, 1]], 3.0);
    // LU solve consumes `left` and the rhs vectors.
    let (_, solved) = invoke_udf(
        &lu_solve_udf,
        vec![ColumnarValue::Array(Arc::new(left)), ColumnarValue::Array(Arc::new(rhs))],
        vec![left_field, rhs_field],
        &[None, None],
        2,
    )
    .expect("matrix_lu_solve");
    let solved = fixed_size_list_array(&solved);
    let solved = ndarrow::fixed_size_list_as_array2::<Float64Type>(solved).expect("solutions");
    assert_close(solved[[0, 0]], 1.0);
    assert_close(solved[[0, 1]], 2.0);
    assert_close(solved[[1, 0]], 2.0);
    assert_close(solved[[1, 1]], 1.5);
}
#[test]
fn matrix_matvec_udf_covers_rowwise_matrix_vector_products() {
    // Each batch row pairs one 2x2 matrix with one 2-vector.
    let (matrix_field, matrix_batch_array) =
        matrix_batch("matrices", [[[1.0, 2.0], [3.0, 4.0]], [[2.0, 0.0], [1.0, 2.0]]]);
    let rhs = fixed_size_list([[2.0, 1.0], [3.0, 4.0]]);
    let rhs_field = vector_field("vectors", &DataType::Float64, 2, false).expect("vector field");
    let matvec_udf = udfs::matrix_matvec_udf();
    let (_, product) = invoke_udf(
        &matvec_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrix_batch_array)),
            ColumnarValue::Array(Arc::new(rhs)),
        ],
        vec![matrix_field, rhs_field],
        &[None, None],
        2,
    )
    .expect("matrix_matvec");
    let product =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(fixed_size_list_array(&product))
            .expect("matrix-vector product");
    // Hand-computed matvec results per batch row.
    let expected = [[4.0, 10.0], [6.0, 11.0]];
    for (row, expected_row) in expected.into_iter().enumerate() {
        for (col, value) in expected_row.into_iter().enumerate() {
            assert_close(product[[row, col]], value);
        }
    }
}
#[test]
// matrix_lu_udf should return a struct with two rank-3 tensor columns (the
// lower and upper LU factors), holding one 2x2 matrix per input row.
fn matrix_lu_returns_struct_of_tensor_batches() {
    let (input_field, input) =
        matrix_batch("lu_matrix", [[[4.0, 2.0], [1.0, 3.0]], [[5.0, 1.0], [2.0, 4.0]]]);
    let udf = udfs::matrix_lu_udf();
    let (return_field, output) = invoke_udf(
        &udf,
        vec![ColumnarValue::Array(Arc::new(input))],
        vec![Arc::clone(&input_field)],
        &[None],
        2,
    )
    .expect("matrix_lu");
    // The return type must be a struct so the factor fields can be inspected.
    let fields = match return_field.data_type() {
        DataType::Struct(fields) => fields,
        _ => panic!("expected struct return field"),
    };
    let columns = struct_array(&output);
    let lower_column = columns
        .column(0)
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .expect("lower tensor array");
    let upper_column = columns
        .column(1)
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .expect("upper tensor array");
    let lower_tensor =
        ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&fields[0], lower_column)
            .expect("lower tensor");
    let upper_tensor =
        ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&fields[1], upper_column)
            .expect("upper tensor");
    // Two rows of 2x2 factors each: shape [batch, rows, cols] = [2, 2, 2].
    for tensor in [&lower_tensor, &upper_tensor] {
        assert_eq!(tensor.shape(), &[2, 2, 2]);
    }
}
#[test]
// sparse_matvec_udf applied to a two-row sparse batch and ragged dense
// vectors should yield a variable-shape (ragged) tensor of length-2 rows.
fn sparse_matvec_returns_variable_shape_vectors() {
    let (sparse_field, sparse) = sparse_batch("sparse");
    let (ragged_field, vectors) =
        ragged_vectors("vectors", vec![vec![1.0, 2.0, 3.0], vec![2.0, 1.0]]);
    let sparse_matvec_udf = udfs::sparse_matvec_udf();
    let (sparse_return_field, sparse_result) = invoke_udf(
        &sparse_matvec_udf,
        vec![ColumnarValue::Array(Arc::new(sparse)), ColumnarValue::Array(Arc::new(vectors))],
        vec![sparse_field, ragged_field],
        &[None, None],
        2,
    )
    .expect("sparse_matvec");
    let sparse_result = struct_array(&sparse_result);
    // The ragged output is consumed row by row; the iterator yields
    // Result<(_, view)> items, hence the double expect below.
    let mut sparse_iter = ndarrow::variable_shape_tensor_iter::<Float64Type>(
        sparse_return_field.as_ref(),
        sparse_result,
    )
    .expect("ragged tensor output");
    let (_, row0) = sparse_iter.next().expect("row 0").expect("row 0 view");
    let (_, row1) = sparse_iter.next().expect("row 1").expect("row 1 view");
    assert_eq!(row0.shape(), &[2]);
    assert_eq!(row1.shape(), &[2]);
    // Expected products depend on the fixture built by `sparse_batch`
    // (defined elsewhere in this file) — values below match that fixture.
    assert_close(row0[[0]], 7.0);
    assert_close(row0[[1]], 6.0);
    assert_close(row1[[0]], 8.0);
    assert_close(row1[[1]], 16.0);
}
#[test]
// sparse_lu_solve_udf should solve each 2x2 CSR system against its ragged
// right-hand side and return a variable-shape tensor of rank-1 solutions.
fn sparse_lu_solve_returns_variable_shape_vectors() {
    // Builds two 2x2 CSR matrices; the vec arguments appear to be per-matrix
    // shapes, row pointers, column indices, and values — confirm against
    // ndarrow::csr_batch_to_extension_array's signature.
    let (sparse_field, sparse) = ndarrow::csr_batch_to_extension_array(
        "sparse_solve",
        vec![[2, 2], [2, 2]],
        vec![vec![0, 2, 4], vec![0, 1, 2]],
        vec![vec![0, 1, 0, 1], vec![0, 1]],
        vec![vec![4.0, 1.0, 1.0, 3.0], vec![2.0, 5.0]],
    )
    .map(|(field, array)| (Arc::new(field), array))
    .expect("sparse solve batch");
    let (rhs_field, rhs) = ragged_vectors("rhs", vec![vec![1.0, 2.0], vec![4.0, 10.0]]);
    let sparse_lu_solve_udf = udfs::sparse_lu_solve_udf();
    let (return_field, solved) = invoke_udf(
        &sparse_lu_solve_udf,
        vec![ColumnarValue::Array(Arc::new(sparse)), ColumnarValue::Array(Arc::new(rhs))],
        vec![sparse_field, rhs_field],
        &[None, None],
        2,
    )
    .expect("sparse_lu_solve");
    let solved = struct_array(&solved);
    let mut solved_iter =
        ndarrow::variable_shape_tensor_iter::<Float64Type>(return_field.as_ref(), solved)
            .expect("ragged tensor output");
    let (_, first) = solved_iter.next().expect("row 0").expect("row 0 view");
    let first = first.into_dimensionality::<Ix1>().expect("rank-1 vector");
    let (_, second) = solved_iter.next().expect("row 1").expect("row 1 view");
    let second = second.into_dimensionality::<Ix1>().expect("rank-1 vector");
    // System 0: [[4,1],[1,3]]x = [1,2] has det 11, so x = [1/11, 7/11].
    assert_close(first[[0]], 1.0 / 11.0);
    assert_close(first[[1]], 7.0 / 11.0);
    // System 1 is diagonal: diag(2,5)x = [4,10], so x = [2, 2].
    assert_close(second[[0]], 2.0);
    assert_close(second[[1]], 2.0);
}
#[test]
// Covers tensor_sum_last_axis_udf (rank-3 -> rank-2 reduction over the last
// axis) and matrix_column_means_udf (per-column means of each matrix row).
fn tensor_sum_last_axis_and_matrix_column_means_work() {
    let (tensor_field, tensor) =
        matrix_batch("tensor", [[[1.0, 2.0], [3.0, 4.0]], [[5.0, 6.0], [7.0, 8.0]]]);
    let tensor_sum_udf = udfs::tensor_sum_last_axis_udf();
    let column_means_udf = udfs::matrix_column_means_udf();
    let (tensor_return_field, tensor_result) = invoke_udf(
        &tensor_sum_udf,
        vec![ColumnarValue::Array(Arc::new(tensor.clone()))],
        vec![Arc::clone(&tensor_field)],
        &[None],
        2,
    )
    .expect("tensor_sum_last_axis");
    let tensor_result = fixed_size_list_array(&tensor_result);
    let tensor_result = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(
        tensor_return_field.as_ref(),
        tensor_result,
    )
    .expect("tensor reduction")
    .into_dimensionality::<Ix2>()
    .expect("rank-2 tensor");
    // Row sums over the last axis: [1+2, 3+4] and [5+6, 7+8].
    assert_close(tensor_result[[0, 0]], 3.0);
    assert_close(tensor_result[[0, 1]], 7.0);
    assert_close(tensor_result[[1, 0]], 11.0);
    assert_close(tensor_result[[1, 1]], 15.0);
    let (_, means) = invoke_udf(
        &column_means_udf,
        vec![ColumnarValue::Array(Arc::new(tensor))],
        vec![Arc::clone(&tensor_field)],
        &[None],
        2,
    )
    .expect("matrix_column_means");
    let means = fixed_size_list_array(&means);
    let means = ndarrow::fixed_size_list_as_array2::<Float64Type>(means).expect("means");
    // Column means: mean(1,3)=2, mean(2,4)=3; mean(5,7)=6, mean(6,8)=7.
    assert_close(means[[0, 0]], 2.0);
    assert_close(means[[0, 1]], 3.0);
    assert_close(means[[1, 0]], 6.0);
    assert_close(means[[1, 1]], 7.0);
}
#[test]
// End-to-end check of linear_regression_udf: two identical 3x2 design
// matrices with exactly-fittable responses, add_intercept = false. Verifies
// the struct output columns: coefficients, fitted values, residuals (all
// zero for an exact fit), and r-squared (1.0).
fn linear_regression_returns_struct_result() {
    let (design_field, design) =
        matrix_batch("design", [[[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]], [[1.0, 0.0], [0.0, 1.0], [
            1.0, 1.0,
        ]]]);
    let response = fixed_size_list([[1.0, 2.0, 3.0], [2.0, 1.0, 3.0]]);
    let response_field =
        vector_field("response", &DataType::Float64, 3, false).expect("response field");
    let linear_regression_udf = udfs::linear_regression_udf();
    let (_, regression) = invoke_udf(
        &linear_regression_udf,
        vec![
            ColumnarValue::Array(Arc::new(design)),
            ColumnarValue::Array(Arc::new(response)),
            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))),
        ],
        vec![
            design_field,
            response_field,
            Arc::new(Field::new("add_intercept", DataType::Boolean, false)),
        ],
        &[None, None, Some(ScalarValue::Boolean(Some(false)))],
        2,
    )
    .expect("linear_regression");
    // FIX: this line previously read `struct_array(®ression)` — the `&reg`
    // prefix of `&regression` had been collapsed into the `®` character by an
    // HTML-entity round trip, which does not compile.
    let regression = struct_array(&regression);
    let coefficients =
        regression.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("coefficients");
    let coefficients =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(coefficients).expect("coefficients");
    let fitted =
        regression.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("fitted");
    let fitted = ndarrow::fixed_size_list_as_array2::<Float64Type>(fitted).expect("fitted");
    let residuals =
        regression.column(2).as_any().downcast_ref::<FixedSizeListArray>().expect("residuals");
    let residuals =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(residuals).expect("residuals");
    let r_squared =
        regression.column(3).as_any().downcast_ref::<Float64Array>().expect("r_squared");
    // Row 0 solves exactly with coefficients [1, 2]; row 1 with [2, 1].
    assert_close(coefficients[[0, 0]], 1.0);
    assert_close(coefficients[[0, 1]], 2.0);
    assert_close(coefficients[[1, 0]], 2.0);
    assert_close(coefficients[[1, 1]], 1.0);
    // An exact fit reproduces the responses and leaves zero residuals.
    assert_close(fitted[[0, 0]], 1.0);
    assert_close(fitted[[0, 1]], 2.0);
    assert_close(fitted[[0, 2]], 3.0);
    assert_close(fitted[[1, 0]], 2.0);
    assert_close(fitted[[1, 1]], 1.0);
    assert_close(fitted[[1, 2]], 3.0);
    assert_close(residuals[[0, 0]], 0.0);
    assert_close(residuals[[0, 1]], 0.0);
    assert_close(residuals[[0, 2]], 0.0);
    assert_close(residuals[[1, 0]], 0.0);
    assert_close(residuals[[1, 1]], 0.0);
    assert_close(residuals[[1, 2]], 0.0);
    assert_close(r_squared.value(0), 1.0);
    assert_close(r_squared.value(1), 1.0);
}
#[test]
// Exercises the inverse, determinant, and log-determinant UDFs on a batch of
// two diagonal 2x2 matrices, covering tensor, scalar, and struct outputs.
fn matrix_inverse_and_determinant_udfs_cover_scalar_outputs() {
    let (field, matrices) =
        matrix_batch("matrix", [[[9.0, 0.0], [0.0, 4.0]], [[16.0, 0.0], [0.0, 1.0]]]);
    let inverse_udf = udfs::matrix_inverse_udf();
    let determinant_udf = udfs::matrix_determinant_udf();
    let log_determinant_udf = udfs::matrix_log_determinant_udf();
    // Inverting a diagonal matrix is element-wise reciprocal on the diagonal.
    let (inverse_field, inverse_output) = invoke_udf(
        &inverse_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_inverse");
    let inverse = fixed_shape_view3(&inverse_field, &inverse_output);
    for (row, diag, expected) in
        [(0, 0, 1.0 / 9.0), (0, 1, 0.25), (1, 0, 1.0 / 16.0), (1, 1, 1.0)]
    {
        assert_close(inverse[[row, diag, diag]], expected);
    }
    // Determinant of a diagonal matrix is the product of its diagonal.
    let (_, determinant) = invoke_udf(
        &determinant_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_determinant");
    let determinants = f64_array(&determinant);
    assert_close(determinants.value(0), 36.0);
    assert_close(determinants.value(1), 16.0);
    // Log-determinant returns a struct of (sign, log|det|) per row.
    let (_, log_determinant) = invoke_udf(
        &log_determinant_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_log_determinant");
    let log_determinant = struct_array(&log_determinant);
    let signs = log_determinant.column(0).as_any().downcast_ref::<Int8Array>().expect("signs");
    let log_abs =
        log_determinant.column(1).as_any().downcast_ref::<Float64Array>().expect("log abs");
    for row in 0..2 {
        assert_eq!(signs.value(row), 1);
    }
    assert_close(log_abs.value(0), 36.0_f64.ln());
    assert_close(log_abs.value(1), 16.0_f64.ln());
}
#[test]
// Covers the Cholesky family of UDFs on SPD diagonal matrices: the factor
// struct, the solver against fixed-size right-hand sides, and the inverse.
fn matrix_cholesky_udfs_cover_struct_solver_and_inverse_outputs() {
    let (field, matrices) =
        matrix_batch("matrix", [[[9.0, 0.0], [0.0, 4.0]], [[16.0, 0.0], [0.0, 1.0]]]);
    let rhs = fixed_size_list([[9.0, 8.0], [32.0, 3.0]]);
    let rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    let cholesky_udf = udfs::matrix_cholesky_udf();
    let cholesky_solve_udf = udfs::matrix_cholesky_solve_udf();
    let cholesky_inverse_udf = udfs::matrix_cholesky_inverse_udf();
    let (cholesky_return_field, cholesky) = invoke_udf(
        &cholesky_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_cholesky");
    let DataType::Struct(cholesky_fields) = cholesky_return_field.data_type() else {
        panic!("expected struct return type");
    };
    let cholesky = struct_array(&cholesky);
    let lower =
        cholesky.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("lower tensor");
    let lower =
        ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&cholesky_fields[0], lower)
            .expect("lower tensor")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 tensor");
    // For diagonal SPD input, L is diagonal with the square roots: [3,2], [4,1].
    assert_close(lower[[0, 0, 0]], 3.0);
    assert_close(lower[[0, 1, 1]], 2.0);
    assert_close(lower[[1, 0, 0]], 4.0);
    assert_close(lower[[1, 1, 1]], 1.0);
    let (_, cholesky_solution) = invoke_udf(
        &cholesky_solve_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone())), ColumnarValue::Array(Arc::new(rhs))],
        vec![Arc::clone(&field), rhs_field],
        &[None, None],
        2,
    )
    .expect("matrix_cholesky_solve");
    let cholesky_solution = ndarrow::fixed_size_list_as_array2::<Float64Type>(
        fixed_size_list_array(&cholesky_solution),
    )
    .expect("cholesky solution");
    // Diagonal solves: 9x=9, 4x=8 -> [1,2]; 16x=32, 1x=3 -> [2,3].
    assert_close(cholesky_solution[[0, 0]], 1.0);
    assert_close(cholesky_solution[[0, 1]], 2.0);
    assert_close(cholesky_solution[[1, 0]], 2.0);
    assert_close(cholesky_solution[[1, 1]], 3.0);
    let (cholesky_inverse_field, cholesky_inverse) = invoke_udf(
        &cholesky_inverse_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_cholesky_inverse");
    let cholesky_inverse = fixed_shape_view3(&cholesky_inverse_field, &cholesky_inverse);
    // Inverse of a diagonal matrix is the element-wise reciprocal diagonal.
    assert_close(cholesky_inverse[[0, 0, 0]], 1.0 / 9.0);
    assert_close(cholesky_inverse[[0, 1, 1]], 0.25);
    assert_close(cholesky_inverse[[1, 0, 0]], 1.0 / 16.0);
    assert_close(cholesky_inverse[[1, 1, 1]], 1.0);
}
#[test]
// Covers matrix_qr_udf and matrix_svd_udf struct outputs on diagonal
// positive matrices, where Q = U = Vt = I and R / the singular values carry
// the diagonal entries.
fn matrix_qr_and_svd_udfs_cover_struct_outputs() {
    let (field, matrices) =
        matrix_batch("matrix", [[[9.0, 0.0], [0.0, 4.0]], [[16.0, 0.0], [0.0, 1.0]]]);
    let qr_udf = udfs::matrix_qr_udf();
    let svd_udf = udfs::matrix_svd_udf();
    let (qr_return_field, qr) = invoke_udf(
        &qr_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_qr");
    let DataType::Struct(qr_fields) = qr_return_field.data_type() else {
        panic!("expected QR struct return");
    };
    let qr = struct_array(&qr);
    let q = qr.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("q tensor");
    let q = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&qr_fields[0], q)
        .expect("q tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    let r = qr.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("r tensor");
    let r = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&qr_fields[1], r)
        .expect("r tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    let rank = qr.column(2).as_any().downcast_ref::<Int64Array>().expect("rank");
    // Diagonal positive input: Q is the identity, R carries the diagonal.
    assert_close(q[[0, 0, 0]], 1.0);
    assert_close(q[[0, 1, 1]], 1.0);
    assert_close(r[[0, 0, 0]], 9.0);
    assert_close(r[[0, 1, 1]], 4.0);
    assert_eq!(rank.value(0), 2);
    assert_eq!(rank.value(1), 2);
    let (svd_return_field, svd) = invoke_udf(
        &svd_udf,
        vec![ColumnarValue::Array(Arc::new(matrices))],
        vec![field],
        &[None],
        2,
    )
    .expect("matrix_svd");
    let DataType::Struct(svd_fields) = svd_return_field.data_type() else {
        panic!("expected SVD struct return");
    };
    let svd = struct_array(&svd);
    let u = svd.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("u tensor");
    let u = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&svd_fields[0], u)
        .expect("u tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    let singular_values =
        svd.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("singular values");
    let singular_values = ndarrow::fixed_size_list_as_array2::<Float64Type>(singular_values)
        .expect("singular values");
    let vt = svd.column(2).as_any().downcast_ref::<FixedSizeListArray>().expect("vt tensor");
    let vt = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&svd_fields[2], vt)
        .expect("vt tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    // Singular values of a diagonal positive matrix are its diagonal entries,
    // with U and Vt equal to the identity.
    assert_close(u[[0, 0, 0]], 1.0);
    assert_close(u[[0, 1, 1]], 1.0);
    assert_close(singular_values[[0, 0]], 9.0);
    assert_close(singular_values[[0, 1]], 4.0);
    assert_close(singular_values[[1, 0]], 16.0);
    assert_close(singular_values[[1, 1]], 1.0);
    assert_close(vt[[0, 0, 0]], 1.0);
    assert_close(vt[[0, 1, 1]], 1.0);
}
#[test]
// Covers the reduced and column-pivoted QR variants: reduced QR on a tall
// 3x2 matrix yields economy-size factors, and pivoting on a matrix whose
// second column dominates produces a column-swap permutation.
fn matrix_qr_variant_udfs_cover_struct_outputs() {
    let qr_reduced_udf = udfs::matrix_qr_reduced_udf();
    let qr_pivoted_udf = udfs::matrix_qr_pivoted_udf();
    let (reduced_field, reduced_matrices) =
        matrix_batch("qr_reduced", [[[1.0, 0.0], [0.0, 2.0], [0.0, 0.0]]]);
    let (reduced_return_field, reduced_output) = invoke_udf(
        &qr_reduced_udf,
        vec![ColumnarValue::Array(Arc::new(reduced_matrices))],
        vec![reduced_field],
        &[None],
        1,
    )
    .expect("matrix_qr_reduced");
    let DataType::Struct(reduced_fields) = reduced_return_field.data_type() else {
        panic!("expected reduced QR struct return");
    };
    let reduced = struct_array(&reduced_output);
    let q = reduced.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("q tensor");
    let q = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&reduced_fields[0], q)
        .expect("q tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    let r = reduced.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("r tensor");
    let r = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&reduced_fields[1], r)
        .expect("r tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    let rank = reduced.column(2).as_any().downcast_ref::<Int64Array>().expect("rank");
    // Economy QR of a 3x2 matrix: Q is 3x2 and R is 2x2.
    assert_eq!(q.dim(), (1, 3, 2));
    assert_eq!(r.dim(), (1, 2, 2));
    assert_eq!(rank.value(0), 2);
    assert_close(r[[0, 0, 0]], 1.0);
    assert_close(r[[0, 1, 1]], 2.0);
    let (pivoted_field, pivoted_matrices) = matrix_batch("qr_pivoted", [[[1.0, 0.0], [0.0, 3.0]]]);
    let (pivoted_return_field, pivoted_output) = invoke_udf(
        &qr_pivoted_udf,
        vec![ColumnarValue::Array(Arc::new(pivoted_matrices))],
        vec![pivoted_field],
        &[None],
        1,
    )
    .expect("matrix_qr_pivoted");
    let DataType::Struct(pivoted_fields) = pivoted_return_field.data_type() else {
        panic!("expected pivoted QR struct return");
    };
    let pivoted = struct_array(&pivoted_output);
    let permutation =
        pivoted.column(2).as_any().downcast_ref::<FixedSizeListArray>().expect("p tensor");
    let permutation =
        ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&pivoted_fields[2], permutation)
            .expect("p tensor")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 tensor");
    let rank = pivoted.column(3).as_any().downcast_ref::<Int64Array>().expect("rank");
    assert_eq!(rank.value(0), 2);
    // Column 1 has the larger norm (3 > 1), so the permutation swaps columns:
    // the off-diagonal entries of the 2x2 permutation matrix are 1.
    assert_close(permutation[[0, 0, 1]], 1.0);
    assert_close(permutation[[0, 1, 0]], 1.0);
}
#[test]
// Covers the truncated SVD, tolerance SVD, and null-space UDF variants:
// struct outputs with reduced dimensions for the first two, and a ragged
// (variable-shape) basis output for the null space.
fn matrix_svd_variant_udfs_cover_struct_and_variable_outputs() {
    let svd_truncated_udf = udfs::matrix_svd_truncated_udf();
    let svd_tolerance_udf = udfs::matrix_svd_with_tolerance_udf();
    let svd_null_space_udf = udfs::matrix_svd_null_space_udf();
    let (truncated_field, truncated_matrices) =
        matrix_batch("svd_truncated", [[[9.0, 0.0], [0.0, 4.0]]]);
    // k = 1 keeps only the largest singular value/vector pair.
    let (truncated_return_field, truncated_output) = invoke_udf(
        &svd_truncated_udf,
        vec![
            ColumnarValue::Array(Arc::new(truncated_matrices)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
        ],
        vec![truncated_field, Arc::new(Field::new("k", DataType::Int64, false))],
        &[None, Some(ScalarValue::Int64(Some(1)))],
        1,
    )
    .expect("matrix_svd_truncated");
    let DataType::Struct(truncated_fields) = truncated_return_field.data_type() else {
        panic!("expected truncated SVD struct return");
    };
    let truncated = struct_array(&truncated_output);
    let u = truncated.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("u tensor");
    let u = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&truncated_fields[0], u)
        .expect("u tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    let singular_values = truncated
        .column(1)
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .expect("singular values");
    let singular_values = ndarrow::fixed_size_list_as_array2::<Float64Type>(singular_values)
        .expect("singular values");
    let vt = truncated.column(2).as_any().downcast_ref::<FixedSizeListArray>().expect("vt tensor");
    let vt = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&truncated_fields[2], vt)
        .expect("vt tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    // Truncation to k = 1 shrinks U to 2x1 and Vt to 1x2.
    assert_eq!(u.dim(), (1, 2, 1));
    assert_eq!(vt.dim(), (1, 1, 2));
    assert_close(singular_values[[0, 0]], 9.0);
    let (tolerance_field, tolerance_matrices) =
        matrix_batch("svd_tolerance", [[[5.0, 0.0], [0.0, 1.0]]]);
    // With tolerance 2.0 the smaller singular value (1.0) is reported as 0,
    // but the factor shapes stay full (Vt remains 2x2) per the asserts below.
    let (tolerance_return_field, tolerance_output) = invoke_udf(
        &svd_tolerance_udf,
        vec![
            ColumnarValue::Array(Arc::new(tolerance_matrices)),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(2.0))),
        ],
        vec![tolerance_field, Arc::new(Field::new("tolerance", DataType::Float64, false))],
        &[None, Some(ScalarValue::Float64(Some(2.0)))],
        1,
    )
    .expect("matrix_svd_with_tolerance");
    let DataType::Struct(tolerance_fields) = tolerance_return_field.data_type() else {
        panic!("expected tolerance SVD struct return");
    };
    let tolerance = struct_array(&tolerance_output);
    let singular_values = tolerance
        .column(1)
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .expect("singular values");
    let singular_values = ndarrow::fixed_size_list_as_array2::<Float64Type>(singular_values)
        .expect("singular values");
    let vt = tolerance.column(2).as_any().downcast_ref::<FixedSizeListArray>().expect("vt tensor");
    let vt = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&tolerance_fields[2], vt)
        .expect("vt tensor")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    assert_eq!(vt.dim(), (1, 2, 2));
    assert_close(singular_values[[0, 0]], 5.0);
    assert_close(singular_values[[0, 1]], 0.0);
    // Null space: the rank-1 all-ones matrix has a 1-dimensional null space,
    // while the zero matrix's null space spans all of R^2 — hence the ragged
    // [2, 1] and [2, 2] basis shapes.
    let (null_space_field, null_space_matrices) =
        matrix_batch("null_space", [[[1.0, 1.0], [1.0, 1.0]], [[0.0, 0.0], [0.0, 0.0]]]);
    let (null_space_return_field, null_space_output) = invoke_udf(
        &svd_null_space_udf,
        vec![ColumnarValue::Array(Arc::new(null_space_matrices))],
        vec![null_space_field],
        &[None],
        2,
    )
    .expect("matrix_svd_null_space");
    let mut rows = variable_shape_rows(&null_space_return_field, &null_space_output);
    let (_, basis0) = rows.next().expect("basis0").expect("basis0 tensor");
    let (_, basis1) = rows.next().expect("basis1").expect("basis1 tensor");
    assert_eq!(basis0.shape(), &[2, 1]);
    assert_eq!(basis1.shape(), &[2, 2]);
}
#[test]
// Contract checks for the truncated / tolerance SVD UDFs: k must be
// positive, tolerance must be non-negative, and the scalar argument must
// actually be a scalar rather than an array.
fn matrix_svd_variant_udfs_validate_scalar_contracts() {
    let truncated_udf = udfs::matrix_svd_truncated_udf();
    let tolerance_udf = udfs::matrix_svd_with_tolerance_udf();
    let (field, matrices) = matrix_batch("svd", [[[1.0, 0.0], [0.0, 1.0]]]);
    let k_field = Arc::new(Field::new("k", DataType::Int64, false));
    let tolerance_field = Arc::new(Field::new("tolerance", DataType::Float64, false));
    // k == 0 must be rejected.
    let zero_k_error = invoke_udf_error(
        &truncated_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![Arc::clone(&field), Arc::clone(&k_field)],
        &[None, Some(ScalarValue::Int64(Some(0)))],
        1,
    );
    assert!(zero_k_error.contains("k must be greater than 0"));
    // A negative tolerance must be rejected.
    let tolerance_error = invoke_udf_error(
        &tolerance_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(-1.0))),
        ],
        vec![Arc::clone(&field), Arc::clone(&tolerance_field)],
        &[None, Some(ScalarValue::Float64(Some(-1.0)))],
        1,
    );
    assert!(tolerance_error.contains("tolerance must be non-negative"));
    // Supplying k as an array rather than a scalar must be rejected.
    let scalar_error = invoke_udf_error(
        &truncated_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices)),
            ColumnarValue::Array(Arc::new(Int64Array::from(vec![1_i64]))),
        ],
        vec![field, k_field],
        &[None, Some(ScalarValue::Int64(Some(1)))],
        1,
    );
    assert!(scalar_error.contains("argument 2 must be an integer scalar"));
}
#[test]
// Runs the QR and SVD decomposition variant UDFs over Float32 inputs to
// exercise their f32 code paths; only agreement between the produced array
// type and the declared return field type is asserted, not numeric values.
fn matrix_decomposition_variant_udfs_cover_float32_branches() {
    let (qr_field, qr_matrices) =
        matrix_batch_f32("qr_rect", [[[1.0, 0.0], [0.0, 2.0], [0.0, 0.0]]]);
    for udf in [
        udfs::matrix_qr_reduced_udf(),
        udfs::matrix_qr_pivoted_udf(),
        udfs::matrix_qr_reconstruct_udf(),
    ] {
        let (field, output) = invoke_udf(
            &udf,
            vec![ColumnarValue::Array(Arc::new(qr_matrices.clone()))],
            vec![Arc::clone(&qr_field)],
            &[None],
            1,
        )
        .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        assert_eq!(array_data_type(&output), field.data_type());
    }
    let (svd_field, svd_matrices) = matrix_batch_f32("svd", [[[4.0, 0.0], [0.0, 2.0]]]);
    // Each case bundles (udf, matrix field, matrix values, extra scalar
    // arguments, extra scalar argument fields).
    let svd_cases = [
        (
            udfs::matrix_svd_truncated_udf(),
            Arc::clone(&svd_field),
            svd_matrices.clone(),
            vec![ColumnarValue::Scalar(ScalarValue::Int64(Some(1)))],
            vec![Arc::new(Field::new("k", DataType::Int64, false))],
        ),
        (
            udfs::matrix_svd_with_tolerance_udf(),
            Arc::clone(&svd_field),
            svd_matrices.clone(),
            vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0)))],
            vec![Arc::new(Field::new("tolerance", DataType::Float64, false))],
        ),
    ];
    for (udf, field, values, scalar_args, scalar_fields) in svd_cases {
        let mut args = vec![ColumnarValue::Array(Arc::new(values))];
        args.extend(scalar_args.clone());
        let mut arg_fields = vec![field];
        arg_fields.extend(scalar_fields);
        // Mirror the scalar arguments into the Option<ScalarValue> slice that
        // invoke_udf takes alongside the columnar arguments.
        let scalar_refs = scalar_args
            .into_iter()
            .map(|value| match value {
                ColumnarValue::Scalar(value) => Some(value),
                ColumnarValue::Array(_) => None,
            })
            .collect::<Vec<_>>();
        let mut refs = vec![None];
        refs.extend(scalar_refs);
        let (field, output) = invoke_udf(&udf, args, arg_fields, &refs, 1)
            .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        assert_eq!(array_data_type(&output), field.data_type());
    }
    let (null_space_field, null_space_matrices) =
        matrix_batch_f32("null_space", [[[1.0, 1.0], [1.0, 1.0]]]);
    let (field, output) = invoke_udf(
        &udfs::matrix_svd_null_space_udf(),
        vec![ColumnarValue::Array(Arc::new(null_space_matrices))],
        vec![null_space_field],
        &[None],
        1,
    )
    .expect("matrix_svd_null_space f32");
    assert_eq!(array_data_type(&output), field.data_type());
}
#[test]
// Covers the spectral (eigen, balance, Schur, polar) and Gram-Schmidt UDFs
// on a diagonal matrix; output structure is checked via the assert_* helper
// functions defined elsewhere in this file.
fn matrix_spectral_and_orthogonalization_udfs_cover_outputs() {
    let (spectral_field, spectral_matrices) = matrix_batch("spectral", [[[4.0, 0.0], [0.0, 9.0]]]);
    let (identity_field, identity_matrices) = matrix_batch("identity", [[[1.0, 0.0], [0.0, 1.0]]]);
    let (spectral_return_field, spectral_output) = invoke_udf(
        &udfs::matrix_eigen_symmetric_udf(),
        vec![ColumnarValue::Array(Arc::new(spectral_matrices.clone()))],
        vec![Arc::clone(&spectral_field)],
        &[None],
        1,
    )
    .expect("matrix_eigen_symmetric");
    // Eigenvalues of diag(4, 9) are 4 and 9.
    assert_eigen_struct_output(&spectral_return_field, &spectral_output, 4.0, 9.0);
    // Generalized eigenproblem with B = I reduces to the standard problem.
    let (generalized_return_field, generalized_output) = invoke_udf(
        &udfs::matrix_eigen_generalized_udf(),
        vec![
            ColumnarValue::Array(Arc::new(spectral_matrices.clone())),
            ColumnarValue::Array(Arc::new(identity_matrices)),
        ],
        vec![Arc::clone(&spectral_field), identity_field],
        &[None, None],
        1,
    )
    .expect("matrix_eigen_generalized");
    assert_eigen_struct_output(&generalized_return_field, &generalized_output, 4.0, 9.0);
    let (balanced_return_field, balanced_output) = invoke_udf(
        &udfs::matrix_balance_nonsymmetric_udf(),
        vec![ColumnarValue::Array(Arc::new(spectral_matrices.clone()))],
        vec![Arc::clone(&spectral_field)],
        &[None],
        1,
    )
    .expect("matrix_balance_nonsymmetric");
    assert_tensor_vector_struct_output(&balanced_return_field, &balanced_output, [4.0, 9.0]);
    let (schur_return_field, schur_output) = invoke_udf(
        &udfs::matrix_schur_udf(),
        vec![ColumnarValue::Array(Arc::new(spectral_matrices.clone()))],
        vec![Arc::clone(&spectral_field)],
        &[None],
        1,
    )
    .expect("matrix_schur");
    assert_two_tensor_struct_output(&schur_return_field, &schur_output, 4.0, 9.0);
    let (polar_return_field, polar_output) = invoke_udf(
        &udfs::matrix_polar_udf(),
        vec![ColumnarValue::Array(Arc::new(spectral_matrices.clone()))],
        vec![Arc::clone(&spectral_field)],
        &[None],
        1,
    )
    .expect("matrix_polar");
    assert_two_tensor_struct_output(&polar_return_field, &polar_output, 4.0, 9.0);
    // Both Gram-Schmidt variants should orthogonalize the input columns.
    for udf in [udfs::matrix_gram_schmidt_udf(), udfs::matrix_gram_schmidt_classic_udf()] {
        let (field, output) = invoke_udf(
            &udf,
            vec![ColumnarValue::Array(Arc::new(spectral_matrices.clone()))],
            vec![Arc::clone(&spectral_field)],
            &[None],
            1,
        )
        .unwrap_or_else(|error| panic!("{} output: {error}", udf.name()));
        assert_orthogonal_matrix_output(&field, &output);
    }
}
#[test]
// Runs each spectral/orthogonalization UDF over Float32 matrices and checks
// that the produced array's type matches the declared return field type.
fn matrix_spectral_and_orthogonalization_udfs_cover_float32_branches() {
    let (matrix_field, matrix_values) =
        matrix_batch_f32("spectral", [[[4.0, 0.0], [0.0, 9.0]]]);
    let (identity_field, identity_values) =
        matrix_batch_f32("identity", [[[1.0, 0.0], [0.0, 1.0]]]);
    // Single-argument UDFs all take the same diagonal f32 matrix.
    let single_input_udfs = [
        udfs::matrix_eigen_symmetric_udf(),
        udfs::matrix_balance_nonsymmetric_udf(),
        udfs::matrix_schur_udf(),
        udfs::matrix_polar_udf(),
        udfs::matrix_gram_schmidt_udf(),
        udfs::matrix_gram_schmidt_classic_udf(),
    ];
    for udf in single_input_udfs {
        let args = vec![ColumnarValue::Array(Arc::new(matrix_values.clone()))];
        let (field, output) =
            invoke_udf(&udf, args, vec![Arc::clone(&matrix_field)], &[None], 1)
                .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        assert_eq!(array_data_type(&output), field.data_type());
    }
    // The generalized eigenproblem additionally takes a second matrix.
    let (field, output) = invoke_udf(
        &udfs::matrix_eigen_generalized_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrix_values)),
            ColumnarValue::Array(Arc::new(identity_values)),
        ],
        vec![matrix_field, identity_field],
        &[None, None],
        1,
    )
    .expect("matrix_eigen_generalized f32");
    assert_eq!(array_data_type(&output), field.data_type());
}
#[test]
// Error-contract checks for the spectral UDFs: non-square inputs are
// rejected, and the generalized eigenproblem rejects mismatched value types
// and mismatched shapes between its two matrix arguments.
fn matrix_spectral_udfs_validate_square_and_pair_contracts() {
    let (rect_field, rect_matrices) = matrix_batch("rect", [[[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]]);
    for udf in [
        udfs::matrix_eigen_symmetric_udf(),
        udfs::matrix_balance_nonsymmetric_udf(),
        udfs::matrix_schur_udf(),
        udfs::matrix_polar_udf(),
    ] {
        // A 3x2 input must be rejected by every square-only UDF.
        let error = invoke_udf_error(
            &udf,
            vec![ColumnarValue::Array(Arc::new(rect_matrices.clone()))],
            vec![Arc::clone(&rect_field)],
            &[None],
            1,
        );
        assert!(error.contains("requires square matrices"), "{}: {error}", udf.name());
    }
    let (left_field, left_matrices) = matrix_batch("left", [[[4.0, 0.0], [0.0, 9.0]]]);
    // Right-hand side is Float32 while the left is Float64 — a type mismatch.
    let (right_field, right_matrices) = matrix_batch_f32("right", [[[1.0, 0.0], [0.0, 1.0]]]);
    let value_type_error = invoke_udf_error(
        &udfs::matrix_eigen_generalized_udf(),
        vec![
            ColumnarValue::Array(Arc::new(left_matrices.clone())),
            ColumnarValue::Array(Arc::new(right_matrices)),
        ],
        vec![Arc::clone(&left_field), right_field],
        &[None, None],
        1,
    );
    assert!(value_type_error.contains("matrix value type mismatch"));
    // 2x2 paired with 3x3 — a shape mismatch.
    let (shape_field, shape_matrices) =
        matrix_batch("shape", [[[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]]);
    let shape_error = invoke_udf_error(
        &udfs::matrix_eigen_generalized_udf(),
        vec![
            ColumnarValue::Array(Arc::new(left_matrices)),
            ColumnarValue::Array(Arc::new(shape_matrices)),
        ],
        vec![left_field, shape_field],
        &[None, None],
        1,
    );
    assert!(shape_error.contains("matrix shape mismatch"));
}
#[test]
// Exercises matrix_solve_sylvester_udf on Float32 inputs; the expected
// solution is obtained by calling the underlying nabled solver directly on
// the same matrices and comparing element-wise (widened to f64).
fn matrix_solve_sylvester_udf_covers_f32_contract() {
    let (left_f32_field, left_f32) = matrix_batch_f32("left_f32", [[[1.0, 0.0], [0.0, 2.0]]]);
    let (right_f32_field, right_f32) = matrix_batch_f32("right_f32", [[[3.0, 0.0], [0.0, 4.0]]]);
    let (constant_f32_field, constant_f32) =
        matrix_batch_f32("constant_f32", [[[5.0, 6.0], [7.0, 8.0]]]);
    let (sylvester_f32_field, sylvester_f32_output) = invoke_udf(
        &udfs::matrix_solve_sylvester_udf(),
        vec![
            ColumnarValue::Array(Arc::new(left_f32.clone())),
            ColumnarValue::Array(Arc::new(right_f32.clone())),
            ColumnarValue::Array(Arc::new(constant_f32.clone())),
        ],
        vec![
            Arc::clone(&left_f32_field),
            Arc::clone(&right_f32_field),
            Arc::clone(&constant_f32_field),
        ],
        &[None, None, None],
        1,
    )
    .expect("matrix_solve_sylvester f32");
    let sylvester_f32 = ndarrow::fixed_shape_tensor_as_array_viewd::<Float32Type>(
        sylvester_f32_field.as_ref(),
        fixed_size_list_array(&sylvester_f32_output),
    )
    .expect("sylvester f32 output")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 sylvester f32 output");
    // Re-view the three inputs as rank-3 tensors so row 0 of each can be fed
    // to the reference solver below.
    let left_f32_view = ndarrow::fixed_shape_tensor_as_array_viewd::<Float32Type>(
        left_f32_field.as_ref(),
        &left_f32,
    )
    .expect("left f32")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 left f32");
    let right_f32_view = ndarrow::fixed_shape_tensor_as_array_viewd::<Float32Type>(
        right_f32_field.as_ref(),
        &right_f32,
    )
    .expect("right f32")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 right f32");
    let constant_f32_view = ndarrow::fixed_shape_tensor_as_array_viewd::<Float32Type>(
        constant_f32_field.as_ref(),
        &constant_f32,
    )
    .expect("constant f32")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 constant f32");
    // Reference result computed directly with the nabled Sylvester solver.
    let expected_f32 = nabled::linalg::sylvester::solve_sylvester_view(
        &left_f32_view.index_axis(Axis(0), 0),
        &right_f32_view.index_axis(Axis(0), 0),
        &constant_f32_view.index_axis(Axis(0), 0),
    )
    .expect("expected sylvester f32");
    for i in 0..expected_f32.nrows() {
        for j in 0..expected_f32.ncols() {
            assert_close(f64::from(sylvester_f32[[0, i, j]]), f64::from(expected_f32[[i, j]]));
        }
    }
}
#[test]
// Exercises matrix_solve_sylvester_mixed_f64_udf. Without the
// "magma-system" cargo feature the UDF is expected to fail with an error
// mentioning the feature; with it, the struct output's solution tensor is
// compared against the nabled mixed-precision reference solver.
fn matrix_solve_sylvester_mixed_f64_udf_covers_struct_output() {
    let (left_f64_field, left_f64) = matrix_batch("left_f64", [[[1.0, 0.0], [0.0, 2.0]]]);
    let (right_f64_field, right_f64) = matrix_batch("right_f64", [[[3.0, 0.0], [0.0, 4.0]]]);
    let (constant_f64_field, constant_f64) =
        matrix_batch("constant_f64", [[[5.0, 6.0], [7.0, 8.0]]]);
    let mixed_f64_result = invoke_udf(
        &udfs::matrix_solve_sylvester_mixed_f64_udf(),
        vec![
            ColumnarValue::Array(Arc::new(left_f64.clone())),
            ColumnarValue::Array(Arc::new(right_f64.clone())),
            ColumnarValue::Array(Arc::new(constant_f64.clone())),
        ],
        vec![
            Arc::clone(&left_f64_field),
            Arc::clone(&right_f64_field),
            Arc::clone(&constant_f64_field),
        ],
        &[None, None, None],
        1,
    );
    // Feature-gated path: without magma-system the call must error out, and
    // the rest of the test is skipped.
    if !cfg!(feature = "magma-system") {
        let error = mixed_f64_result.expect_err("mixed f64 should require magma-system");
        assert!(error.to_string().contains("magma-system"));
        return;
    }
    let (mixed_f64_field, mixed_f64_output) =
        mixed_f64_result.expect("matrix_solve_sylvester_mixed_f64");
    let DataType::Struct(mixed_f64_fields) = mixed_f64_field.data_type() else {
        panic!("expected mixed f64 struct output");
    };
    let mixed_f64_output = struct_array(&mixed_f64_output);
    // Column 0 of the struct carries the solution tensor.
    let mixed_f64_solution = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(
        &mixed_f64_fields[0],
        mixed_f64_output
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("mixed f64 solution"),
    )
    .expect("mixed f64 solution")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 mixed f64 solution");
    let left_f64_view = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(
        left_f64_field.as_ref(),
        &left_f64,
    )
    .expect("left f64")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 left f64");
    let right_f64_view = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(
        right_f64_field.as_ref(),
        &right_f64,
    )
    .expect("right f64")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 right f64");
    let constant_f64_view = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(
        constant_f64_field.as_ref(),
        &constant_f64,
    )
    .expect("constant f64")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 constant f64");
    // Reference result from the nabled mixed-precision solver on row 0.
    let expected_mixed_f64 = nabled::linalg::sylvester::solve_sylvester_mixed_f64_view(
        &left_f64_view.index_axis(Axis(0), 0),
        &right_f64_view.index_axis(Axis(0), 0),
        &constant_f64_view.index_axis(Axis(0), 0),
    )
    .expect("expected mixed f64");
    for i in 0..expected_mixed_f64.solution.nrows() {
        for j in 0..expected_mixed_f64.solution.ncols() {
            assert_close(mixed_f64_solution[[0, i, j]], expected_mixed_f64.solution[[i, j]]);
        }
    }
}
// Exercises matrix_solve_sylvester_complex_udf against the reference nabled
// complex solver: one batch row with diagonal complex left/right operands and
// a dense complex constant; every entry of the UDF solution must agree.
#[test]
fn matrix_solve_sylvester_complex_udf_covers_complex_contract() {
    let (a_field, a_batch) = complex_matrix_batch("left_complex", [[
        [Complex64::new(1.0, 0.0), Complex64::new(0.0, 0.0)],
        [Complex64::new(0.0, 0.0), Complex64::new(2.0, 0.0)],
    ]]);
    let (b_field, b_batch) = complex_matrix_batch("right_complex", [[
        [Complex64::new(3.0, 0.0), Complex64::new(0.0, 0.0)],
        [Complex64::new(0.0, 0.0), Complex64::new(4.0, 0.0)],
    ]]);
    let (c_field, c_batch) = complex_matrix_batch("constant_complex", [[
        [Complex64::new(5.0, 1.0), Complex64::new(6.0, 0.0)],
        [Complex64::new(7.0, -1.0), Complex64::new(8.0, 0.0)],
    ]]);
    // Take reference views up front so the expected solution is computed
    // independently of the UDF invocation below.
    let a_view = complex_matrix_view(&a_field, &a_batch);
    let b_view = complex_matrix_view(&b_field, &b_batch);
    let c_view = complex_matrix_view(&c_field, &c_batch);
    let (out_field, out_value) = invoke_udf(
        &udfs::matrix_solve_sylvester_complex_udf(),
        vec![
            ColumnarValue::Array(Arc::new(a_batch.clone())),
            ColumnarValue::Array(Arc::new(b_batch.clone())),
            ColumnarValue::Array(Arc::new(c_batch.clone())),
        ],
        vec![Arc::clone(&a_field), Arc::clone(&b_field), Arc::clone(&c_field)],
        &[None, None, None],
        1,
    )
    .expect("matrix_solve_sylvester_complex");
    let solution = complex_fixed_shape_view3(&out_field, &out_value);
    let expected = nabled::linalg::sylvester::solve_sylvester_complex_view(
        &a_view.index_axis(Axis(0), 0),
        &b_view.index_axis(Axis(0), 0),
        &c_view.index_axis(Axis(0), 0),
    )
    .expect("expected complex sylvester");
    // Entrywise comparison over the whole expected matrix.
    for ((i, j), &want) in expected.indexed_iter() {
        assert_complex_close(solution[[0, i, j]], want);
    }
}
// Mirrors the mixed f64 struct test for complex inputs: the mixed complex
// Sylvester UDF requires `magma-system`; when available, the struct output's
// solution column must match the reference nabled mixed-complex solver.
#[test]
fn matrix_solve_sylvester_mixed_complex_udf_covers_struct_output() {
    let (left_complex_field, left_complex) = complex_matrix_batch("left_complex", [[
        [Complex64::new(1.0, 0.0), Complex64::new(0.0, 0.0)],
        [Complex64::new(0.0, 0.0), Complex64::new(2.0, 0.0)],
    ]]);
    let (right_complex_field, right_complex) = complex_matrix_batch("right_complex", [[
        [Complex64::new(3.0, 0.0), Complex64::new(0.0, 0.0)],
        [Complex64::new(0.0, 0.0), Complex64::new(4.0, 0.0)],
    ]]);
    let (constant_complex_field, constant_complex) = complex_matrix_batch("constant_complex", [[
        [Complex64::new(5.0, 1.0), Complex64::new(6.0, 0.0)],
        [Complex64::new(7.0, -1.0), Complex64::new(8.0, 0.0)],
    ]]);
    // Views are taken before invoking so the reference solve at the bottom
    // does not depend on any state produced by the UDF call.
    let left_complex_view = complex_matrix_view(&left_complex_field, &left_complex);
    let right_complex_view = complex_matrix_view(&right_complex_field, &right_complex);
    let constant_complex_view = complex_matrix_view(&constant_complex_field, &constant_complex);
    let mixed_complex_result = invoke_udf(
        &udfs::matrix_solve_sylvester_mixed_complex_udf(),
        vec![
            ColumnarValue::Array(Arc::new(left_complex.clone())),
            ColumnarValue::Array(Arc::new(right_complex.clone())),
            ColumnarValue::Array(Arc::new(constant_complex.clone())),
        ],
        vec![
            Arc::clone(&left_complex_field),
            Arc::clone(&right_complex_field),
            Arc::clone(&constant_complex_field),
        ],
        &[None, None, None],
        1,
    );
    // Feature gate: without MAGMA the call must fail with a telling message.
    if !cfg!(feature = "magma-system") {
        let error = mixed_complex_result.expect_err("mixed complex should require magma-system");
        assert!(error.to_string().contains("magma-system"));
        return;
    }
    let (mixed_complex_field, mixed_complex_output) =
        mixed_complex_result.expect("matrix_solve_sylvester_mixed_complex");
    let DataType::Struct(mixed_complex_fields) = mixed_complex_field.data_type() else {
        panic!("expected mixed complex struct output");
    };
    let mixed_complex_output = struct_array(&mixed_complex_output);
    // Column 0 of the struct is the complex solution as a rank-3 tensor
    // (batch, rows, cols).
    let mixed_complex_solution = ndarrow::complex64_fixed_shape_tensor_as_array_viewd(
        &mixed_complex_fields[0],
        mixed_complex_output
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("mixed complex solution"),
    )
    .expect("mixed complex solution")
    .into_dimensionality::<Ix3>()
    .expect("rank-3 mixed complex solution");
    let expected_mixed_complex = nabled::linalg::sylvester::solve_sylvester_mixed_complex_view(
        &left_complex_view.index_axis(Axis(0), 0),
        &right_complex_view.index_axis(Axis(0), 0),
        &constant_complex_view.index_axis(Axis(0), 0),
    )
    .expect("expected mixed complex");
    for i in 0..expected_mixed_complex.solution.nrows() {
        for j in 0..expected_mixed_complex.solution.ncols() {
            assert_complex_close(
                mixed_complex_solution[[0, i, j]],
                expected_mixed_complex.solution[[i, j]],
            );
        }
    }
}
// Negative-path coverage for the Sylvester UDF family: a constant matrix of
// the wrong shape must be rejected, and the mixed f64 solver must refuse
// Float32 inputs.
#[test]
fn matrix_sylvester_udfs_validate_contracts() {
    // 2x2 left/right operands paired with a 2x3 constant: shape mismatch.
    let (lhs_field, lhs) = matrix_batch("left", [[[1.0, 0.0], [0.0, 2.0]]]);
    let (rhs_field, rhs) = matrix_batch("right", [[[3.0, 0.0], [0.0, 4.0]]]);
    let (mismatched_field, mismatched) =
        matrix_batch("bad_constant", [[[5.0, 6.0, 7.0], [8.0, 9.0, 10.0]]]);
    let shape_error = invoke_udf_error(
        &udfs::matrix_solve_sylvester_udf(),
        vec![
            ColumnarValue::Array(Arc::new(lhs.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
            ColumnarValue::Array(Arc::new(mismatched)),
        ],
        vec![Arc::clone(&lhs_field), Arc::clone(&rhs_field), mismatched_field],
        &[None, None, None],
        1,
    );
    assert!(shape_error.contains("constant matrix shape mismatch"));
    // The mixed-precision f64 solver must reject Float32 matrix batches.
    let (lhs32_field, lhs32) = matrix_batch_f32("left_f32", [[[1.0_f32, 0.0], [0.0, 2.0]]]);
    let (rhs32_field, rhs32) =
        matrix_batch_f32("right_f32", [[[3.0_f32, 0.0], [0.0, 4.0]]]);
    let (const32_field, const32) =
        matrix_batch_f32("constant_f32", [[[5.0_f32, 6.0], [7.0, 8.0]]]);
    let mixed_f64_error = invoke_udf_error(
        &udfs::matrix_solve_sylvester_mixed_f64_udf(),
        vec![
            ColumnarValue::Array(Arc::new(lhs32)),
            ColumnarValue::Array(Arc::new(rhs32)),
            ColumnarValue::Array(Arc::new(const32)),
        ],
        vec![lhs32_field, rhs32_field, const32_field],
        &[None, None, None],
        1,
    );
    assert!(mixed_f64_error.contains("requires Float64 matrix inputs"));
}
// Covers the three QR helper UDFs over a two-row batch of diagonal matrices:
// least-squares solve (vector output), condition number (scalar output), and
// reconstruction (matrix output).
#[test]
fn matrix_qr_helper_udfs_cover_solver_scalar_and_reconstruction_outputs() {
    let qr_solve_udf = udfs::matrix_qr_solve_least_squares_udf();
    let qr_condition_udf = udfs::matrix_qr_condition_number_udf();
    let qr_reconstruct_udf = udfs::matrix_qr_reconstruct_udf();
    // Diagonal systems so the expected solutions are easy to verify by hand:
    // diag(2,1)x=[4,3] -> [2,3]; diag(3,4)x=[9,8] -> [3,2].
    let (qr_field, qr_matrices) =
        matrix_batch("qr", [[[2.0, 0.0], [0.0, 1.0]], [[3.0, 0.0], [0.0, 4.0]]]);
    let rhs = fixed_size_list([[4.0, 3.0], [9.0, 8.0]]);
    let rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    let (_, qr_solution) = invoke_udf(
        &qr_solve_udf,
        vec![
            ColumnarValue::Array(Arc::new(qr_matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs)),
        ],
        vec![Arc::clone(&qr_field), rhs_field],
        &[None, None],
        2,
    )
    .expect("matrix_qr_solve_least_squares");
    let qr_solution =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(fixed_size_list_array(&qr_solution))
            .expect("least-squares solution");
    assert_close(qr_solution[[0, 0]], 2.0);
    assert_close(qr_solution[[0, 1]], 3.0);
    assert_close(qr_solution[[1, 0]], 3.0);
    assert_close(qr_solution[[1, 1]], 2.0);
    // Condition numbers of the diagonal inputs: 2/1 and 4/3.
    let (_, qr_condition) = invoke_udf(
        &qr_condition_udf,
        vec![ColumnarValue::Array(Arc::new(qr_matrices.clone()))],
        vec![Arc::clone(&qr_field)],
        &[None],
        2,
    )
    .expect("matrix_qr_condition_number");
    assert_close(f64_array(&qr_condition).value(0), 2.0);
    assert_close(f64_array(&qr_condition).value(1), 4.0 / 3.0);
    // Reconstruction (Q*R) must reproduce the original diagonal entries.
    let (qr_reconstruct_field, qr_reconstructed) = invoke_udf(
        &qr_reconstruct_udf,
        vec![ColumnarValue::Array(Arc::new(qr_matrices))],
        vec![qr_field],
        &[None],
        2,
    )
    .expect("matrix_qr_reconstruct");
    let qr_reconstructed = fixed_shape_view3(&qr_reconstruct_field, &qr_reconstructed);
    assert_close(qr_reconstructed[[0, 0, 0]], 2.0);
    assert_close(qr_reconstructed[[0, 1, 1]], 1.0);
    assert_close(qr_reconstructed[[1, 0, 0]], 3.0);
    assert_close(qr_reconstructed[[1, 1, 1]], 4.0);
}
// Covers the four SVD helper UDFs: pseudo-inverse (matrix output), condition
// number (scalar), reconstruction (matrix), and numerical rank (Int64).
#[test]
fn matrix_svd_helper_udfs_cover_inverse_scalar_rank_and_reconstruction_outputs() {
    let svd_pseudo_inverse_udf = udfs::matrix_svd_pseudo_inverse_udf();
    let svd_condition_udf = udfs::matrix_svd_condition_number_udf();
    let svd_rank_udf = udfs::matrix_svd_rank_udf();
    let svd_reconstruct_udf = udfs::matrix_svd_reconstruct_udf();
    // Diagonal inputs so expectations are simple reciprocals and ratios.
    let (svd_field, svd_matrices) =
        matrix_batch("svd", [[[4.0, 0.0], [0.0, 2.0]], [[9.0, 0.0], [0.0, 3.0]]]);
    let (pseudo_inverse_field, pseudo_inverse) = invoke_udf(
        &svd_pseudo_inverse_udf,
        vec![ColumnarValue::Array(Arc::new(svd_matrices.clone()))],
        vec![Arc::clone(&svd_field)],
        &[None],
        2,
    )
    .expect("matrix_svd_pseudo_inverse");
    // Pseudo-inverse of a diagonal matrix: reciprocal diagonal entries.
    let pseudo_inverse = fixed_shape_view3(&pseudo_inverse_field, &pseudo_inverse);
    assert_close(pseudo_inverse[[0, 0, 0]], 0.25);
    assert_close(pseudo_inverse[[0, 1, 1]], 0.5);
    assert_close(pseudo_inverse[[1, 0, 0]], 1.0 / 9.0);
    assert_close(pseudo_inverse[[1, 1, 1]], 1.0 / 3.0);
    // Condition numbers: 4/2 and 9/3.
    let (_, svd_condition) = invoke_udf(
        &svd_condition_udf,
        vec![ColumnarValue::Array(Arc::new(svd_matrices.clone()))],
        vec![Arc::clone(&svd_field)],
        &[None],
        2,
    )
    .expect("matrix_svd_condition_number");
    assert_close(f64_array(&svd_condition).value(0), 2.0);
    assert_close(f64_array(&svd_condition).value(1), 3.0);
    // Reconstruction (U*S*V^T) must reproduce the original entries.
    let (svd_reconstruct_field, svd_reconstructed) = invoke_udf(
        &svd_reconstruct_udf,
        vec![ColumnarValue::Array(Arc::new(svd_matrices.clone()))],
        vec![Arc::clone(&svd_field)],
        &[None],
        2,
    )
    .expect("matrix_svd_reconstruct");
    let svd_reconstructed = fixed_shape_view3(&svd_reconstruct_field, &svd_reconstructed);
    assert_close(svd_reconstructed[[0, 0, 0]], 4.0);
    assert_close(svd_reconstructed[[0, 1, 1]], 2.0);
    assert_close(svd_reconstructed[[1, 0, 0]], 9.0);
    assert_close(svd_reconstructed[[1, 1, 1]], 3.0);
    // Rank: identity is full-rank (2); the all-ones matrix is rank 1.
    let (rank_field, rank_matrices) =
        matrix_batch("rank", [[[1.0, 0.0], [0.0, 1.0]], [[1.0, 1.0], [1.0, 1.0]]]);
    let (_, rank) = invoke_udf(
        &svd_rank_udf,
        vec![ColumnarValue::Array(Arc::new(rank_matrices))],
        vec![rank_field],
        &[None],
        2,
    )
    .expect("matrix_svd_rank");
    let ColumnarValue::Array(rank) = rank else {
        panic!("expected rank array output");
    };
    let rank = rank.as_any().downcast_ref::<Int64Array>().expect("rank int64");
    assert_eq!(rank.value(0), 2);
    assert_eq!(rank.value(1), 1);
}
// Covers the lower-triangular solver UDFs: vector right-hand sides
// (matrix_solve_lower) and matrix right-hand sides (matrix_solve_lower_matrix).
#[test]
fn lower_triangular_udfs_cover_outputs() {
    let lower_solve_udf = udfs::matrix_solve_lower_udf();
    let lower_matrix_udf = udfs::matrix_solve_lower_matrix_udf();
    // Two lower-triangular systems; forward substitution by hand:
    // [[2,0],[3,1]]x=[4,5] -> [2,-1]; [[4,0],[1,2]]x=[8,5] -> [2,1.5].
    let (lower_field, lower_matrices) =
        matrix_batch("lower", [[[2.0, 0.0], [3.0, 1.0]], [[4.0, 0.0], [1.0, 2.0]]]);
    let lower_rhs = fixed_size_list([[4.0, 5.0], [8.0, 5.0]]);
    let lower_rhs_field =
        vector_field("lower_rhs", &DataType::Float64, 2, false).expect("lower rhs field");
    let (_, lower_solution) = invoke_udf(
        &lower_solve_udf,
        vec![
            ColumnarValue::Array(Arc::new(lower_matrices.clone())),
            ColumnarValue::Array(Arc::new(lower_rhs)),
        ],
        vec![Arc::clone(&lower_field), lower_rhs_field],
        &[None, None],
        2,
    )
    .expect("matrix_solve_lower");
    let lower_solution =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(fixed_size_list_array(&lower_solution))
            .expect("lower triangular solution");
    assert_close(lower_solution[[0, 0]], 2.0);
    assert_close(lower_solution[[0, 1]], -1.0);
    assert_close(lower_solution[[1, 0]], 2.0);
    assert_close(lower_solution[[1, 1]], 1.5);
    // Same triangular factors, but with full matrix right-hand sides.
    let (lower_rhs_matrix_field, lower_rhs_matrices) =
        matrix_batch("lower_rhs_matrix", [[[4.0, 2.0], [5.0, 1.0]], [[8.0, 4.0], [5.0, 3.0]]]);
    let (lower_matrix_field, lower_matrix_solution) = invoke_udf(
        &lower_matrix_udf,
        vec![
            ColumnarValue::Array(Arc::new(lower_matrices)),
            ColumnarValue::Array(Arc::new(lower_rhs_matrices)),
        ],
        vec![lower_field, lower_rhs_matrix_field],
        &[None, None],
        2,
    )
    .expect("matrix_solve_lower_matrix");
    let lower_matrix_solution = fixed_shape_view3(&lower_matrix_field, &lower_matrix_solution);
    assert_close(lower_matrix_solution[[0, 0, 0]], 2.0);
    assert_close(lower_matrix_solution[[0, 0, 1]], 1.0);
    assert_close(lower_matrix_solution[[0, 1, 0]], -1.0);
    assert_close(lower_matrix_solution[[0, 1, 1]], -2.0);
    assert_close(lower_matrix_solution[[1, 0, 0]], 2.0);
    assert_close(lower_matrix_solution[[1, 0, 1]], 1.0);
    assert_close(lower_matrix_solution[[1, 1, 0]], 1.5);
    assert_close(lower_matrix_solution[[1, 1, 1]], 1.0);
}
// Covers the upper-triangular solver UDFs: vector right-hand sides
// (matrix_solve_upper) and matrix right-hand sides (matrix_solve_upper_matrix).
#[test]
fn upper_triangular_udfs_cover_outputs() {
    let upper_solve_udf = udfs::matrix_solve_upper_udf();
    let upper_matrix_udf = udfs::matrix_solve_upper_matrix_udf();
    // Two upper-triangular systems; back substitution by hand:
    // [[2,3],[0,4]]x=[8,12] -> [-0.5,3]; [[5,1],[0,2]]x=[9,4] -> [1.4,2].
    let (upper_field, upper_matrices) =
        matrix_batch("upper", [[[2.0, 3.0], [0.0, 4.0]], [[5.0, 1.0], [0.0, 2.0]]]);
    let upper_rhs = fixed_size_list([[8.0, 12.0], [9.0, 4.0]]);
    let upper_rhs_field =
        vector_field("upper_rhs", &DataType::Float64, 2, false).expect("upper rhs field");
    let (_, upper_solution) = invoke_udf(
        &upper_solve_udf,
        vec![
            ColumnarValue::Array(Arc::new(upper_matrices.clone())),
            ColumnarValue::Array(Arc::new(upper_rhs)),
        ],
        vec![Arc::clone(&upper_field), upper_rhs_field],
        &[None, None],
        2,
    )
    .expect("matrix_solve_upper");
    let upper_solution =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(fixed_size_list_array(&upper_solution))
            .expect("upper triangular solution");
    assert_close(upper_solution[[0, 0]], -0.5);
    assert_close(upper_solution[[0, 1]], 3.0);
    assert_close(upper_solution[[1, 0]], 1.4);
    assert_close(upper_solution[[1, 1]], 2.0);
    // Same triangular factors, but with full matrix right-hand sides.
    let (upper_rhs_matrix_field, upper_rhs_matrices) =
        matrix_batch("upper_rhs_matrix", [[[8.0, 2.0], [12.0, 8.0]], [[9.0, 7.0], [4.0, 6.0]]]);
    let (upper_matrix_field, upper_matrix_solution) = invoke_udf(
        &upper_matrix_udf,
        vec![
            ColumnarValue::Array(Arc::new(upper_matrices)),
            ColumnarValue::Array(Arc::new(upper_rhs_matrices)),
        ],
        vec![upper_field, upper_rhs_matrix_field],
        &[None, None],
        2,
    )
    .expect("matrix_solve_upper_matrix");
    let upper_matrix_solution = fixed_shape_view3(&upper_matrix_field, &upper_matrix_solution);
    assert_close(upper_matrix_solution[[0, 0, 0]], -0.5);
    assert_close(upper_matrix_solution[[0, 0, 1]], -2.0);
    assert_close(upper_matrix_solution[[0, 1, 0]], 3.0);
    assert_close(upper_matrix_solution[[0, 1, 1]], 2.0);
    assert_close(upper_matrix_solution[[1, 0, 0]], 1.4);
    assert_close(upper_matrix_solution[[1, 0, 1]], 0.8);
    assert_close(upper_matrix_solution[[1, 1, 0]], 2.0);
    assert_close(upper_matrix_solution[[1, 1, 1]], 3.0);
}
// Covers the matrix-function UDFs that take no configuration scalars:
// exp (eigen), log (eigen and SVD variants), and the matrix sign function.
// All inputs are diagonal, so expectations are the scalar function applied
// to each diagonal entry.
#[test]
fn matrix_zero_config_function_udfs_cover_outputs() {
    let exp_eigen_udf = udfs::matrix_exp_eigen_udf();
    let log_eigen_udf = udfs::matrix_log_eigen_udf();
    let log_svd_udf = udfs::matrix_log_svd_udf();
    let sign_udf = udfs::matrix_sign_udf();
    let (exp_field, exp_matrices) =
        matrix_batch("exp", [[[0.0, 0.0], [0.0, 1.0]], [[1.0, 0.0], [0.0, 2.0]]]);
    let (exp_eigen_field, exp_eigen_output) = invoke_udf(
        &exp_eigen_udf,
        vec![ColumnarValue::Array(Arc::new(exp_matrices))],
        vec![exp_field],
        &[None],
        2,
    )
    .expect("matrix_exp_eigen");
    // exp of diag(a,b) is diag(e^a, e^b).
    let exp_eigen_output = fixed_shape_view3(&exp_eigen_field, &exp_eigen_output);
    assert_close(exp_eigen_output[[0, 0, 0]], 1.0);
    assert_close(exp_eigen_output[[0, 1, 1]], 1.0_f64.exp());
    assert_close(exp_eigen_output[[1, 0, 0]], 1.0_f64.exp());
    assert_close(exp_eigen_output[[1, 1, 1]], 2.0_f64.exp());
    // Diagonal entries are powers of e so the log outputs are whole numbers.
    let (log_field, log_matrices) = matrix_batch("log", [
        [[1.0, 0.0], [0.0, std::f64::consts::E]],
        [[std::f64::consts::E.powi(2), 0.0], [0.0, std::f64::consts::E.powi(3)]],
    ]);
    let (log_eigen_field, log_eigen_output) = invoke_udf(
        &log_eigen_udf,
        vec![ColumnarValue::Array(Arc::new(log_matrices.clone()))],
        vec![Arc::clone(&log_field)],
        &[None],
        2,
    )
    .expect("matrix_log_eigen");
    let log_eigen_output = fixed_shape_view3(&log_eigen_field, &log_eigen_output);
    assert_close(log_eigen_output[[0, 0, 0]], 0.0);
    assert_close(log_eigen_output[[0, 1, 1]], 1.0);
    assert_close(log_eigen_output[[1, 0, 0]], 2.0);
    assert_close(log_eigen_output[[1, 1, 1]], 3.0);
    // The SVD-based log must agree with the eigen-based log on these inputs.
    let (log_svd_field, log_svd_output) = invoke_udf(
        &log_svd_udf,
        vec![ColumnarValue::Array(Arc::new(log_matrices))],
        vec![log_field],
        &[None],
        2,
    )
    .expect("matrix_log_svd");
    let log_svd_output = fixed_shape_view3(&log_svd_field, &log_svd_output);
    assert_close(log_svd_output[[0, 0, 0]], 0.0);
    assert_close(log_svd_output[[0, 1, 1]], 1.0);
    assert_close(log_svd_output[[1, 0, 0]], 2.0);
    assert_close(log_svd_output[[1, 1, 1]], 3.0);
    // Matrix sign of diag entries: +1 for positive, -1 for negative.
    let (sign_field, sign_matrices) =
        matrix_batch("sign", [[[4.0, 0.0], [0.0, -9.0]], [[-2.0, 0.0], [0.0, 3.0]]]);
    let (sign_return_field, sign_output) = invoke_udf(
        &sign_udf,
        vec![ColumnarValue::Array(Arc::new(sign_matrices))],
        vec![sign_field],
        &[None],
        2,
    )
    .expect("matrix_sign");
    let sign_output = fixed_shape_view3(&sign_return_field, &sign_output);
    assert_close(sign_output[[0, 0, 0]], 1.0);
    assert_close(sign_output[[0, 1, 1]], -1.0);
    assert_close(sign_output[[1, 0, 0]], -1.0);
    assert_close(sign_output[[1, 1, 1]], 1.0);
}
// Covers the parameterized matrix-function UDFs (exp, log via Taylor series,
// fractional power) which take scalar configuration arguments alongside the
// matrix batch. Each scalar is bound once and reused both as the ColumnarValue
// argument and in the scalar-metadata slice, so the two cannot drift apart;
// the final use moves the value instead of cloning it again.
#[test]
fn parameterized_matrix_function_udfs_cover_outputs() {
    let exp_udf = udfs::matrix_exp_udf();
    let log_taylor_udf = udfs::matrix_log_taylor_udf();
    let power_udf = udfs::matrix_power_udf();
    // Diagonal inputs: exp of diag(a,b) is diag(e^a, e^b).
    let (exp_field, exp_matrices) =
        matrix_batch("exp", [[[0.0, 0.0], [0.0, 1.0]], [[1.0, 0.0], [0.0, 2.0]]]);
    let scalar_terms = ScalarValue::Int64(Some(32));
    let scalar_tolerance = ScalarValue::Float64(Some(1.0e-12));
    let (exp_return_field, exp_output) = invoke_udf(
        &exp_udf,
        vec![
            ColumnarValue::Array(Arc::new(exp_matrices)),
            ColumnarValue::Scalar(scalar_terms.clone()),
            ColumnarValue::Scalar(scalar_tolerance.clone()),
        ],
        vec![
            exp_field,
            Arc::new(Field::new("max_terms", DataType::Int64, false)),
            Arc::new(Field::new("tolerance", DataType::Float64, false)),
        ],
        // Last use: move the scalars rather than cloning a second time.
        &[None, Some(scalar_terms), Some(scalar_tolerance)],
        2,
    )
    .expect("matrix_exp");
    let exp_output = fixed_shape_view3(&exp_return_field, &exp_output);
    assert_close(exp_output[[0, 0, 0]], 1.0);
    assert_close(exp_output[[0, 1, 1]], 1.0_f64.exp());
    assert_close(exp_output[[1, 0, 0]], 1.0_f64.exp());
    assert_close(exp_output[[1, 1, 1]], 2.0_f64.exp());
    // Near-identity diagonals keep the Taylor log series convergent.
    let (log_taylor_field, log_taylor_matrices) =
        matrix_batch("log_taylor", [[[1.0, 0.0], [0.0, 1.1]], [[1.0, 0.0], [0.0, 1.2]]]);
    let log_terms = ScalarValue::Int64(Some(64));
    let log_tolerance = ScalarValue::Float64(Some(1.0e-12));
    let (log_taylor_return_field, log_taylor_output) = invoke_udf(
        &log_taylor_udf,
        vec![
            ColumnarValue::Array(Arc::new(log_taylor_matrices)),
            ColumnarValue::Scalar(log_terms.clone()),
            ColumnarValue::Scalar(log_tolerance.clone()),
        ],
        vec![
            Arc::clone(&log_taylor_field),
            Arc::new(Field::new("max_terms", DataType::Int64, false)),
            Arc::new(Field::new("tolerance", DataType::Float64, false)),
        ],
        &[None, Some(log_terms), Some(log_tolerance)],
        2,
    )
    .expect("matrix_log_taylor");
    let log_taylor_output = fixed_shape_view3(&log_taylor_return_field, &log_taylor_output);
    assert_close(log_taylor_output[[0, 0, 0]], 0.0);
    assert_close(log_taylor_output[[0, 1, 1]], 1.1_f64.ln());
    assert_close(log_taylor_output[[1, 0, 0]], 0.0);
    assert_close(log_taylor_output[[1, 1, 1]], 1.2_f64.ln());
    // Power 0.5 on diagonals of perfect squares: square roots on the diagonal.
    let (power_field, power_matrices) =
        matrix_batch("power", [[[4.0, 0.0], [0.0, 9.0]], [[16.0, 0.0], [0.0, 25.0]]]);
    let half_power = ScalarValue::Float64(Some(0.5));
    let (power_return_field, power_output) = invoke_udf(
        &power_udf,
        vec![
            ColumnarValue::Array(Arc::new(power_matrices)),
            ColumnarValue::Scalar(half_power.clone()),
        ],
        vec![power_field, Arc::new(Field::new("power", DataType::Float64, false))],
        &[None, Some(half_power)],
        2,
    )
    .expect("matrix_power");
    let power_output = fixed_shape_view3(&power_return_field, &power_output);
    assert_close(power_output[[0, 0, 0]], 2.0);
    assert_close(power_output[[0, 1, 1]], 3.0);
    assert_close(power_output[[1, 0, 0]], 4.0);
    assert_close(power_output[[1, 1, 1]], 5.0);
}
// Negative-path coverage for the scalar contracts of the parameterized
// matrix-function UDFs: missing scalar metadata, an invalid (negative)
// tolerance, and an array supplied where a scalar is required at runtime.
#[test]
fn parameterized_matrix_function_udfs_validate_scalar_contracts() {
    let exp_udf = udfs::matrix_exp_udf();
    let log_taylor_udf = udfs::matrix_log_taylor_udf();
    let power_udf = udfs::matrix_power_udf();
    let (field, matrices) = matrix_batch("matrix", [[[1.0, 0.0], [0.0, 1.1]]]);
    let max_terms_field = Arc::new(Field::new("max_terms", DataType::Int64, false));
    let tolerance_field = Arc::new(Field::new("tolerance", DataType::Float64, false));
    let power_field = Arc::new(Field::new("power", DataType::Float64, false));
    // Scalar metadata slot for argument 2 (index 1) is None even though a
    // scalar value is passed -> planning-time scalar contract violation.
    let missing_scalar_error = invoke_udf_error(
        &exp_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(16))),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-8))),
        ],
        vec![Arc::clone(&field), Arc::clone(&max_terms_field), Arc::clone(&tolerance_field)],
        &[None, None, Some(ScalarValue::Float64(Some(1.0e-8)))],
        1,
    );
    // Negative tolerance must be rejected by value validation.
    let tolerance_error = invoke_udf_error(
        &log_taylor_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(16))),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(-1.0))),
        ],
        vec![Arc::clone(&field), Arc::clone(&max_terms_field), Arc::clone(&tolerance_field)],
        &[None, Some(ScalarValue::Int64(Some(16))), Some(ScalarValue::Float64(Some(-1.0)))],
        1,
    );
    // Metadata promises a scalar, but the runtime value is an Array.
    let runtime_scalar_error = invoke_udf_error(
        &power_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices)),
            ColumnarValue::Array(Arc::new(Float64Array::from(vec![0.5]))),
        ],
        vec![field, power_field],
        &[None, Some(ScalarValue::Float64(Some(0.5)))],
        1,
    );
    assert!(missing_scalar_error.contains("argument 2 must be a non-null scalar"));
    assert!(tolerance_error.contains("tolerance must be positive"));
    assert!(runtime_scalar_error.contains("argument 2 must be a numeric scalar"));
}
// Covers the matrix statistics UDFs (column centering, covariance,
// correlation) and the PCA fit UDF over a two-row batch of 3x2 matrices,
// including unpacking every column of the PCA struct output.
#[test]
fn matrix_stats_and_pca_udfs_cover_batch_outputs() {
    let (field, matrices) =
        matrix_batch("stats", [[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], [[1.0, 0.0], [0.0, 1.0], [
            1.0, 1.0,
        ]]]);
    let center_udf = udfs::matrix_center_columns_udf();
    let covariance_udf = udfs::matrix_covariance_udf();
    let correlation_udf = udfs::matrix_correlation_udf();
    let pca_udf = udfs::matrix_pca_udf();
    let (center_field, centered) = invoke_udf(
        &center_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_center_columns");
    // Column means of batch 0 are (3, 4); of batch 1, (2/3, 2/3).
    let centered = fixed_shape_view3(&center_field, &centered);
    assert_close(centered[[0, 0, 0]], -2.0);
    assert_close(centered[[0, 2, 1]], 2.0);
    assert_close(centered[[1, 0, 0]], 1.0 / 3.0);
    assert_close(centered[[1, 1, 1]], 1.0 / 3.0);
    let (cov_field, covariance) = invoke_udf(
        &covariance_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_covariance");
    let covariance = fixed_shape_view3(&cov_field, &covariance);
    assert_close(covariance[[0, 0, 0]], 4.0);
    assert_close(covariance[[0, 0, 1]], 4.0);
    assert_close(covariance[[1, 0, 0]], 1.0 / 3.0);
    assert_close(covariance[[1, 0, 1]], -1.0 / 6.0);
    let (corr_field, correlation) = invoke_udf(
        &correlation_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_correlation");
    // Batch 0's columns are perfectly correlated; batch 1's are at -0.5.
    let correlation = fixed_shape_view3(&corr_field, &correlation);
    assert_close(correlation[[0, 0, 0]], 1.0);
    assert_close(correlation[[0, 0, 1]], 1.0);
    assert_close(correlation[[1, 0, 1]], -0.5);
    let (pca_return_field, pca_output) = invoke_udf(
        &pca_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_pca");
    // PCA returns a struct; the field list is needed to view the tensor
    // columns (components at index 0, scores at index 4).
    let DataType::Struct(pca_fields) = pca_return_field.data_type() else {
        panic!("expected PCA struct return");
    };
    let pca = struct_array(&pca_output);
    let components =
        pca.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("components");
    let components =
        ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&pca_fields[0], components)
            .expect("components")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 tensor");
    // Columns 1-3 are flat vector outputs: explained variance, its ratio,
    // and the per-column mean.
    let explained =
        pca.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("explained variance");
    let explained =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(explained).expect("explained variance");
    let ratio = pca
        .column(2)
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .expect("explained variance ratio");
    let ratio = ndarrow::fixed_size_list_as_array2::<Float64Type>(ratio).expect("ratio");
    let mean = pca.column(3).as_any().downcast_ref::<FixedSizeListArray>().expect("mean");
    let mean = ndarrow::fixed_size_list_as_array2::<Float64Type>(mean).expect("mean");
    let scores = pca.column(4).as_any().downcast_ref::<FixedSizeListArray>().expect("scores");
    let scores = ndarrow::fixed_shape_tensor_as_array_viewd::<Float64Type>(&pca_fields[4], scores)
        .expect("scores")
        .into_dimensionality::<Ix3>()
        .expect("rank-3 tensor");
    assert_close(mean[[0, 0]], 3.0);
    assert_close(mean[[0, 1]], 4.0);
    assert_close(explained[[0, 0]], 8.0);
    assert_close(explained[[0, 1]], 0.0);
    assert_close(ratio[[0, 0]], 1.0);
    assert_close(ratio[[0, 1]], 0.0);
    // Component sign is not fixed by PCA, so compare magnitudes only.
    assert_close(components[[0, 0, 0]].abs(), 1.0 / 2.0_f64.sqrt());
    assert_close(components[[0, 0, 1]].abs(), 1.0 / 2.0_f64.sqrt());
    assert_close(scores[[0, 1, 0]], 0.0);
    assert_close(scores[[0, 1, 1]], 0.0);
}
// Covers the PCA application UDFs: fit, then transform (which must reproduce
// the fitted scores), then inverse-transform (which must round-trip back to
// the original matrices).
#[test]
fn matrix_pca_application_udfs_cover_batch_outputs() {
    let (field, matrices) =
        matrix_batch("stats", [[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], [[1.0, 0.0], [0.0, 1.0], [
            1.0, 1.0,
        ]]]);
    let pca_udf = udfs::matrix_pca_udf();
    let pca_transform_udf = udfs::matrix_pca_transform_udf();
    let pca_inverse_udf = udfs::matrix_pca_inverse_transform_udf();
    let (pca_return_field, pca_output) = invoke_udf(
        &pca_udf,
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    )
    .expect("matrix_pca");
    // Extract the fitted scores (owned) plus the full PCA struct, which is
    // fed back into the transform / inverse-transform UDFs as the model.
    let scores = assert_pca_scores_from_struct(&pca_return_field, &pca_output).to_owned();
    let pca = struct_array(&pca_output).clone();
    let (transformed_field, transformed_output) = invoke_udf(
        &pca_transform_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(pca.clone())),
        ],
        vec![Arc::clone(&field), Arc::clone(&pca_return_field)],
        &[None, None],
        2,
    )
    .expect("matrix_pca_transform");
    // Transforming the training data must reproduce the fitted scores.
    let transformed = fixed_shape_view3(&transformed_field, &transformed_output);
    assert_close(transformed[[0, 0, 0]], scores[[0, 0, 0]]);
    assert_close(transformed[[0, 2, 0]], scores[[0, 2, 0]]);
    assert_close(transformed[[1, 1, 1]], scores[[1, 1, 1]]);
    let ColumnarValue::Array(transformed_array) = &transformed_output else {
        panic!("expected array output");
    };
    let (reconstructed_field, reconstructed) = invoke_udf(
        &pca_inverse_udf,
        vec![
            ColumnarValue::Array(Arc::clone(transformed_array)),
            ColumnarValue::Array(Arc::new(pca.clone())),
        ],
        vec![transformed_field, Arc::clone(&pca_return_field)],
        &[None, None],
        2,
    )
    .expect("matrix_pca_inverse_transform");
    // Inverse-transforming the scores must recover the original entries.
    let reconstructed = fixed_shape_view3(&reconstructed_field, &reconstructed);
    assert_close(reconstructed[[0, 0, 0]], 1.0);
    assert_close(reconstructed[[0, 2, 1]], 6.0);
    assert_close(reconstructed[[1, 0, 0]], 1.0);
    assert_close(reconstructed[[1, 2, 1]], 1.0);
}
// Negative-path coverage for the PCA application UDFs: a model fitted on
// two-column data must reject a three-column transform input and a
// one-column inverse-transform input.
#[test]
fn matrix_pca_application_udfs_validate_contracts() {
    let fit_udf = udfs::matrix_pca_udf();
    let transform_udf = udfs::matrix_pca_transform_udf();
    let inverse_udf = udfs::matrix_pca_inverse_transform_udf();
    // Training data (3x2), a too-wide transform input (3x3), and too-short
    // score input (3x1).
    let (fit_field, fit_matrices) = matrix_batch("fit", [[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]]);
    let (wide_field, wide_matrices) =
        matrix_batch("wide", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]]);
    let (short_score_field, short_scores) = matrix_batch("short_scores", [[[1.0], [0.0], [1.0]]]);
    let (pca_field, pca_output) = invoke_udf(
        &fit_udf,
        vec![ColumnarValue::Array(Arc::new(fit_matrices.clone()))],
        vec![Arc::clone(&fit_field)],
        &[None],
        1,
    )
    .expect("matrix_pca");
    let model = struct_array(&pca_output).clone();
    let transform_error = invoke_udf_error(
        &transform_udf,
        vec![
            ColumnarValue::Array(Arc::new(wide_matrices)),
            ColumnarValue::Array(Arc::new(model.clone())),
        ],
        vec![wide_field, Arc::clone(&pca_field)],
        &[None, None],
        1,
    );
    let inverse_error = invoke_udf_error(
        &inverse_udf,
        vec![ColumnarValue::Array(Arc::new(short_scores)), ColumnarValue::Array(Arc::new(model))],
        vec![short_score_field, pca_field],
        &[None, None],
        1,
    );
    assert!(transform_error.contains("matrix feature width mismatch"));
    assert!(inverse_error.contains("score width mismatch"));
}
// Verifies every column of a complex PCA fit struct (components, explained
// variance, ratio, mean, scores) against nabled's reference complex PCA for
// each batch row, and returns the fitted scores (owned) so callers can reuse
// them for transform round-trip checks.
//
// Panics via assert_* helpers on any mismatch, and panics if the return
// field is not a struct or any column fails to downcast.
fn assert_complex_pca_fit_output(
    field: &FieldRef,
    matrices: &FixedSizeListArray,
    pca_return_field: &FieldRef,
    pca_output: &ColumnarValue,
) -> Array3<Complex64> {
    let DataType::Struct(pca_fields) = pca_return_field.data_type() else {
        panic!("expected complex PCA struct return");
    };
    let pca = struct_array(pca_output);
    let matrix_view = complex_matrix_view(field, matrices);
    // Columns 1 and 2 are real-valued vectors: explained variance and ratio.
    let explained =
        pca.column(1).as_any().downcast_ref::<FixedSizeListArray>().expect("explained variance");
    let explained =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(explained).expect("explained");
    let ratio = pca
        .column(2)
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .expect("explained variance ratio");
    let ratio = ndarrow::fixed_size_list_as_array2::<Float64Type>(ratio).expect("ratio");
    // Column 3 is the complex per-column mean.
    let mean = pca.column(3).as_any().downcast_ref::<FixedSizeListArray>().expect("mean");
    let mean = ndarrow::complex64_as_array_view2(mean).expect("complex mean");
    let scores = assert_complex_pca_scores_from_struct(pca_return_field, pca_output).to_owned();
    // Column 0 holds the complex components as a rank-3 tensor.
    let components =
        pca.column(0).as_any().downcast_ref::<FixedSizeListArray>().expect("components");
    let components =
        ndarrow::complex64_fixed_shape_tensor_as_array_viewd(&pca_fields[0], components)
            .expect("components")
            .into_dimensionality::<Ix3>()
            .expect("rank-3 components");
    // Re-fit each batch row with the reference implementation and compare
    // every output entrywise.
    for row in 0..matrix_view.len_of(Axis(0)) {
        let expected =
            nabled::ml::pca::compute_pca_complex_view(&matrix_view.index_axis(Axis(0), row), None)
                .expect("expected complex pca");
        for col in 0..expected.mean.len() {
            assert_complex_close(mean[[row, col]], expected.mean[col]);
        }
        for col in 0..expected.explained_variance.len() {
            assert_close(explained[[row, col]], expected.explained_variance[col]);
            assert_close(ratio[[row, col]], expected.explained_variance_ratio[col]);
        }
        for i in 0..expected.scores.nrows() {
            for j in 0..expected.scores.ncols() {
                assert_complex_close(scores[[row, i, j]], expected.scores[[i, j]]);
            }
        }
        for i in 0..expected.components.nrows() {
            for j in 0..expected.components.ncols() {
                assert_complex_close(components[[row, i, j]], expected.components[[i, j]]);
            }
        }
    }
    scores
}
// Runs the complex PCA transform and inverse-transform UDFs against a fitted
// model struct: the transform of the original matrices must reproduce the
// fitted `scores`, and inverse-transforming those scores must round-trip back
// to the original matrix entries. Panics via assert helpers on any mismatch.
fn assert_complex_pca_transform_inverse(
    field: &FieldRef,
    matrices: &FixedSizeListArray,
    pca_return_field: &FieldRef,
    pca_output: &ColumnarValue,
    scores: &Array3<Complex64>,
) {
    let pca_transform_udf = udfs::matrix_pca_transform_complex_udf();
    let pca_inverse_udf = udfs::matrix_pca_inverse_transform_complex_udf();
    let pca = struct_array(pca_output).clone();
    let matrix_view = complex_matrix_view(field, matrices);
    let (transformed_field, transformed_output) = invoke_udf(
        &pca_transform_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(pca.clone())),
        ],
        vec![Arc::clone(field), Arc::clone(pca_return_field)],
        &[None, None],
        2,
    )
    .expect("matrix_pca_transform_complex");
    // Transform of the training data must equal the fitted scores entrywise.
    let transformed = complex_fixed_shape_view3(&transformed_field, &transformed_output);
    for row in 0..transformed.len_of(Axis(0)) {
        for i in 0..transformed.len_of(Axis(1)) {
            for j in 0..transformed.len_of(Axis(2)) {
                assert_complex_close(transformed[[row, i, j]], scores[[row, i, j]]);
            }
        }
    }
    let ColumnarValue::Array(transformed_array) = &transformed_output else {
        panic!("expected array output");
    };
    let (reconstructed_field, reconstructed_output) = invoke_udf(
        &pca_inverse_udf,
        vec![
            ColumnarValue::Array(Arc::clone(transformed_array)),
            ColumnarValue::Array(Arc::new(pca)),
        ],
        vec![transformed_field, Arc::clone(pca_return_field)],
        &[None, None],
        2,
    )
    .expect("matrix_pca_inverse_transform_complex");
    // Inverse transform of the scores must recover the original matrices.
    let reconstructed = complex_fixed_shape_view3(&reconstructed_field, &reconstructed_output);
    for row in 0..reconstructed.len_of(Axis(0)) {
        for i in 0..reconstructed.len_of(Axis(1)) {
            for j in 0..reconstructed.len_of(Axis(2)) {
                assert_complex_close(reconstructed[[row, i, j]], matrix_view[[row, i, j]]);
            }
        }
    }
}
// End-to-end coverage for the complex PCA UDF family: fit a two-row batch of
// 3x2 complex matrices, then verify the fit outputs and the
// transform/inverse-transform round trip via the shared assertion helpers.
#[test]
fn complex_matrix_pca_udfs_cover_batch_outputs() {
    let batch_rows = [
        [
            [Complex64::new(1.0, 1.0), Complex64::new(2.0, 0.0)],
            [Complex64::new(3.0, 1.0), Complex64::new(4.0, 0.0)],
            [Complex64::new(5.0, 1.0), Complex64::new(6.0, 0.0)],
        ],
        [
            [Complex64::new(2.0, 0.0), Complex64::new(1.0, -1.0)],
            [Complex64::new(4.0, 0.0), Complex64::new(3.0, -1.0)],
            [Complex64::new(6.0, 0.0), Complex64::new(5.0, -1.0)],
        ],
    ];
    let (field, matrices) = complex_matrix_batch("complex_pca", batch_rows);
    let pca_udf = udfs::matrix_pca_complex_udf();
    let fit_args = vec![ColumnarValue::Array(Arc::new(matrices.clone()))];
    let (pca_return_field, pca_output) =
        invoke_udf(&pca_udf, fit_args, vec![Arc::clone(&field)], &[None], 2)
            .expect("matrix_pca_complex");
    // The fit helper both validates the PCA struct and returns the scores.
    let scores = assert_complex_pca_fit_output(&field, &matrices, &pca_return_field, &pca_output);
    assert_complex_pca_transform_inverse(
        &field,
        &matrices,
        &pca_return_field,
        &pca_output,
        &scores,
    );
}
// Contract checks for the complex PCA application UDFs: transforming with a
// wider feature width than the fit, and inverse-transforming with too-few
// score columns, must both fail with descriptive shape errors.
#[test]
fn complex_matrix_pca_application_udfs_validate_contracts() {
    let pca_udf = udfs::matrix_pca_complex_udf();
    let pca_transform_udf = udfs::matrix_pca_transform_complex_udf();
    let pca_inverse_udf = udfs::matrix_pca_inverse_transform_complex_udf();
    // 3x2 batch used to fit the PCA model.
    let (fit_field, fit_matrices) = complex_matrix_batch("complex_fit", [[
        [Complex64::new(1.0, 1.0), Complex64::new(2.0, 0.0)],
        [Complex64::new(3.0, 1.0), Complex64::new(4.0, 0.0)],
        [Complex64::new(5.0, 1.0), Complex64::new(6.0, 0.0)],
    ]]);
    // 3x3 batch: feature width (3) disagrees with the fitted width (2).
    let (wide_field, wide_matrices) = complex_matrix_batch("complex_wide", [[
        [Complex64::new(1.0, 0.0), Complex64::new(2.0, 0.0), Complex64::new(3.0, 0.0)],
        [Complex64::new(4.0, 0.0), Complex64::new(5.0, 0.0), Complex64::new(6.0, 0.0)],
        [Complex64::new(7.0, 0.0), Complex64::new(8.0, 0.0), Complex64::new(9.0, 0.0)],
    ]]);
    // 3x1 score batch: fewer columns than the fitted component count.
    let (short_score_field, short_scores) = complex_matrix_batch("complex_short_scores", [[
        [Complex64::new(1.0, 0.0)],
        [Complex64::new(0.0, 0.0)],
        [Complex64::new(1.0, 0.0)],
    ]]);
    let (pca_field, pca_output) = invoke_udf(
        &pca_udf,
        vec![ColumnarValue::Array(Arc::new(fit_matrices.clone()))],
        vec![Arc::clone(&fit_field)],
        &[None],
        1,
    )
    .expect("matrix_pca_complex");
    let pca = struct_array(&pca_output).clone();
    let transform_error = invoke_udf_error(
        &pca_transform_udf,
        vec![
            ColumnarValue::Array(Arc::new(wide_matrices)),
            ColumnarValue::Array(Arc::new(pca.clone())),
        ],
        vec![wide_field, Arc::clone(&pca_field)],
        &[None, None],
        1,
    );
    let inverse_error = invoke_udf_error(
        &pca_inverse_udf,
        vec![ColumnarValue::Array(Arc::new(short_scores)), ColumnarValue::Array(Arc::new(pca))],
        vec![short_score_field, pca_field],
        &[None, None],
        1,
    );
    assert!(transform_error.contains("matrix feature width mismatch"));
    assert!(inverse_error.contains("score width mismatch"));
}
// Conjugate-gradient and GMRES UDFs must solve each matrix/rhs pair in a
// two-row batch and agree with the analytic solutions computed by hand.
#[test]
fn iterative_solver_udfs_cover_outputs() {
    let (field, matrices) =
        matrix_batch("iterative", [[[4.0, 1.0], [1.0, 3.0]], [[10.0, 2.0], [2.0, 5.0]]]);
    let rhs = fixed_size_list([[1.0, 2.0], [12.0, 7.0]]);
    let rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    let tolerance = ScalarValue::Float64(Some(1.0e-12));
    let max_iterations = ScalarValue::Int64(Some(64));
    let tolerance_field = Arc::new(Field::new("tolerance", DataType::Float64, false));
    let max_iterations_field = Arc::new(Field::new("max_iterations", DataType::Int64, false));
    // Hand-solved solutions of A x = b for both batch rows.
    let expected = [[1.0 / 11.0, 7.0 / 11.0], [1.0, 1.0]];
    for udf in [udfs::matrix_conjugate_gradient_udf(), udfs::matrix_gmres_udf()] {
        let args = vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
            ColumnarValue::Scalar(tolerance.clone()),
            ColumnarValue::Scalar(max_iterations.clone()),
        ];
        let arg_fields = vec![
            Arc::clone(&field),
            Arc::clone(&rhs_field),
            Arc::clone(&tolerance_field),
            Arc::clone(&max_iterations_field),
        ];
        let scalars = [None, None, Some(tolerance.clone()), Some(max_iterations.clone())];
        let (_, output) = invoke_udf(&udf, args, arg_fields, &scalars, 2)
            .unwrap_or_else(|error| panic!("{} output: {error}", udf.name()));
        let solutions =
            ndarrow::fixed_size_list_as_array2::<Float64Type>(fixed_size_list_array(&output))
                .expect("iterative output");
        for (row, expected_row) in expected.iter().enumerate() {
            for (col, value) in expected_row.iter().enumerate() {
                assert_close(solutions[[row, col]], *value);
            }
        }
    }
}
// Scalar-argument contract checks for the iterative solvers: tolerance must
// be strictly positive, max_iterations must be at least 1, and an argument
// declared scalar must not arrive as a runtime array.
#[test]
fn iterative_solver_udfs_validate_scalar_contracts() {
    let (field, matrices) = matrix_batch("iterative", [[[4.0, 1.0], [1.0, 3.0]]]);
    let rhs = fixed_size_list([[1.0, 2.0]]);
    let rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    let tolerance_field = Arc::new(Field::new("tolerance", DataType::Float64, false));
    let max_iterations_field = Arc::new(Field::new("max_iterations", DataType::Int64, false));
    // Negative tolerance must be rejected.
    let tolerance_error = invoke_udf_error(
        &udfs::matrix_conjugate_gradient_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(-1.0))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(32))),
        ],
        vec![
            Arc::clone(&field),
            Arc::clone(&rhs_field),
            Arc::clone(&tolerance_field),
            Arc::clone(&max_iterations_field),
        ],
        &[None, None, Some(ScalarValue::Float64(Some(-1.0))), Some(ScalarValue::Int64(Some(32)))],
        1,
    );
    // A zero iteration budget must be rejected.
    let max_iterations_error = invoke_udf_error(
        &udfs::matrix_gmres_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-12))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![
            Arc::clone(&field),
            Arc::clone(&rhs_field),
            Arc::clone(&tolerance_field),
            Arc::clone(&max_iterations_field),
        ],
        &[None, None, Some(ScalarValue::Float64(Some(1.0e-12))), Some(ScalarValue::Int64(Some(0)))],
        1,
    );
    // A runtime array in the scalar tolerance slot must be rejected.
    let scalar_error = invoke_udf_error(
        &udfs::matrix_conjugate_gradient_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices)),
            ColumnarValue::Array(Arc::new(rhs)),
            ColumnarValue::Array(Arc::new(Float64Array::from(vec![1.0e-6]))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(32))),
        ],
        vec![field, rhs_field, tolerance_field, max_iterations_field],
        &[None, None, Some(ScalarValue::Float64(Some(1.0e-6))), Some(ScalarValue::Int64(Some(32)))],
        1,
    );
    assert!(tolerance_error.contains("tolerance must be positive"));
    assert!(max_iterations_error.contains("max_iterations must be greater than 0"));
    assert!(scalar_error.contains("argument 3 must be a numeric scalar"));
}
// Shape-contract checks for the iterative solvers: GMRES rejects non-square
// systems, and conjugate gradient rejects an rhs whose length disagrees with
// the matrix dimension.
#[test]
fn iterative_solver_udfs_validate_shape_contracts() {
    let (field, matrices) = matrix_batch("iterative", [[[4.0, 1.0], [1.0, 3.0]]]);
    let rhs = fixed_size_list([[1.0, 2.0]]);
    let rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    let tolerance_field = Arc::new(Field::new("tolerance", DataType::Float64, false));
    let max_iterations_field = Arc::new(Field::new("max_iterations", DataType::Int64, false));
    // 3x2 matrix: iterative solvers require square systems.
    let (rect_field, rect_matrices) = matrix_batch("rect", [[[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]]);
    let rect_error = invoke_udf_error(
        &udfs::matrix_gmres_udf(),
        vec![
            ColumnarValue::Array(Arc::new(rect_matrices)),
            ColumnarValue::Array(Arc::new(rhs.clone())),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-12))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(32))),
        ],
        vec![
            rect_field,
            Arc::clone(&rhs_field),
            Arc::clone(&tolerance_field),
            Arc::clone(&max_iterations_field),
        ],
        &[
            None,
            None,
            Some(ScalarValue::Float64(Some(1.0e-12))),
            Some(ScalarValue::Int64(Some(32))),
        ],
        1,
    );
    // Length-3 rhs against a 2x2 system.
    let wrong_rhs_field = vector_field("rhs_long", &DataType::Float64, 3, false).expect("rhs");
    let wrong_rhs = fixed_size_list([[1.0, 2.0, 3.0]]);
    let rhs_error = invoke_udf_error(
        &udfs::matrix_conjugate_gradient_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices)),
            ColumnarValue::Array(Arc::new(wrong_rhs)),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-12))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(32))),
        ],
        vec![field, wrong_rhs_field, tolerance_field, max_iterations_field],
        &[
            None,
            None,
            Some(ScalarValue::Float64(Some(1.0e-12))),
            Some(ScalarValue::Int64(Some(32))),
        ],
        1,
    );
    assert!(rect_error.contains("requires square matrices"));
    assert!(rhs_error.contains("rhs vector length mismatch"));
}
// Float32 inputs must flow through both iterative solvers with the declared
// return data type preserved; numeric correctness is exercised separately on
// the f64 path.
#[test]
fn iterative_solver_udfs_cover_float32_branches() {
    let (field, matrices) = matrix_batch_f32("iterative", [[[4.0, 1.0], [1.0, 3.0]]]);
    let rhs_field = vector_field("rhs", &DataType::Float32, 2, false).expect("rhs field");
    let rhs = fixed_size_list_f32([[1.0, 2.0]]);
    let tolerance = ScalarValue::Float64(Some(1.0e-6));
    let max_iterations = ScalarValue::Int64(Some(32));
    let tolerance_field = Arc::new(Field::new("tolerance", DataType::Float64, false));
    let max_iterations_field = Arc::new(Field::new("max_iterations", DataType::Int64, false));
    for udf in [udfs::matrix_conjugate_gradient_udf(), udfs::matrix_gmres_udf()] {
        let args = vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
            ColumnarValue::Scalar(tolerance.clone()),
            ColumnarValue::Scalar(max_iterations.clone()),
        ];
        let arg_fields = vec![
            Arc::clone(&field),
            Arc::clone(&rhs_field),
            Arc::clone(&tolerance_field),
            Arc::clone(&max_iterations_field),
        ];
        let scalars = [None, None, Some(tolerance.clone()), Some(max_iterations.clone())];
        let (return_field, output) = invoke_udf(&udf, args, arg_fields, &scalars, 1)
            .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        // Only dtype propagation is asserted for the f32 branch.
        assert_eq!(array_data_type(&output), return_field.data_type());
    }
}
// Covers the sparse CSR batch UDFs over a two-row ragged batch: sparse-dense
// matmat, transpose, and sparse-sparse matmat, spot-checking entries after
// converting the results back to dense form.
#[test]
fn sparse_batch_udfs_cover_dense_sparse_products_and_transpose() {
    let (left_field, left) = sparse_batch("left");
    let (right_dense_field, right_dense) = ragged_matrices("right_dense", vec![
        vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![1.0, 1.0]],
        vec![vec![1.0, 0.0], vec![0.0, 1.0]],
    ]);
    let (right_sparse_field, right_sparse) = sparse_batch_rhs("right_sparse");
    let matmat_dense_udf = udfs::sparse_matmat_dense_udf();
    let transpose_udf = udfs::sparse_transpose_udf();
    let matmat_sparse_udf = udfs::sparse_matmat_sparse_udf();
    // Sparse x dense product, returned as a variable-shape dense batch.
    let (dense_return_field, dense_result) = invoke_udf(
        &matmat_dense_udf,
        vec![
            ColumnarValue::Array(Arc::new(left.clone())),
            ColumnarValue::Array(Arc::new(right_dense)),
        ],
        vec![Arc::clone(&left_field), right_dense_field],
        &[None, None],
        2,
    )
    .expect("sparse_matmat_dense");
    let mut dense_iter = variable_shape_rows(&dense_return_field, &dense_result);
    let (_, row0) = dense_iter.next().expect("row0").expect("row0 view");
    let row0 = row0.into_dimensionality::<Ix2>().expect("rank-2 output");
    let (_, row1) = dense_iter.next().expect("row1").expect("row1 view");
    let row1 = row1.into_dimensionality::<Ix2>().expect("rank-2 output");
    assert_close(row0[[0, 0]], 3.0);
    assert_close(row0[[0, 1]], 2.0);
    assert_close(row0[[1, 1]], 3.0);
    assert_close(row1[[0, 0]], 4.0);
    assert_close(row1[[1, 1]], 6.0);
    // Transpose each CSR matrix in the batch.
    let (transpose_field, transpose_result) = invoke_udf(
        &transpose_udf,
        vec![ColumnarValue::Array(Arc::new(left.clone()))],
        vec![Arc::clone(&left_field)],
        &[None],
        2,
    )
    .expect("sparse_transpose");
    let transpose_result = struct_array(&transpose_result);
    let mut transpose_iter =
        ndarrow::csr_matrix_batch_iter::<Float64Type>(transpose_field.as_ref(), transpose_result)
            .expect("transpose batch");
    let (_, transposed0) = transpose_iter.next().expect("row0").expect("row0 csr");
    let (_, transposed1) = transpose_iter.next().expect("row1").expect("row1 csr");
    let transposed0 = csr_to_dense(&transposed0);
    let transposed1 = csr_to_dense(&transposed1);
    // Transposition swaps each row's matrix shape.
    assert_eq!(transposed0.shape(), &[3, 2]);
    assert_eq!(transposed1.shape(), &[2, 2]);
    assert_close(transposed0[[0, 0]], 1.0);
    assert_close(transposed0[[1, 1]], 3.0);
    assert_close(transposed0[[2, 0]], 2.0);
    assert_close(transposed1[[0, 0]], 4.0);
    assert_close(transposed1[[0, 1]], 5.0);
    assert_close(transposed1[[1, 1]], 6.0);
    // Sparse x sparse product, returned as a CSR batch.
    let (sparse_return_field, sparse_result) = invoke_udf(
        &matmat_sparse_udf,
        vec![ColumnarValue::Array(Arc::new(left)), ColumnarValue::Array(Arc::new(right_sparse))],
        vec![left_field, right_sparse_field],
        &[None, None],
        2,
    )
    .expect("sparse_matmat_sparse");
    let sparse_result = struct_array(&sparse_result);
    let mut sparse_iter =
        ndarrow::csr_matrix_batch_iter::<Float64Type>(sparse_return_field.as_ref(), sparse_result)
            .expect("sparse product batch");
    let (_, product0) = sparse_iter.next().expect("row0").expect("row0 csr");
    let (_, product1) = sparse_iter.next().expect("row1").expect("row1 csr");
    let product0 = csr_to_dense(&product0);
    let product1 = csr_to_dense(&product1);
    // The sparse-sparse product matches the sparse-dense entries asserted above.
    assert_close(product0[[0, 0]], 3.0);
    assert_close(product0[[0, 1]], 2.0);
    assert_close(product0[[1, 1]], 3.0);
    assert_close(product1[[0, 0]], 4.0);
    assert_close(product1[[1, 1]], 6.0);
}
// Covers the fixed-shape last-axis tensor UDFs (L2 norm, normalize, batched
// dot) over a two-row batch of 2x2 matrices built from Pythagorean pairs so
// the expected norms are exact.
#[test]
fn tensor_fixed_shape_last_axis_udfs_cover_outputs() {
    let (tensor_field, tensor) =
        matrix_batch("tensor", [[[3.0, 4.0], [0.0, 5.0]], [[8.0, 15.0], [7.0, 24.0]]]);
    let (other_tensor_field, other_tensor) =
        matrix_batch("other_tensor", [[[4.0, 0.0], [1.0, 2.0]], [[15.0, 8.0], [24.0, 7.0]]]);
    let l2_udf = udfs::tensor_l2_norm_last_axis_udf();
    let normalize_udf = udfs::tensor_normalize_last_axis_udf();
    let dot_udf = udfs::tensor_batched_dot_last_axis_udf();
    let (l2_field, l2_norms) = invoke_udf(
        &l2_udf,
        vec![ColumnarValue::Array(Arc::new(tensor.clone()))],
        vec![Arc::clone(&tensor_field)],
        &[None],
        2,
    )
    .expect("tensor_l2_norm_last_axis");
    let l2_norms = fixed_shape_viewd(&l2_field, &l2_norms)
        .into_dimensionality::<Ix2>()
        .expect("rank-2 tensor");
    // |[3,4]| = 5, |[0,5]| = 5, |[8,15]| = 17, |[7,24]| = 25.
    assert_close(l2_norms[[0, 0]], 5.0);
    assert_close(l2_norms[[0, 1]], 5.0);
    assert_close(l2_norms[[1, 0]], 17.0);
    assert_close(l2_norms[[1, 1]], 25.0);
    let (normalize_field, normalized) = invoke_udf(
        &normalize_udf,
        vec![ColumnarValue::Array(Arc::new(tensor.clone()))],
        vec![Arc::clone(&tensor_field)],
        &[None],
        2,
    )
    .expect("tensor_normalize_last_axis");
    let normalized = fixed_shape_view3(&normalize_field, &normalized);
    // [3,4]/5 = [0.6, 0.8]; [0,5]/5 = [0, 1].
    assert_close(normalized[[0, 0, 0]], 0.6);
    assert_close(normalized[[0, 0, 1]], 0.8);
    assert_close(normalized[[0, 1, 1]], 1.0);
    let (dot_field, dots) = invoke_udf(
        &dot_udf,
        vec![
            ColumnarValue::Array(Arc::new(tensor.clone())),
            ColumnarValue::Array(Arc::new(other_tensor)),
        ],
        vec![Arc::clone(&tensor_field), other_tensor_field],
        &[None, None],
        2,
    )
    .expect("tensor_batched_dot_last_axis");
    let dots =
        fixed_shape_viewd(&dot_field, &dots).into_dimensionality::<Ix2>().expect("rank-2 tensor");
    // Row-wise dot products of matching last-axis vectors.
    assert_close(dots[[0, 0]], 12.0);
    assert_close(dots[[0, 1]], 10.0);
    assert_close(dots[[1, 0]], 240.0);
    assert_close(dots[[1, 1]], 336.0);
}
// Covers the axis-manipulation tensor UDFs: permute_axes with (1, 0) acts as
// a transpose, and contract_axes over (left axis 1, right axis 0) is an
// ordinary matrix product.
#[test]
fn tensor_fixed_shape_axis_udfs_cover_outputs() {
    let (permute_input_field, permute_tensor) =
        matrix_batch("permute_tensor", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let (contract_left_field, contract_left) =
        matrix_batch("contract_left", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let (contract_right_field, contract_right) =
        matrix_batch("contract_right", [[[7.0, 8.0], [9.0, 10.0], [11.0, 12.0]]]);
    let permute_udf = udfs::tensor_permute_axes_udf();
    let contract_udf = udfs::tensor_contract_axes_udf();
    let axis_field = Arc::new(Field::new("axis", DataType::Int64, false));
    // Transpose of [[1,2,3],[4,5,6]] via permutation (1, 0).
    let (permuted_field, permuted) = invoke_udf(
        &permute_udf,
        vec![
            ColumnarValue::Array(Arc::new(permute_tensor)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![Arc::clone(&permute_input_field), Arc::clone(&axis_field), Arc::clone(&axis_field)],
        &[None, Some(ScalarValue::Int64(Some(1))), Some(ScalarValue::Int64(Some(0)))],
        1,
    )
    .expect("tensor_permute_axes");
    let permuted = fixed_shape_view3(&permuted_field, &permuted);
    assert_close(permuted[[0, 0, 0]], 1.0);
    assert_close(permuted[[0, 0, 1]], 4.0);
    assert_close(permuted[[0, 2, 0]], 3.0);
    assert_close(permuted[[0, 2, 1]], 6.0);
    // [[1,2,3],[4,5,6]] x [[7,8],[9,10],[11,12]] = [[58,64],[139,154]].
    let (contracted_field, contracted) = invoke_udf(
        &contract_udf,
        vec![
            ColumnarValue::Array(Arc::new(contract_left)),
            ColumnarValue::Array(Arc::new(contract_right)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![
            contract_left_field,
            contract_right_field,
            Arc::clone(&axis_field),
            Arc::clone(&axis_field),
        ],
        &[None, None, Some(ScalarValue::Int64(Some(1))), Some(ScalarValue::Int64(Some(0)))],
        1,
    )
    .expect("tensor_contract_axes");
    let contracted = fixed_shape_view3(&contracted_field, &contracted);
    assert_close(contracted[[0, 0, 0]], 58.0);
    assert_close(contracted[[0, 0, 1]], 64.0);
    assert_close(contracted[[0, 1, 0]], 139.0);
    assert_close(contracted[[0, 1, 1]], 154.0);
}
// Contract-edge checks for the axis UDFs: wrong permutation length, duplicate
// permutation axes, a runtime array in an integer-scalar slot, an unpaired
// contraction axis argument, and mismatched contraction axis lengths.
#[test]
fn tensor_axis_udfs_validate_contract_edges() {
    let permute_udf = udfs::tensor_permute_axes_udf();
    let contract_udf = udfs::tensor_contract_axes_udf();
    let (tensor_field, tensor) = matrix_batch("tensor", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let (right_field, right) = matrix_batch("right", [[[7.0, 8.0], [9.0, 10.0], [11.0, 12.0]]]);
    let axis_field = Arc::new(Field::new("axis", DataType::Int64, false));
    // One axis supplied for a rank-2 tensor: wrong permutation length.
    let permutation_error = invoke_udf_error(
        &permute_udf,
        vec![
            ColumnarValue::Array(Arc::new(tensor.clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![Arc::clone(&tensor_field), Arc::clone(&axis_field)],
        &[None, Some(ScalarValue::Int64(Some(0)))],
        1,
    );
    // Axis 0 listed twice in the permutation.
    let duplicate_error = invoke_udf_error(
        &permute_udf,
        vec![
            ColumnarValue::Array(Arc::new(tensor.clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![Arc::clone(&tensor_field), Arc::clone(&axis_field), Arc::clone(&axis_field)],
        &[None, Some(ScalarValue::Int64(Some(0))), Some(ScalarValue::Int64(Some(0)))],
        1,
    );
    // The second axis arrives as a runtime array instead of a scalar.
    let runtime_scalar_error = invoke_udf_error(
        &permute_udf,
        vec![
            ColumnarValue::Array(Arc::new(tensor.clone())),
            ColumnarValue::Array(Arc::new(Int64Array::from(vec![1_i64]))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![Arc::clone(&tensor_field), Arc::clone(&axis_field), Arc::clone(&axis_field)],
        &[None, Some(ScalarValue::Int64(Some(1))), Some(ScalarValue::Int64(Some(0)))],
        1,
    );
    // Three axis scalars cannot form left/right contraction pairs.
    let contract_count_error = invoke_udf_error(
        &contract_udf,
        vec![
            ColumnarValue::Array(Arc::new(tensor.clone())),
            ColumnarValue::Array(Arc::new(right.clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![
            Arc::clone(&tensor_field),
            Arc::clone(&right_field),
            Arc::clone(&axis_field),
            Arc::clone(&axis_field),
            Arc::clone(&axis_field),
        ],
        &[
            None,
            None,
            Some(ScalarValue::Int64(Some(0))),
            Some(ScalarValue::Int64(Some(1))),
            Some(ScalarValue::Int64(Some(0))),
        ],
        1,
    );
    // Left axis 0 has length 2, right axis 0 has length 3.
    let contract_shape_error = invoke_udf_error(
        &contract_udf,
        vec![
            ColumnarValue::Array(Arc::new(tensor)),
            ColumnarValue::Array(Arc::new(right)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![tensor_field, right_field, Arc::clone(&axis_field), Arc::clone(&axis_field)],
        &[None, None, Some(ScalarValue::Int64(Some(0))), Some(ScalarValue::Int64(Some(0)))],
        1,
    );
    assert!(permutation_error.contains("requires permutation length"));
    assert!(duplicate_error.contains("duplicate axis"));
    assert!(runtime_scalar_error.contains("must be an integer scalar"));
    assert!(contract_count_error.contains("requires one or more left/right axis pairs"));
    assert!(contract_shape_error.contains("axis mismatch"));
}
// Float32 code paths for the axis UDFs: only the output data type is checked
// against the declared return field; numeric values are exercised on the f64
// path.
#[test]
fn tensor_axis_udfs_cover_float32_branches() {
    let (permute_input_field, permute_tensor) =
        matrix_batch_f32("permute_tensor", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let (contract_left_field, contract_left) =
        matrix_batch_f32("contract_left", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let (contract_right_field, contract_right) =
        matrix_batch_f32("contract_right", [[[7.0, 8.0], [9.0, 10.0], [11.0, 12.0]]]);
    let axis_field = Arc::new(Field::new("axis", DataType::Int64, false));
    let (permuted_field, permuted) = invoke_udf(
        &udfs::tensor_permute_axes_udf(),
        vec![
            ColumnarValue::Array(Arc::new(permute_tensor)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![permute_input_field, Arc::clone(&axis_field), Arc::clone(&axis_field)],
        &[None, Some(ScalarValue::Int64(Some(1))), Some(ScalarValue::Int64(Some(0)))],
        1,
    )
    .expect("tensor_permute_axes f32");
    assert_eq!(array_data_type(&permuted), permuted_field.data_type());
    let (contracted_field, contracted) = invoke_udf(
        &udfs::tensor_contract_axes_udf(),
        vec![
            ColumnarValue::Array(Arc::new(contract_left)),
            ColumnarValue::Array(Arc::new(contract_right)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
        ],
        vec![
            contract_left_field,
            contract_right_field,
            Arc::clone(&axis_field),
            Arc::clone(&axis_field),
        ],
        &[None, None, Some(ScalarValue::Int64(Some(1))), Some(ScalarValue::Int64(Some(0)))],
        1,
    )
    .expect("tensor_contract_axes f32");
    assert_eq!(array_data_type(&contracted), contracted_field.data_type());
}
// The batched matmul UDF must multiply each trailing 2x2 pair in a rank-4
// batch: the first inner pair multiplies the identity (unchanged), the second
// multiplies [[1,1],[0,1]].
#[test]
fn tensor_fixed_shape_matmul_udf_covers_rank3plus_batch() {
    let (cube_left_field, cube_left) =
        tensor_batch4("cube_left", [[[[1.0, 2.0], [3.0, 4.0]], [[2.0, 0.0], [1.0, 2.0]]]]);
    let (cube_right_field, cube_right) =
        tensor_batch4("cube_right", [[[[1.0, 0.0], [0.0, 1.0]], [[1.0, 1.0], [0.0, 1.0]]]]);
    let matmul_udf = udfs::tensor_batched_matmul_last_two_udf();
    let args = vec![
        ColumnarValue::Array(Arc::new(cube_left)),
        ColumnarValue::Array(Arc::new(cube_right)),
    ];
    let (matmul_field, matmul) =
        invoke_udf(&matmul_udf, args, vec![cube_left_field, cube_right_field], &[None, None], 1)
            .expect("tensor_batched_matmul_last_two");
    let product = fixed_shape_view4(&matmul_field, &matmul);
    let expectations: [([usize; 4], f64); 4] = [
        ([0, 0, 0, 0], 1.0),
        ([0, 0, 1, 1], 4.0),
        ([0, 1, 0, 0], 2.0),
        ([0, 1, 1, 1], 3.0),
    ];
    for (index, expected) in expectations {
        assert_close(product[index], expected);
    }
}
// Covers the variable-shape (ragged) last-axis tensor UDFs — sum, L2 norm,
// normalize, and batched dot — over a batch whose rows have different shapes.
#[test]
fn tensor_variable_shape_udfs_cover_outputs() {
    let (variable_field, variable_batch) = ragged_matrices("variable", vec![
        vec![vec![3.0, 4.0], vec![0.0, 5.0]],
        vec![vec![6.0, 8.0, 0.0]],
    ]);
    let (other_variable_field, other_variable_batch) = ragged_matrices("other_variable", vec![
        vec![vec![4.0, 0.0], vec![1.0, 2.0]],
        vec![vec![1.0, 0.0, 2.0]],
    ]);
    let variable_sum_udf = udfs::tensor_variable_sum_last_axis_udf();
    let variable_l2_udf = udfs::tensor_variable_l2_norm_last_axis_udf();
    let variable_normalize_udf = udfs::tensor_variable_normalize_last_axis_udf();
    let variable_dot_udf = udfs::tensor_variable_batched_dot_last_axis_udf();
    let (variable_sum_field, variable_sums) = invoke_udf(
        &variable_sum_udf,
        vec![ColumnarValue::Array(Arc::new(variable_batch.clone()))],
        vec![Arc::clone(&variable_field)],
        &[None],
        2,
    )
    .expect("tensor_variable_sum_last_axis");
    let mut variable_sum_iter = variable_shape_rows(&variable_sum_field, &variable_sums);
    let (_, sum_row0) = variable_sum_iter.next().expect("row0").expect("row0 view");
    let sum_row0 = sum_row0.into_dimensionality::<Ix1>().expect("rank-1 output");
    let (_, sum_row1) = variable_sum_iter.next().expect("row1").expect("row1 view");
    let sum_row1 = sum_row1.into_dimensionality::<Ix1>().expect("rank-1 output");
    // Row 0: [3+4, 0+5]; row 1: [6+8+0].
    assert_close(sum_row0[[0]], 7.0);
    assert_close(sum_row0[[1]], 5.0);
    assert_close(sum_row1[[0]], 14.0);
    let (variable_l2_field, variable_l2) = invoke_udf(
        &variable_l2_udf,
        vec![ColumnarValue::Array(Arc::new(variable_batch.clone()))],
        vec![Arc::clone(&variable_field)],
        &[None],
        2,
    )
    .expect("tensor_variable_l2_norm_last_axis");
    let mut variable_l2_iter = variable_shape_rows(&variable_l2_field, &variable_l2);
    let (_, l2_row0) = variable_l2_iter.next().expect("row0").expect("row0 view");
    let l2_row0 = l2_row0.into_dimensionality::<Ix1>().expect("rank-1 output");
    let (_, l2_row1) = variable_l2_iter.next().expect("row1").expect("row1 view");
    let l2_row1 = l2_row1.into_dimensionality::<Ix1>().expect("rank-1 output");
    // |[3,4]| = 5, |[0,5]| = 5, |[6,8,0]| = 10.
    assert_close(l2_row0[[0]], 5.0);
    assert_close(l2_row0[[1]], 5.0);
    assert_close(l2_row1[[0]], 10.0);
    let (variable_normalize_field, variable_normalized) = invoke_udf(
        &variable_normalize_udf,
        vec![ColumnarValue::Array(Arc::new(variable_batch.clone()))],
        vec![Arc::clone(&variable_field)],
        &[None],
        2,
    )
    .expect("tensor_variable_normalize_last_axis");
    let mut variable_normalize_iter =
        variable_shape_rows(&variable_normalize_field, &variable_normalized);
    let (_, variable_normalized0) =
        variable_normalize_iter.next().expect("row0").expect("row0 view");
    let variable_normalized0 =
        variable_normalized0.into_dimensionality::<Ix2>().expect("rank-2 output");
    // [3,4]/5 = [0.6, 0.8]; [0,5]/5 = [0, 1].
    assert_close(variable_normalized0[[0, 0]], 0.6);
    assert_close(variable_normalized0[[0, 1]], 0.8);
    assert_close(variable_normalized0[[1, 1]], 1.0);
    let (variable_dot_field, variable_dots) = invoke_udf(
        &variable_dot_udf,
        vec![
            ColumnarValue::Array(Arc::new(variable_batch)),
            ColumnarValue::Array(Arc::new(other_variable_batch)),
        ],
        vec![variable_field, other_variable_field],
        &[None, None],
        2,
    )
    .expect("tensor_variable_batched_dot_last_axis");
    let mut variable_dot_iter = variable_shape_rows(&variable_dot_field, &variable_dots);
    let (_, dot_row0) = variable_dot_iter.next().expect("row0").expect("row0 view");
    let dot_row0 = dot_row0.into_dimensionality::<Ix1>().expect("rank-1 output");
    let (_, dot_row1) = variable_dot_iter.next().expect("row1").expect("row1 view");
    let dot_row1 = dot_row1.into_dimensionality::<Ix1>().expect("rank-1 output");
    // [3,4].[4,0] = 12; [0,5].[1,2] = 10; [6,8,0].[1,0,2] = 6.
    assert_close(dot_row0[[0]], 12.0);
    assert_close(dot_row0[[1]], 10.0);
    assert_close(dot_row1[[0]], 6.0);
}
// Smoke-tests every registered UDF's catalog surface: a non-empty name, a
// reachable signature and downcast hook, and the deliberate rejection of the
// legacy `return_type` entry point.
#[test]
fn udf_catalog_covers_core_trait_entrypoints() {
    for udf in udfs::all_default_functions() {
        assert!(!udf.name().is_empty());
        let _ = udf.signature();
        let _ = udf.inner().as_any();
        let message = udf
            .return_type(&[])
            .expect_err("return_type should be delegated through fields")
            .to_string();
        assert!(message.contains("return_field_from_args should be used instead"));
    }
}
// Input-contract probes for the vector L2-norm UDF: scalar arguments, flat
// (non-list) array storage, and a field whose type is not a vector must all
// surface descriptive errors.
#[test]
fn vector_udfs_validate_array_and_storage_contracts() {
    let udf = udfs::vector_l2_norm_udf();
    let input = fixed_size_list([[1.0, 2.0, 3.0]]);
    let field = vector_field("vector", &DataType::Float64, 3, false).expect("vector field");
    // A scalar where an array column is required.
    let scalar_error = invoke_udf_error(
        &udf,
        vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0)))],
        vec![Arc::clone(&field)],
        &[None],
        1,
    );
    assert!(scalar_error.contains("argument 1 must be an array column"));
    // A flat Float64Array is not valid FixedSizeList storage.
    let storage_error = invoke_udf_error(
        &udf,
        vec![ColumnarValue::Array(Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])))],
        vec![Arc::clone(&field)],
        &[None],
        3,
    );
    assert!(storage_error.contains("expected FixedSizeListArray storage"));
    // A plain Float64 field does not describe a vector column.
    let field_error = invoke_udf_error(
        &udf,
        vec![ColumnarValue::Array(Arc::new(input))],
        vec![Arc::new(Field::new("vector", DataType::Float64, false))],
        &[None],
        1,
    );
    assert!(field_error.contains("expected FixedSizeList<Float32|Float64>(D)"));
}
// Shape-contract checks for the dense matrix UDFs: incompatible matmul
// shapes, matvec rhs length mismatch, LU factorization/solve on non-square
// input, and QR least squares with a wrong-length rhs.
#[test]
fn matrix_udfs_validate_shape_contracts() {
    let matvec_udf = udfs::matrix_matvec_udf();
    let matmul_udf = udfs::matrix_matmul_udf();
    let lu_udf = udfs::matrix_lu_udf();
    let lu_solve_udf = udfs::matrix_lu_solve_udf();
    let qr_solve_udf = udfs::matrix_qr_solve_least_squares_udf();
    let (left_field, left) = matrix_batch("left", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let (right_field, right) = matrix_batch("right", [[[1.0, 0.0], [0.0, 1.0]]]);
    let (nonsquare_field, nonsquare) =
        matrix_batch("nonsquare", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let rhs = fixed_size_list([[1.0, 2.0]]);
    let rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    // 2x3 times 2x2: inner dimensions disagree.
    let matmul_error = invoke_udf_error(
        &matmul_udf,
        vec![ColumnarValue::Array(Arc::new(left)), ColumnarValue::Array(Arc::new(right))],
        vec![left_field, right_field],
        &[None, None],
        1,
    );
    // 2x3 matrix against a length-2 rhs vector.
    let matvec_error = invoke_udf_error(
        &matvec_udf,
        vec![
            ColumnarValue::Array(Arc::new(nonsquare.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
        ],
        vec![Arc::clone(&nonsquare_field), Arc::clone(&rhs_field)],
        &[None, None],
        1,
    );
    // LU factorization requires square input.
    let lu_error = invoke_udf_error(
        &lu_udf,
        vec![ColumnarValue::Array(Arc::new(nonsquare.clone()))],
        vec![Arc::clone(&nonsquare_field)],
        &[None],
        1,
    );
    // LU solve also requires square input.
    let rhs_error = invoke_udf_error(
        &lu_solve_udf,
        vec![ColumnarValue::Array(Arc::new(nonsquare)), ColumnarValue::Array(Arc::new(rhs))],
        vec![nonsquare_field, rhs_field],
        &[None, None],
        1,
    );
    // 2x2 least-squares system against a length-3 rhs.
    let qr_rhs_error = invoke_udf_error(
        &qr_solve_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrix_batch("qr", [[[1.0, 2.0], [3.0, 4.0]]]).1)),
            ColumnarValue::Array(Arc::new(fixed_size_list([[1.0, 2.0, 3.0]]))),
        ],
        vec![
            matrix_batch("qr_field", [[[1.0, 2.0], [3.0, 4.0]]]).0,
            vector_field("rhs_long", &DataType::Float64, 3, false).expect("rhs field"),
        ],
        &[None, None],
        1,
    );
    assert!(matmul_error.contains("incompatible matrix shapes"));
    assert!(matvec_error.contains("rhs vector length mismatch"));
    assert!(lu_error.contains("matrix_lu requires square matrices"));
    assert!(rhs_error.contains("matrix_lu_solve requires square matrices"));
    assert!(qr_rhs_error.contains("rhs vector length mismatch"));
}
// Shape-contract checks for the triangular solvers and the matrix sign
// function: non-square systems, vector-rhs length mismatch, and matrix-rhs
// row-count mismatch.
#[test]
fn triangular_and_matrix_function_udfs_validate_shape_contracts() {
    let solve_lower_udf = udfs::matrix_solve_lower_udf();
    let solve_upper_udf = udfs::matrix_solve_upper_udf();
    let solve_lower_matrix_udf = udfs::matrix_solve_lower_matrix_udf();
    let sign_udf = udfs::matrix_sign_udf();
    let (square_field, square) = matrix_batch("square", [[[2.0, 0.0], [0.0, 4.0]]]);
    let (nonsquare_field, nonsquare) =
        matrix_batch("nonsquare", [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]);
    let short_rhs = fixed_size_list([[1.0, 2.0, 3.0]]);
    let short_rhs_field =
        vector_field("rhs_long", &DataType::Float64, 3, false).expect("rhs long field");
    let (rhs_matrix_field, rhs_matrices) = matrix_batch("rhs_matrix", [[[1.0], [2.0], [3.0]]]);
    // 2x3 system: triangular solve requires square matrices.
    let lower_square_error = invoke_udf_error(
        &solve_lower_udf,
        vec![
            ColumnarValue::Array(Arc::new(nonsquare.clone())),
            ColumnarValue::Array(Arc::new(short_rhs.clone())),
        ],
        vec![Arc::clone(&nonsquare_field), Arc::clone(&short_rhs_field)],
        &[None, None],
        1,
    );
    // Length-3 rhs against the 2x2 upper-triangular system.
    let upper_rhs_error = invoke_udf_error(
        &solve_upper_udf,
        vec![
            ColumnarValue::Array(Arc::new(square.clone())),
            ColumnarValue::Array(Arc::new(short_rhs)),
        ],
        vec![Arc::clone(&square_field), short_rhs_field],
        &[None, None],
        1,
    );
    // 3x1 rhs matrix against the 2x2 system: row count mismatch.
    let lower_matrix_rhs_error = invoke_udf_error(
        &solve_lower_matrix_udf,
        vec![ColumnarValue::Array(Arc::new(square)), ColumnarValue::Array(Arc::new(rhs_matrices))],
        vec![square_field, rhs_matrix_field],
        &[None, None],
        1,
    );
    // Matrix sign also requires square input.
    let sign_error = invoke_udf_error(
        &sign_udf,
        vec![ColumnarValue::Array(Arc::new(nonsquare))],
        vec![nonsquare_field],
        &[None],
        1,
    );
    assert!(lower_square_error.contains("requires square matrices"));
    assert!(upper_rhs_error.contains("rhs vector length mismatch"));
    assert!(lower_matrix_rhs_error.contains("rhs matrix row mismatch"));
    assert!(sign_error.contains("requires square matrices"));
}
// Runtime batch-edge checks: LU solve with mismatched batch lengths, plus the
// linear-regression UDF's scalar bookkeeping (missing scalar metadata, array
// in a scalar slot) and its own batch-length validation.
#[test]
fn matrix_lu_solve_and_linear_regression_validate_runtime_batch_edges() {
    let lu_solve_udf = udfs::matrix_lu_solve_udf();
    let linear_regression_udf = udfs::linear_regression_udf();
    let (matrices_field, matrices) =
        matrix_batch("matrices", [[[1.0, 0.0], [0.0, 1.0]], [[2.0, 1.0], [1.0, 2.0]]]);
    let short_rhs = fixed_size_list([[1.0, 2.0]]);
    let short_rhs_field = vector_field("rhs", &DataType::Float64, 2, false).expect("rhs field");
    let (design_field, design) =
        matrix_batch("design", [[[1.0, 0.0], [0.0, 1.0]], [[1.0, 1.0], [1.0, 0.0]]]);
    let response = fixed_size_list([[1.0, 2.0]]);
    let response_field =
        vector_field("response", &DataType::Float64, 2, false).expect("response field");
    let bool_field = Arc::new(Field::new("add_intercept", DataType::Boolean, false));
    // Two matrices but only one rhs row.
    let lu_solve_error = invoke_udf_error(
        &lu_solve_udf,
        vec![
            ColumnarValue::Array(Arc::new(matrices)),
            ColumnarValue::Array(Arc::new(short_rhs.clone())),
        ],
        vec![Arc::clone(&matrices_field), Arc::clone(&short_rhs_field)],
        &[None, None],
        2,
    );
    // The scalar metadata slot is None even though a scalar value was passed.
    let missing_scalar_error = invoke_udf_error(
        &linear_regression_udf,
        vec![
            ColumnarValue::Array(Arc::new(design.clone())),
            ColumnarValue::Array(Arc::new(response.clone())),
            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))),
        ],
        vec![Arc::clone(&design_field), Arc::clone(&response_field), Arc::clone(&bool_field)],
        &[None, None, None],
        2,
    );
    // A runtime array in the scalar add_intercept slot.
    let scalar_runtime_error = invoke_udf_error(
        &linear_regression_udf,
        vec![
            ColumnarValue::Array(Arc::new(design.clone())),
            ColumnarValue::Array(Arc::new(response.clone())),
            ColumnarValue::Array(Arc::new(Float64Array::from(vec![1.0, 0.0]))),
        ],
        vec![Arc::clone(&design_field), Arc::clone(&response_field), Arc::clone(&bool_field)],
        &[None, None, Some(ScalarValue::Boolean(Some(false)))],
        2,
    );
    // Two design matrices but only one response row.
    let regression_batch_error = invoke_udf_error(
        &linear_regression_udf,
        vec![
            ColumnarValue::Array(Arc::new(design)),
            ColumnarValue::Array(Arc::new(response)),
            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))),
        ],
        vec![design_field, response_field, bool_field],
        &[None, None, Some(ScalarValue::Boolean(Some(false)))],
        2,
    );
    assert!(lu_solve_error.contains("batch length mismatch"));
    assert!(missing_scalar_error.contains("argument 3 must be a non-null scalar"));
    assert!(scalar_runtime_error.contains("argument 3 must be a scalar Boolean"));
    assert!(regression_batch_error.contains("batch length mismatch"));
}
// Contract-edge coverage (f64) for `sparse_matvec` and `tensor_sum_last_axis`:
//  * a rank-2 vector field where rank-1 dense vectors are required,
//  * a dense FixedSizeList passed where sparse StructArray storage is expected,
//  * a rank-1 fixed-shape tensor passed to a reducer that needs rank >= 2.
#[test]
fn sparse_and_tensor_udfs_validate_contract_edges() {
    let sparse_matvec_udf = udfs::sparse_matvec_udf();
    let tensor_sum_udf = udfs::tensor_sum_last_axis_udf();
    let (sparse_field, sparse) = sparse_batch("sparse");
    let (ragged_field, ragged_vectors) = ragged_vectors("vectors", vec![vec![1.0, 2.0], vec![3.0]]);
    // Field metadata claims rank 2 even though the data is a batch of vectors.
    let rank_two_vectors = crate::metadata::variable_shape_tensor_field(
        "vectors",
        &DataType::Float64,
        2,
        Some(&[None, None] as &[Option<i32>]),
        false,
    )
    .expect("rank-2 ragged field");
    let rank_one_tensor =
        crate::metadata::fixed_shape_tensor_field("tensor", &DataType::Float64, &[4], false)
            .expect("rank-1 tensor field");
    let rank_one_tensor_values = FixedSizeListArray::from_iter_primitive::<Float64Type, _, _>(
        vec![Some(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)])],
        4,
    );
    let sparse_rank_error = invoke_udf_error(
        &sparse_matvec_udf,
        vec![
            ColumnarValue::Array(Arc::new(sparse.clone())),
            ColumnarValue::Array(Arc::new(ragged_vectors.clone())),
        ],
        vec![Arc::clone(&sparse_field), rank_two_vectors],
        &[None, None],
        2,
    );
    // The first argument should be sparse StructArray storage, not a dense list.
    let sparse_storage_error = invoke_udf_error(
        &sparse_matvec_udf,
        vec![
            ColumnarValue::Array(Arc::new(fixed_size_list([[1.0, 2.0]]))),
            ColumnarValue::Array(Arc::new(ragged_vectors)),
        ],
        vec![sparse_field, ragged_field],
        &[None, None],
        1,
    );
    let tensor_rank_error = invoke_udf_error(
        &tensor_sum_udf,
        vec![ColumnarValue::Array(Arc::new(rank_one_tensor_values))],
        vec![rank_one_tensor],
        &[None],
        1,
    );
    assert!(sparse_rank_error.contains("batch of rank-1 dense vectors"));
    assert!(sparse_storage_error.contains("expected StructArray storage"));
    assert!(tensor_rank_error.contains("requires tensors with rank >= 2"));
}
// Error-path coverage for the f32 constructor UDFs:
//  * make_matrix: 3 values supplied for a 2x2 shape -> "expected 4 row-major values",
//  * make_variable_tensor: 2-element shape with declared rank 1 -> "expected rank 1",
//  * make_csr_matrix_batch: column arrays of differing row counts -> "argument length mismatch".
#[test]
fn float32_constructor_udfs_reject_invalid_shapes() {
    let values_field =
        Arc::new(Field::new("values", DataType::new_list(DataType::Float32, false), false));
    let int_list_field =
        Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false));
    let rows_field = Arc::new(Field::new("rows", DataType::Int64, false));
    let cols_field = Arc::new(Field::new("cols", DataType::Int64, false));
    let rank_field = Arc::new(Field::new("rank", DataType::Int64, false));
    let row_ptrs_field =
        Arc::new(Field::new("row_ptrs", DataType::new_list(DataType::Int32, false), false));
    let indices_field =
        Arc::new(Field::new("indices", DataType::new_list(DataType::UInt32, false), false));
    // 3 values for a 2x2 matrix (needs 4).
    let matrix_error = invoke_udf_error(
        &udfs::make_matrix_udf(),
        vec![
            ColumnarValue::Scalar(scalar_float32_list(vec![1.0, 2.0, 3.0])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![Arc::clone(&values_field), Arc::clone(&rows_field), Arc::clone(&cols_field)],
        &[
            Some(scalar_float32_list(vec![1.0, 2.0, 3.0])),
            Some(ScalarValue::Int64(Some(2))),
            Some(ScalarValue::Int64(Some(2))),
        ],
        1,
    );
    // Shape [1, 2] has two dimensions but rank is declared as 1.
    let variable_error = invoke_udf_error(
        &udfs::make_variable_tensor_udf(),
        vec![
            ColumnarValue::Scalar(scalar_float32_list(vec![1.0, 2.0])),
            ColumnarValue::Scalar(scalar_int32_list(vec![1, 2])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
        ],
        vec![Arc::clone(&values_field), Arc::clone(&int_list_field), Arc::clone(&rank_field)],
        &[
            Some(scalar_float32_list(vec![1.0, 2.0])),
            Some(scalar_int32_list(vec![1, 2])),
            Some(ScalarValue::Int64(Some(1))),
        ],
        1,
    );
    // row_ptrs has 2 rows while the other columns have 1.
    let csr_error = invoke_udf_error(
        &udfs::make_csr_matrix_batch_udf(),
        vec![
            ColumnarValue::Array(Arc::new(int32_list_array(vec![vec![2, 2]]))),
            ColumnarValue::Array(Arc::new(int32_list_array(vec![vec![0, 1, 2], vec![0, 1]]))),
            ColumnarValue::Array(Arc::new(u32_list_array(vec![vec![0, 1]]))),
            ColumnarValue::Array(Arc::new(float32_list_array(vec![vec![1.0, 2.0]]))),
        ],
        vec![int_list_field, row_ptrs_field, indices_field, values_field],
        &[None, None, None, None],
        2,
    );
    assert!(matrix_error.contains("expected 4 row-major values"));
    assert!(variable_error.contains("expected rank 1"));
    assert!(csr_error.contains("argument length mismatch"));
}
// Float32 analogue of the sparse/tensor contract-edge tests, plus a rank check
// for the variable-shape tensor reducer fed a batch of rank-1 vectors.
#[test]
fn float32_tensor_and_sparse_udfs_validate_contract_edges() {
    let sparse_matvec_udf = udfs::sparse_matvec_udf();
    let tensor_sum_udf = udfs::tensor_sum_last_axis_udf();
    let variable_sum_udf = udfs::tensor_variable_sum_last_axis_udf();
    let (sparse_field, sparse) = sparse_batch_f32("sparse");
    let (ragged_vectors_field, ragged_vectors) =
        ragged_vectors_f32("vectors", vec![vec![1.0, 2.0], vec![3.0]]);
    // Field metadata claims rank 2 for data that is a batch of rank-1 vectors.
    let rank_two_vectors = crate::metadata::variable_shape_tensor_field(
        "vectors",
        &DataType::Float32,
        2,
        Some(&[None, None] as &[Option<i32>]),
        false,
    )
    .expect("rank-2 ragged field");
    let rank_one_tensor =
        crate::metadata::fixed_shape_tensor_field("tensor", &DataType::Float32, &[4], false)
            .expect("rank-1 tensor field");
    let rank_one_tensor_values = FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
        vec![Some(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)])],
        4,
    );
    let sparse_rank_error = invoke_udf_error(
        &sparse_matvec_udf,
        vec![
            ColumnarValue::Array(Arc::new(sparse)),
            ColumnarValue::Array(Arc::new(ragged_vectors.clone())),
        ],
        vec![sparse_field, rank_two_vectors],
        &[None, None],
        2,
    );
    let tensor_rank_error = invoke_udf_error(
        &tensor_sum_udf,
        vec![ColumnarValue::Array(Arc::new(rank_one_tensor_values))],
        vec![rank_one_tensor],
        &[None],
        1,
    );
    let variable_rank_error = invoke_udf_error(
        &variable_sum_udf,
        vec![ColumnarValue::Array(Arc::new(ragged_vectors))],
        vec![ragged_vectors_field],
        &[None],
        2,
    );
    assert!(sparse_rank_error.contains("batch of rank-1 dense vectors"));
    assert!(tensor_rank_error.contains("requires tensors with rank >= 2"));
    assert!(variable_rank_error.contains("requires tensors with rank >= 2"));
}
// Success-path coverage of the f32 branches of the dense constructors
// (make_vector, make_matrix with nested values, make_tensor with flat values).
// Each call asserts the output array's Arrow type matches the declared return
// field.
#[test]
fn dense_constructor_udfs_cover_float32_branches() {
    let values_field =
        Arc::new(Field::new("values", DataType::new_list(DataType::Float32, false), false));
    let nested_values_field = Arc::new(Field::new(
        "nested_values",
        DataType::new_list(DataType::new_list(DataType::Float32, false), false),
        false,
    ));
    let (vector_field_out, vector_output) = invoke_udf(
        &udfs::make_vector_udf(),
        vec![
            ColumnarValue::Scalar(scalar_float32_list(vec![3.0, 4.0])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![Arc::clone(&values_field), Arc::new(Field::new("dim", DataType::Int64, false))],
        &[Some(scalar_float32_list(vec![3.0, 4.0])), Some(ScalarValue::Int64(Some(2)))],
        1,
    )
    .expect("make_vector f32");
    // make_vector must produce FixedSizeList<Float32, 2>.
    assert_eq!(
        vector_field_out.data_type(),
        &DataType::new_fixed_size_list(DataType::Float32, 2, false)
    );
    assert_eq!(array_data_type(&vector_output), vector_field_out.data_type());
    // make_matrix accepting nested (row-per-list) values.
    let (matrix_field_out, matrix_output) = invoke_udf(
        &udfs::make_matrix_udf(),
        vec![
            ColumnarValue::Scalar(scalar_nested_float32_list(vec![vec![1.0, 2.0], vec![3.0, 4.0]])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::clone(&nested_values_field),
            Arc::new(Field::new("rows", DataType::Int64, false)),
            Arc::new(Field::new("cols", DataType::Int64, false)),
        ],
        &[
            Some(scalar_nested_float32_list(vec![vec![1.0, 2.0], vec![3.0, 4.0]])),
            Some(ScalarValue::Int64(Some(2))),
            Some(ScalarValue::Int64(Some(2))),
        ],
        1,
    )
    .expect("make_matrix f32");
    assert_eq!(array_data_type(&matrix_output), matrix_field_out.data_type());
    // make_tensor accepting a flat value list plus per-dimension sizes.
    let (tensor_field_out, tensor_output) = invoke_udf(
        &udfs::make_tensor_udf(),
        vec![
            ColumnarValue::Scalar(scalar_float32_list(vec![1.0, 2.0, 3.0, 4.0])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::clone(&values_field),
            Arc::new(Field::new("d0", DataType::Int64, false)),
            Arc::new(Field::new("d1", DataType::Int64, false)),
        ],
        &[
            Some(scalar_float32_list(vec![1.0, 2.0, 3.0, 4.0])),
            Some(ScalarValue::Int64(Some(2))),
            Some(ScalarValue::Int64(Some(2))),
        ],
        1,
    )
    .expect("make_tensor f32");
    assert_eq!(array_data_type(&tensor_output), tensor_field_out.data_type());
}
// Success-path coverage of the f32 branches of make_variable_tensor (scalar
// inputs) and make_csr_matrix_batch (array inputs), checking that each output
// array's Arrow type matches the declared return field.
#[test]
fn variable_and_sparse_constructor_udfs_cover_float32_branches() {
    let values_field =
        Arc::new(Field::new("values", DataType::new_list(DataType::Float32, false), false));
    let int_list_field =
        Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false));
    let u32_list_field =
        Arc::new(Field::new("indices", DataType::new_list(DataType::UInt32, false), false));
    // Rank-1 tensor with shape [3] built from scalar arguments.
    let (variable_field_out, variable_output) = invoke_udf(
        &udfs::make_variable_tensor_udf(),
        vec![
            ColumnarValue::Scalar(scalar_float32_list(vec![1.0, 2.0, 3.0])),
            ColumnarValue::Scalar(scalar_int32_list(vec![3])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
        ],
        vec![
            Arc::clone(&values_field),
            Arc::clone(&int_list_field),
            Arc::new(Field::new("rank", DataType::Int64, false)),
        ],
        &[
            Some(scalar_float32_list(vec![1.0, 2.0, 3.0])),
            Some(scalar_int32_list(vec![3])),
            Some(ScalarValue::Int64(Some(1))),
        ],
        1,
    )
    .expect("make_variable_tensor f32");
    assert_eq!(array_data_type(&variable_output), variable_field_out.data_type());
    // Single 2x2 CSR matrix: shape, row_ptrs, column indices, and values.
    let (csr_field_out, csr_output) = invoke_udf(
        &udfs::make_csr_matrix_batch_udf(),
        vec![
            ColumnarValue::Array(Arc::new(int32_list_array(vec![vec![2, 2]]))),
            ColumnarValue::Array(Arc::new(int32_list_array(vec![vec![0, 1, 2]]))),
            ColumnarValue::Array(Arc::new(u32_list_array(vec![vec![0, 1]]))),
            ColumnarValue::Array(Arc::new(float32_list_array(vec![vec![5.0, 6.0]]))),
        ],
        vec![
            Arc::clone(&int_list_field),
            Arc::new(Field::new("row_ptrs", DataType::new_list(DataType::Int32, false), false)),
            u32_list_field,
            values_field,
        ],
        &[None, None, None, None],
        1,
    )
    .expect("make_csr_matrix_batch f32");
    assert_eq!(array_data_type(&csr_output), csr_field_out.data_type());
}
// Additional f32 constructor coverage: make_matrix with a *flat* value list,
// and zero-row (empty) batches for make_variable_tensor and
// make_csr_matrix_batch. Outputs must still carry the declared Arrow types.
#[test]
fn float32_constructor_udfs_cover_flat_and_empty_paths() {
    let values_field =
        Arc::new(Field::new("values", DataType::new_list(DataType::Float32, false), false));
    let int_list_field =
        Arc::new(Field::new("shape", DataType::new_list(DataType::Int32, false), false));
    let u32_list_field =
        Arc::new(Field::new("indices", DataType::new_list(DataType::UInt32, false), false));
    // Flat row-major values [1,2,3,4] for a 2x2 matrix.
    let (matrix_field_out, matrix_output) = invoke_udf(
        &udfs::make_matrix_udf(),
        vec![
            ColumnarValue::Scalar(scalar_float32_list(vec![1.0, 2.0, 3.0, 4.0])),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
        ],
        vec![
            Arc::clone(&values_field),
            Arc::new(Field::new("rows", DataType::Int64, false)),
            Arc::new(Field::new("cols", DataType::Int64, false)),
        ],
        &[
            Some(scalar_float32_list(vec![1.0, 2.0, 3.0, 4.0])),
            Some(ScalarValue::Int64(Some(2))),
            Some(ScalarValue::Int64(Some(2))),
        ],
        1,
    )
    .expect("make_matrix flat f32");
    assert_eq!(array_data_type(&matrix_output), matrix_field_out.data_type());
    // Empty array inputs with number_rows = 0.
    let (variable_field_out, variable_output) = invoke_udf(
        &udfs::make_variable_tensor_udf(),
        vec![
            ColumnarValue::Array(Arc::new(float32_list_array(vec![]))),
            ColumnarValue::Array(Arc::new(int32_list_array(vec![]))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
        ],
        vec![
            Arc::clone(&values_field),
            Arc::clone(&int_list_field),
            Arc::new(Field::new("rank", DataType::Int64, false)),
        ],
        &[None, None, Some(ScalarValue::Int64(Some(1)))],
        0,
    )
    .expect("make_variable_tensor empty f32");
    assert_eq!(array_data_type(&variable_output), variable_field_out.data_type());
    // All four CSR columns empty with number_rows = 0.
    let (csr_field_out, csr_output) = invoke_udf(
        &udfs::make_csr_matrix_batch_udf(),
        vec![
            ColumnarValue::Array(Arc::new(int32_list_array(vec![]))),
            ColumnarValue::Array(Arc::new(int32_list_array(vec![]))),
            ColumnarValue::Array(Arc::new(u32_list_array(vec![]))),
            ColumnarValue::Array(Arc::new(float32_list_array(vec![]))),
        ],
        vec![
            int_list_field,
            Arc::new(Field::new("row_ptrs", DataType::new_list(DataType::Int32, false), false)),
            u32_list_field,
            values_field,
        ],
        &[None, None, None, None],
        0,
    )
    .expect("make_csr_matrix_batch empty f32");
    assert_eq!(array_data_type(&csr_output), csr_field_out.data_type());
}
// Float32 coverage of the vector UDFs: norm, dot, cosine similarity/distance,
// and normalize. Uses hand-checkable inputs: [3, 4] has L2 norm 5 and
// dot([3, 4], [1, 2]) = 11.
#[test]
fn vector_udfs_cover_float32_branches() {
    let lhs = fixed_size_list_f32([[3.0, 4.0], [5.0, 12.0]]);
    let partner = fixed_size_list_f32([[1.0, 2.0], [2.0, 1.0]]);
    let field = vector_field("vector", &DataType::Float32, 2, false).expect("vector field");
    // L2 norm of [3, 4] is 5.
    let norm_result = invoke_udf(
        &udfs::vector_l2_norm_udf(),
        vec![ColumnarValue::Array(Arc::new(lhs.clone()))],
        vec![Arc::clone(&field)],
        &[None],
        2,
    );
    let (_norm_field, norm_output) = norm_result.expect("vector_l2_norm f32");
    assert!((f32_array(&norm_output).value(0) - 5.0_f32).abs() < 1.0e-6_f32);
    // dot([3, 4], [1, 2]) = 11.
    let dot_result = invoke_udf(
        &udfs::vector_dot_udf(),
        vec![
            ColumnarValue::Array(Arc::new(lhs.clone())),
            ColumnarValue::Array(Arc::new(partner.clone())),
        ],
        vec![Arc::clone(&field), Arc::clone(&field)],
        &[None, None],
        2,
    );
    let (_dot_field, dot_output) = dot_result.expect("vector_dot f32");
    assert!((f32_array(&dot_output).value(0) - 11.0_f32).abs() < 1.0e-6_f32);
    // Cosine similarity of vectors with all-positive components is positive.
    let cos_result = invoke_udf(
        &udfs::vector_cosine_similarity_udf(),
        vec![
            ColumnarValue::Array(Arc::new(lhs.clone())),
            ColumnarValue::Array(Arc::new(partner.clone())),
        ],
        vec![Arc::clone(&field), Arc::clone(&field)],
        &[None, None],
        2,
    );
    let (_cos_field, cos_output) = cos_result.expect("vector_cosine_similarity f32");
    assert!(f32_array(&cos_output).value(0) > 0.0);
    // Cosine distance is always non-negative.
    let distance_result = invoke_udf(
        &udfs::vector_cosine_distance_udf(),
        vec![
            ColumnarValue::Array(Arc::new(lhs.clone())),
            ColumnarValue::Array(Arc::new(partner)),
        ],
        vec![Arc::clone(&field), Arc::clone(&field)],
        &[None, None],
        2,
    );
    let (_distance_field, distance_output) = distance_result.expect("vector_cosine_distance f32");
    assert!(f32_array(&distance_output).value(0) >= 0.0);
    // Normalizing must keep the declared FixedSizeList<Float32, 2> layout.
    let normalize_result = invoke_udf(
        &udfs::vector_normalize_udf(),
        vec![ColumnarValue::Array(Arc::new(lhs))],
        vec![field],
        &[None],
        2,
    );
    let (normalize_field, normalize_output) = normalize_result.expect("vector_normalize f32");
    assert_eq!(array_data_type(&normalize_output), normalize_field.data_type());
}
// Float32 coverage for the binary matrix UDFs (matmul, matvec, LU/Cholesky
// solves, QR least squares) and, via a loop, for every unary decomposition
// UDF. Each checked either for field/array type agreement or for array-shaped
// output.
#[test]
fn matrix_and_decomposition_udfs_cover_float32_branches() {
    // Symmetric positive-definite 2x2 so Cholesky-based UDFs succeed.
    let (matrix_field, matrices) = matrix_batch_f32("matrix", [[[4.0, 1.0], [1.0, 3.0]]]);
    let rhs_field = vector_field("rhs", &DataType::Float32, 2, false).expect("rhs field");
    let rhs = fixed_size_list_f32([[1.0, 2.0]]);
    let (matmul_field, matmul_output) = invoke_udf(
        &udfs::matrix_matmul_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(matrices.clone())),
        ],
        vec![Arc::clone(&matrix_field), Arc::clone(&matrix_field)],
        &[None, None],
        1,
    )
    .expect("matrix_matmul f32");
    assert_eq!(array_data_type(&matmul_output), matmul_field.data_type());
    let (matvec_field, matvec_output) = invoke_udf(
        &udfs::matrix_matvec_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
        ],
        vec![Arc::clone(&matrix_field), Arc::clone(&rhs_field)],
        &[None, None],
        1,
    )
    .expect("matrix_matvec f32");
    assert_eq!(array_data_type(&matvec_output), matvec_field.data_type());
    let (solve_field, solve_output) = invoke_udf(
        &udfs::matrix_lu_solve_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
        ],
        vec![Arc::clone(&matrix_field), Arc::clone(&rhs_field)],
        &[None, None],
        1,
    )
    .expect("matrix_lu_solve f32");
    assert_eq!(array_data_type(&solve_output), solve_field.data_type());
    let (cholesky_solve_field, cholesky_solve_output) = invoke_udf(
        &udfs::matrix_cholesky_solve_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(rhs.clone())),
        ],
        vec![Arc::clone(&matrix_field), Arc::clone(&rhs_field)],
        &[None, None],
        1,
    )
    .expect("matrix_cholesky_solve f32");
    assert_eq!(array_data_type(&cholesky_solve_output), cholesky_solve_field.data_type());
    // Every unary decomposition UDF must accept the f32 batch and return an array.
    for udf in [
        udfs::matrix_lu_udf(),
        udfs::matrix_inverse_udf(),
        udfs::matrix_determinant_udf(),
        udfs::matrix_log_determinant_udf(),
        udfs::matrix_cholesky_udf(),
        udfs::matrix_cholesky_inverse_udf(),
        udfs::matrix_qr_udf(),
        udfs::matrix_svd_udf(),
        udfs::matrix_qr_condition_number_udf(),
        udfs::matrix_qr_reconstruct_udf(),
        udfs::matrix_svd_pseudo_inverse_udf(),
        udfs::matrix_svd_condition_number_udf(),
        udfs::matrix_svd_rank_udf(),
        udfs::matrix_svd_reconstruct_udf(),
    ] {
        let (_field, output) = invoke_udf(
            &udf,
            vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
            vec![Arc::clone(&matrix_field)],
            &[None],
            1,
        )
        .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        match output {
            ColumnarValue::Array(_) => {}
            ColumnarValue::Scalar(_) => panic!("expected array output"),
        }
    }
    let (_field, output) = invoke_udf(
        &udfs::matrix_qr_solve_least_squares_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone())), ColumnarValue::Array(Arc::new(rhs))],
        vec![matrix_field, rhs_field],
        &[None, None],
        1,
    )
    .expect("matrix_qr_solve_least_squares f32");
    match output {
        ColumnarValue::Array(_) => {}
        ColumnarValue::Scalar(_) => panic!("expected array output"),
    }
}
// Float32 coverage for the triangular solvers: both the vector-RHS pair
// (solve_lower / solve_upper) and the matrix-RHS pair. Each invocation must
// return an array whose Arrow type matches the declared return field.
#[test]
fn triangular_udfs_cover_float32_branches() {
    let (tri_field, tri_matrices) =
        matrix_batch_f32("triangular", [[[2.0, 0.0], [1.0, 4.0]]]);
    let vec_rhs_field = vector_field("rhs", &DataType::Float32, 2, false).expect("rhs field");
    let vec_rhs = fixed_size_list_f32([[4.0, 10.0]]);
    let (mat_rhs_field, mat_rhs) =
        matrix_batch_f32("rhs_matrix", [[[4.0, 8.0], [10.0, 12.0]]]);
    // Vector right-hand side variants.
    for udf in [udfs::matrix_solve_lower_udf(), udfs::matrix_solve_upper_udf()] {
        let result = invoke_udf(
            &udf,
            vec![
                ColumnarValue::Array(Arc::new(tri_matrices.clone())),
                ColumnarValue::Array(Arc::new(vec_rhs.clone())),
            ],
            vec![Arc::clone(&tri_field), Arc::clone(&vec_rhs_field)],
            &[None, None],
            1,
        );
        let (field, output) = result.unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        assert_eq!(array_data_type(&output), field.data_type());
    }
    // Matrix right-hand side variants.
    for udf in [udfs::matrix_solve_lower_matrix_udf(), udfs::matrix_solve_upper_matrix_udf()] {
        let result = invoke_udf(
            &udf,
            vec![
                ColumnarValue::Array(Arc::new(tri_matrices.clone())),
                ColumnarValue::Array(Arc::new(mat_rhs.clone())),
            ],
            vec![Arc::clone(&tri_field), Arc::clone(&mat_rhs_field)],
            &[None, None],
            1,
        );
        let (field, output) = result.unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        assert_eq!(array_data_type(&output), field.data_type());
    }
}
// Float32 coverage for the matrix-function UDFs. The first loop handles the
// purely unary variants (eigen/SVD based exp, log, sign); the second handles
// variants that also take scalar tuning arguments (max_terms/tolerance for the
// Taylor-series forms, exponent for matrix_power), assembling args, fields,
// and the scalar-arguments slice in matching positional order.
#[test]
fn matrix_function_udfs_cover_float32_branches() {
    let (exp_field, exp_matrices) = matrix_batch_f32("exp", [[[0.0, 0.0], [0.0, 1.0]]]);
    let (exp_taylor_field, exp_taylor_matrices) =
        matrix_batch_f32("exp_taylor", [[[0.0, 0.0], [0.0, 1.0]]]);
    // Diagonal entries 1 and e^2 keep the matrix log well defined.
    let (log_field, log_matrices) =
        matrix_batch_f32("log", [[[1.0, 0.0], [0.0, std::f32::consts::E.powi(2)]]]);
    let (log_taylor_field, log_taylor_matrices) =
        matrix_batch_f32("log_taylor", [[[1.0, 0.0], [0.0, 1.1]]]);
    let (power_field, power_matrices) = matrix_batch_f32("power", [[[4.0, 0.0], [0.0, 9.0]]]);
    let (sign_field, sign_matrices) = matrix_batch_f32("sign", [[[4.0, 0.0], [0.0, -2.0]]]);
    for (udf, field, values) in [
        (udfs::matrix_exp_eigen_udf(), Arc::clone(&exp_field), exp_matrices),
        (udfs::matrix_log_eigen_udf(), Arc::clone(&log_field), log_matrices.clone()),
        (udfs::matrix_log_svd_udf(), log_field, log_matrices),
        (udfs::matrix_sign_udf(), sign_field, sign_matrices),
    ] {
        let (return_field, output) =
            invoke_udf(&udf, vec![ColumnarValue::Array(Arc::new(values))], vec![field], &[None], 1)
                .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        assert_eq!(array_data_type(&output), return_field.data_type());
    }
    for (udf, field, values, scalar_args, scalar_fields) in [
        (
            udfs::matrix_exp_udf(),
            exp_taylor_field,
            exp_taylor_matrices,
            vec![
                ColumnarValue::Scalar(ScalarValue::Int64(Some(32))),
                ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-6))),
            ],
            vec![
                Arc::new(Field::new("max_terms", DataType::Int64, false)),
                Arc::new(Field::new("tolerance", DataType::Float64, false)),
            ],
        ),
        (
            udfs::matrix_log_taylor_udf(),
            log_taylor_field,
            log_taylor_matrices,
            vec![
                ColumnarValue::Scalar(ScalarValue::Int64(Some(64))),
                ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-6))),
            ],
            vec![
                Arc::new(Field::new("max_terms", DataType::Int64, false)),
                Arc::new(Field::new("tolerance", DataType::Float64, false)),
            ],
        ),
        (
            udfs::matrix_power_udf(),
            power_field,
            power_matrices,
            vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(0.5)))],
            vec![Arc::new(Field::new("power", DataType::Float64, false))],
        ),
    ] {
        // Argument 0 is the matrix batch; scalar args follow positionally.
        let mut args = vec![ColumnarValue::Array(Arc::new(values))];
        args.extend(scalar_args.clone());
        let mut arg_fields = vec![field];
        arg_fields.extend(scalar_fields);
        // Mirror the scalar args into the Option<ScalarValue> slice the
        // invoker expects (None for the leading array argument).
        let scalar_arguments = scalar_args
            .into_iter()
            .map(|value| match value {
                ColumnarValue::Scalar(value) => Some(value),
                ColumnarValue::Array(_) => None,
            })
            .collect::<Vec<_>>();
        let mut scalar_refs = vec![None];
        scalar_refs.extend(scalar_arguments);
        let (return_field, output) = invoke_udf(&udf, args, arg_fields, &scalar_refs, 1)
            .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        assert_eq!(array_data_type(&output), return_field.data_type());
    }
}
// Float32 coverage for the tensor UDFs, over both a rank-4 fixed-shape batch
// and a ragged variable-shape batch: last-axis reductions (sum, norm,
// normalize), batched dot, and batched matmul over the last two axes.
#[test]
fn tensor_udfs_cover_float32_branches() {
    let (tensor_field, tensors) =
        tensor_batch4_f32("tensor", [[[[1.0, 2.0], [3.0, 4.0]], [[2.0, 1.0], [0.0, 1.0]]]]);
    let (ragged_matrix_field, ragged_matrices) =
        ragged_matrices_f32("matrices", vec![vec![vec![1.0, 2.0], vec![3.0, 4.0]]]);
    // Unary fixed-shape reducers must all return array output.
    for udf in [
        udfs::tensor_sum_last_axis_udf(),
        udfs::tensor_l2_norm_last_axis_udf(),
        udfs::tensor_normalize_last_axis_udf(),
    ] {
        let (_field, output) = invoke_udf(
            &udf,
            vec![ColumnarValue::Array(Arc::new(tensors.clone()))],
            vec![Arc::clone(&tensor_field)],
            &[None],
            1,
        )
        .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        match output {
            ColumnarValue::Array(_) => {}
            ColumnarValue::Scalar(_) => panic!("expected array output"),
        }
    }
    let (_field, _output) = invoke_udf(
        &udfs::tensor_batched_dot_last_axis_udf(),
        vec![
            ColumnarValue::Array(Arc::new(tensors.clone())),
            ColumnarValue::Array(Arc::new(tensors.clone())),
        ],
        vec![Arc::clone(&tensor_field), Arc::clone(&tensor_field)],
        &[None, None],
        1,
    )
    .expect("tensor batched dot f32");
    let (_field, _output) = invoke_udf(
        &udfs::tensor_batched_matmul_last_two_udf(),
        vec![
            ColumnarValue::Array(Arc::new(tensors.clone())),
            ColumnarValue::Array(Arc::new(tensors)),
        ],
        vec![Arc::clone(&tensor_field), Arc::clone(&tensor_field)],
        &[None, None],
        1,
    )
    .expect("tensor batched matmul f32");
    // Variable-shape (ragged) counterparts of the unary reducers.
    for udf in [
        udfs::tensor_variable_sum_last_axis_udf(),
        udfs::tensor_variable_l2_norm_last_axis_udf(),
        udfs::tensor_variable_normalize_last_axis_udf(),
    ] {
        let (_field, _output) = invoke_udf(
            &udf,
            vec![ColumnarValue::Array(Arc::new(ragged_matrices.clone()))],
            vec![Arc::clone(&ragged_matrix_field)],
            &[None],
            1,
        )
        .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
    }
    let (_field, _output) = invoke_udf(
        &udfs::tensor_variable_batched_dot_last_axis_udf(),
        vec![
            ColumnarValue::Array(Arc::new(ragged_matrices.clone())),
            ColumnarValue::Array(Arc::new(ragged_matrices)),
        ],
        vec![Arc::clone(&ragged_matrix_field), Arc::clone(&ragged_matrix_field)],
        &[None, None],
        1,
    )
    .expect("variable tensor batched dot f32");
}
// Float32 coverage for the sparse UDFs. A single loop drives all five UDFs;
// the two if/else chains dispatch on `udf.name()` to pick matching argument
// arrays and fields for each one (matvec, dense matmat, LU solve, transpose,
// sparse matmat). The two chains must stay in sync with each other.
#[test]
fn sparse_udfs_cover_float32_branches() {
    let (ragged_vector_field, ragged_vectors) =
        ragged_vectors_f32("vectors", vec![vec![1.0, 2.0, 3.0], vec![3.0, 4.0]]);
    let (dense_field, dense_matrices) = ragged_matrices_f32("dense", vec![
        vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![1.0, 1.0]],
        vec![vec![1.0, 2.0], vec![3.0, 4.0]],
    ]);
    let (sparse_field, sparse) = sparse_batch_f32("sparse");
    let (sparse_rhs_field, sparse_rhs) = sparse_batch_rhs_f32("sparse_rhs");
    // Invertible 2x2 CSR matrix [[4, 1], [1, 3]] for the LU-solve case.
    let (sparse_solve_field, sparse_solve) = ndarrow::csr_batch_to_extension_array(
        "sparse_solve",
        vec![[2, 2]],
        vec![vec![0, 2, 4]],
        vec![vec![0, 1, 0, 1]],
        vec![vec![4.0_f32, 1.0, 1.0, 3.0]],
    )
    .map(|(field, array)| (Arc::new(field), array))
    .expect("sparse solve batch");
    let (sparse_solve_rhs_field, sparse_solve_rhs) =
        ragged_vectors_f32("rhs", vec![vec![1.0, 2.0]]);
    for udf in [
        udfs::sparse_matvec_udf(),
        udfs::sparse_lu_solve_udf(),
        udfs::sparse_matmat_dense_udf(),
        udfs::sparse_transpose_udf(),
        udfs::sparse_matmat_sparse_udf(),
    ] {
        let args = if udf.name() == "sparse_matvec" {
            vec![
                ColumnarValue::Array(Arc::new(sparse.clone())),
                ColumnarValue::Array(Arc::new(ragged_vectors.clone())),
            ]
        } else if udf.name() == "sparse_matmat_dense" {
            vec![
                ColumnarValue::Array(Arc::new(sparse.clone())),
                ColumnarValue::Array(Arc::new(dense_matrices.clone())),
            ]
        } else if udf.name() == "sparse_lu_solve" {
            vec![
                ColumnarValue::Array(Arc::new(sparse_solve.clone())),
                ColumnarValue::Array(Arc::new(sparse_solve_rhs.clone())),
            ]
        } else if udf.name() == "sparse_transpose" {
            vec![ColumnarValue::Array(Arc::new(sparse.clone()))]
        } else {
            // Fall-through: sparse_matmat_sparse.
            vec![
                ColumnarValue::Array(Arc::new(sparse.clone())),
                ColumnarValue::Array(Arc::new(sparse_rhs.clone())),
            ]
        };
        let fields = if udf.name() == "sparse_matvec" {
            vec![Arc::clone(&sparse_field), Arc::clone(&ragged_vector_field)]
        } else if udf.name() == "sparse_lu_solve" {
            vec![Arc::clone(&sparse_solve_field), Arc::clone(&sparse_solve_rhs_field)]
        } else if udf.name() == "sparse_matmat_dense" {
            vec![Arc::clone(&sparse_field), Arc::clone(&dense_field)]
        } else if udf.name() == "sparse_transpose" {
            vec![Arc::clone(&sparse_field)]
        } else {
            vec![Arc::clone(&sparse_field), Arc::clone(&sparse_rhs_field)]
        };
        // NOTE(review): sparse_transpose receives one argument but shares this
        // two-entry scalar slice — presumably extra entries are ignored by
        // invoke_udf; confirm against its implementation.
        let (_field, _output) = invoke_udf(&udf, args, fields, &[None, None], 2)
            .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
    }
}
// Float32 coverage of the ML-oriented matrix UDFs: column statistics, the PCA
// fit -> transform -> inverse-transform round trip, and linear regression with
// an intercept. Each output's Arrow type must match the declared return field.
//
// Fix: the final assertion read `array_data_type(®ression_output)` — an
// HTML-entity corruption of `&regression_output` (`&reg` rendered as `®`) —
// which is not valid Rust. Restored the intended borrow.
#[test]
fn ml_udfs_cover_float32_branches() {
    // 3x2 design matrix and a 3-element response vector.
    let (matrix_field, matrices) =
        matrix_batch_f32("design", [[[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]]);
    let response_field =
        vector_field("response", &DataType::Float32, 3, false).expect("response field");
    let responses = fixed_size_list_f32([[1.0, 2.0, 3.0]]);
    let bool_field = Arc::new(Field::new("add_intercept", DataType::Boolean, false));
    // Unary statistics/PCA UDFs must all return array output for f32 input.
    for udf in [
        udfs::matrix_column_means_udf(),
        udfs::matrix_center_columns_udf(),
        udfs::matrix_covariance_udf(),
        udfs::matrix_correlation_udf(),
        udfs::matrix_pca_udf(),
    ] {
        let (_field, output) = invoke_udf(
            &udf,
            vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
            vec![Arc::clone(&matrix_field)],
            &[None],
            1,
        )
        .unwrap_or_else(|error| panic!("{} f32: {error}", udf.name()));
        match output {
            ColumnarValue::Array(_) => {}
            ColumnarValue::Scalar(_) => panic!("expected array output"),
        }
    }
    // Fit PCA, then feed its struct output into transform and inverse-transform.
    let (pca_field, pca_output) = invoke_udf(
        &udfs::matrix_pca_udf(),
        vec![ColumnarValue::Array(Arc::new(matrices.clone()))],
        vec![Arc::clone(&matrix_field)],
        &[None],
        1,
    )
    .expect("matrix_pca f32");
    let pca_array = struct_array(&pca_output).clone();
    let (projected_field, projected_output) = invoke_udf(
        &udfs::matrix_pca_transform_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices.clone())),
            ColumnarValue::Array(Arc::new(pca_array.clone())),
        ],
        vec![Arc::clone(&matrix_field), Arc::clone(&pca_field)],
        &[None, None],
        1,
    )
    .expect("matrix_pca_transform f32");
    assert_eq!(array_data_type(&projected_output), projected_field.data_type());
    let (_inverse_field, inverse_output) = invoke_udf(
        &udfs::matrix_pca_inverse_transform_udf(),
        vec![projected_output, ColumnarValue::Array(Arc::new(pca_array))],
        vec![projected_field, pca_field],
        &[None, None],
        1,
    )
    .expect("matrix_pca_inverse_transform f32");
    match inverse_output {
        ColumnarValue::Array(_) => {}
        ColumnarValue::Scalar(_) => panic!("expected array output"),
    }
    // Linear regression with add_intercept = true supplied as a scalar.
    let (regression_field, regression_output) = invoke_udf(
        &udfs::linear_regression_udf(),
        vec![
            ColumnarValue::Array(Arc::new(matrices)),
            ColumnarValue::Array(Arc::new(responses)),
            ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))),
        ],
        vec![matrix_field, response_field, bool_field],
        &[None, None, Some(ScalarValue::Boolean(Some(true)))],
        1,
    )
    .expect("linear_regression f32");
    assert_eq!(array_data_type(&regression_output), regression_field.data_type());
}
// Shared inputs for the numerical-differentiation UDF tests (jacobian,
// gradient, hessian) in both f64 and f32 flavours.
struct DifferentiationFixture {
    // Utf8 field naming the function to differentiate ("square"/"sum_squares").
    function_field: FieldRef,
    // Float64 scalar field for the finite-difference step size.
    step_field: FieldRef,
    // Float64 scalar field for the convergence tolerance.
    tolerance_field: FieldRef,
    // Int64 scalar field for the iteration cap.
    iterations_field: FieldRef,
    vector_field_f64: FieldRef,
    vector_field_f32: FieldRef,
    // Evaluation points: [[2, 3]] for f64, [[1, 2]] for f32.
    vectors: FixedSizeListArray,
    vectors_f32: FixedSizeListArray,
    // Function selectors passed as scalar Utf8 arguments.
    square: ScalarValue,
    sum_squares: ScalarValue,
    // Step sizes; the f32 path uses a coarser step.
    step: ScalarValue,
    step_f32: ScalarValue,
    tolerance: ScalarValue,
    iterations: ScalarValue,
}
fn differentiation_fixture() -> DifferentiationFixture {
DifferentiationFixture {
function_field: Arc::new(Field::new("function", DataType::Utf8, false)),
step_field: Arc::new(Field::new("step_size", DataType::Float64, false)),
tolerance_field: Arc::new(Field::new("tolerance", DataType::Float64, false)),
iterations_field: Arc::new(Field::new("max_iterations", DataType::Int64, false)),
vector_field_f64: vector_field("vector", &DataType::Float64, 2, false).expect("field"),
vector_field_f32: vector_field("vector_f32", &DataType::Float32, 2, false).expect("field"),
vectors: fixed_size_list([[2.0, 3.0]]),
vectors_f32: fixed_size_list_f32([[1.0_f32, 2.0]]),
square: ScalarValue::Utf8(Some("square".to_string())),
sum_squares: ScalarValue::Utf8(Some("sum_squares".to_string())),
step: ScalarValue::Float64(Some(1.0e-6)),
step_f32: ScalarValue::Float64(Some(1.0e-3)),
tolerance: ScalarValue::Float64(Some(1.0e-8)),
iterations: ScalarValue::Int64(Some(32)),
}
}
// Checks the jacobian UDFs on f(x) = x^2 applied element-wise at [2, 3]: the
// Jacobian is diagonal with entries 2*x = [4, 6]. Forward differences get a
// 1e-3 tolerance; central differences a tighter 1e-5.
//
// Fix: the central-difference call read `fixed_shape_viewd(¢ral_field,
// ¢ral_output)` — an HTML-entity corruption of `&central_field,
// &central_output` (`&cent` rendered as `¢`) — which is not valid Rust.
// Restored the intended borrows.
#[test]
fn differentiation_udfs_produce_jacobians() {
    let fixture = differentiation_fixture();
    let (jacobian_field, jacobian_output) = invoke_udf(
        &udfs::jacobian_udf(),
        vec![
            ColumnarValue::Scalar(fixture.square.clone()),
            ColumnarValue::Array(Arc::new(fixture.vectors.clone())),
            ColumnarValue::Scalar(fixture.step.clone()),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
            ColumnarValue::Scalar(fixture.iterations.clone()),
        ],
        vec![
            Arc::clone(&fixture.function_field),
            Arc::clone(&fixture.vector_field_f64),
            Arc::clone(&fixture.step_field),
            Arc::clone(&fixture.tolerance_field),
            Arc::clone(&fixture.iterations_field),
        ],
        &[
            Some(fixture.square.clone()),
            None,
            Some(fixture.step.clone()),
            Some(fixture.tolerance.clone()),
            Some(fixture.iterations.clone()),
        ],
        1,
    )
    .expect("jacobian");
    let jacobian = fixed_shape_viewd(&jacobian_field, &jacobian_output)
        .into_dimensionality::<Ix3>()
        .expect("rank-3 jacobian");
    // Diagonal = 2 * [2, 3]; off-diagonal ~ 0.
    assert!((jacobian[[0, 0, 0]] - 4.0).abs() < 1.0e-3);
    assert!((jacobian[[0, 1, 1]] - 6.0).abs() < 1.0e-3);
    assert!(jacobian[[0, 0, 1]].abs() < 1.0e-3);
    assert!(jacobian[[0, 1, 0]].abs() < 1.0e-3);
    let (central_field, central_output) = invoke_udf(
        &udfs::jacobian_central_udf(),
        vec![
            ColumnarValue::Scalar(fixture.square.clone()),
            ColumnarValue::Array(Arc::new(fixture.vectors)),
            ColumnarValue::Scalar(fixture.step.clone()),
        ],
        vec![fixture.function_field, fixture.vector_field_f64, fixture.step_field],
        &[Some(fixture.square), None, Some(fixture.step)],
        1,
    )
    .expect("jacobian_central");
    let central = fixed_shape_viewd(&central_field, &central_output)
        .into_dimensionality::<Ix3>()
        .expect("rank-3 central jacobian");
    assert!((central[[0, 0, 0]] - 4.0).abs() < 1.0e-5);
    assert!((central[[0, 1, 1]] - 6.0).abs() < 1.0e-5);
}
// Checks gradient and hessian of f(x) = sum(x_i^2) at [2, 3]: the gradient is
// 2*x = [4, 6] and the Hessian is 2*I.
#[test]
fn differentiation_udfs_produce_gradient_and_hessian() {
    let fixture = differentiation_fixture();
    let (gradient_field, gradient_output) = invoke_udf(
        &udfs::gradient_udf(),
        vec![
            ColumnarValue::Scalar(fixture.sum_squares.clone()),
            ColumnarValue::Array(Arc::new(fixture.vectors.clone())),
            ColumnarValue::Scalar(fixture.step.clone()),
        ],
        vec![
            Arc::clone(&fixture.function_field),
            Arc::clone(&fixture.vector_field_f64),
            Arc::clone(&fixture.step_field),
        ],
        &[Some(fixture.sum_squares.clone()), None, Some(fixture.step.clone())],
        1,
    )
    .expect("gradient");
    let gradient =
        ndarrow::fixed_size_list_as_array2::<Float64Type>(fixed_size_list_array(&gradient_output))
            .expect("gradient");
    assert!(matches!(gradient_field.data_type(), DataType::FixedSizeList(_, _)));
    // grad sum_squares([2, 3]) = [4, 6].
    assert_close_eps(gradient[[0, 0]], 4.0, 1.0e-5);
    assert_close_eps(gradient[[0, 1]], 6.0, 1.0e-5);
    let (hessian_field, hessian_output) = invoke_udf(
        &udfs::hessian_udf(),
        vec![
            ColumnarValue::Scalar(fixture.sum_squares.clone()),
            ColumnarValue::Array(Arc::new(fixture.vectors)),
            ColumnarValue::Scalar(fixture.step.clone()),
        ],
        vec![fixture.function_field, fixture.vector_field_f64, fixture.step_field],
        &[Some(fixture.sum_squares), None, Some(fixture.step)],
        1,
    )
    .expect("hessian");
    let hessian = fixed_shape_viewd(&hessian_field, &hessian_output)
        .into_dimensionality::<Ix3>()
        .expect("rank-3 hessian");
    // Hessian of sum_squares is 2*I.
    assert_close_eps(hessian[[0, 0, 0]], 2.0, 1.0e-3);
    assert_close_eps(hessian[[0, 1, 1]], 2.0, 1.0e-3);
    assert_close_eps(hessian[[0, 0, 1]], 0.0, 1.0e-3);
    assert_close_eps(hessian[[0, 1, 0]], 0.0, 1.0e-3);
}
// Same gradient UDF but driven through the Float32 vector batch; uses the
// coarser f32 step and looser tolerances to absorb the reduced precision.
#[test]
fn differentiation_udfs_support_float32_gradients() {
    let fixture = differentiation_fixture();
    let (gradient_field, gradient_output) = invoke_udf(
        &udfs::gradient_udf(),
        vec![
            ColumnarValue::Scalar(fixture.sum_squares.clone()),
            ColumnarValue::Array(Arc::new(fixture.vectors_f32)),
            ColumnarValue::Scalar(fixture.step_f32.clone()),
        ],
        vec![fixture.function_field, fixture.vector_field_f32, fixture.step_field],
        &[Some(fixture.sum_squares), None, Some(fixture.step_f32)],
        1,
    )
    .expect("gradient f32");
    // The declared output field must match the array actually produced.
    assert_eq!(array_data_type(&gradient_output), gradient_field.data_type());
    let gradient =
        ndarrow::fixed_size_list_as_array2::<Float32Type>(fixed_size_list_array(&gradient_output))
            .expect("gradient f32");
    assert_close_eps(f64::from(gradient[[0, 0]]), 2.0, 1.0e-2);
    assert_close_eps(f64::from(gradient[[0, 1]]), 4.0, 1.0e-2);
}
// A function name outside the supported set must surface a descriptive error
// rather than producing output.
#[test]
fn differentiation_udfs_reject_unknown_named_functions() {
    let fixture = differentiation_fixture();
    let unknown = ScalarValue::Utf8(Some("unknown".to_string()));
    let failure = invoke_udf(
        &udfs::jacobian_udf(),
        vec![
            ColumnarValue::Scalar(unknown.clone()),
            ColumnarValue::Array(Arc::new(fixture.vectors)),
        ],
        vec![fixture.function_field, fixture.vector_field_f64],
        &[Some(unknown), None],
        1,
    )
    .expect_err("unsupported named function should fail");
    let message = failure.to_string();
    assert!(message.contains("unsupported named vector function"));
}
// Shared inputs for the complex-valued optimization UDF tests: field schemas,
// scalar hyper-parameters, and the point / direction / initial vector batches.
struct OptimizationFixture {
    function_field: FieldRef,
    learning_rate_field: FieldRef,
    iterations_field: FieldRef,
    tolerance_field: FieldRef,
    contraction_field: FieldRef,
    sufficient_decrease_field: FieldRef,
    point_step_field: FieldRef,
    objective: ScalarValue,
    learning_rate: ScalarValue,
    iterations: ScalarValue,
    tolerance: ScalarValue,
    point_field: FieldRef,
    points: FixedSizeListArray,
    direction_field: FieldRef,
    directions: FixedSizeListArray,
    initial_field: FieldRef,
    initials: FixedSizeListArray,
    // Squared norm of the initial vector; optimizers must finish below this.
    initial_norm: f64,
}
fn optimization_fixture() -> OptimizationFixture {
OptimizationFixture {
function_field: Arc::new(Field::new("function", DataType::Utf8, false)),
learning_rate_field: Arc::new(Field::new("learning_rate", DataType::Float64, false)),
iterations_field: Arc::new(Field::new("max_iterations", DataType::Int64, false)),
tolerance_field: Arc::new(Field::new("tolerance", DataType::Float64, false)),
contraction_field: Arc::new(Field::new("contraction", DataType::Float64, false)),
sufficient_decrease_field: Arc::new(Field::new(
"sufficient_decrease",
DataType::Float64,
false,
)),
point_step_field: Arc::new(Field::new("initial_step", DataType::Float64, false)),
objective: ScalarValue::Utf8(Some("norm_squared".to_string())),
learning_rate: ScalarValue::Float64(Some(0.25)),
iterations: ScalarValue::Int64(Some(256)),
tolerance: ScalarValue::Float64(Some(1.0e-6)),
point_field: complex_vector_batch("point", [[
Complex64::new(1.0, 0.0),
Complex64::new(0.0, 0.0),
]])
.0,
points: complex_vector_batch("point", [[
Complex64::new(1.0, 0.0),
Complex64::new(0.0, 0.0),
]])
.1,
direction_field: complex_vector_batch("direction", [[
Complex64::new(-1.0, 0.0),
Complex64::new(0.0, 0.0),
]])
.0,
directions: complex_vector_batch("direction", [[
Complex64::new(-1.0, 0.0),
Complex64::new(0.0, 0.0),
]])
.1,
initial_field: complex_vector_batch("initial", [[
Complex64::new(1.0, 1.0),
Complex64::new(-1.0, 0.0),
]])
.0,
initials: complex_vector_batch("initial", [[
Complex64::new(1.0, 1.0),
Complex64::new(-1.0, 0.0),
]])
.1,
initial_norm: [Complex64::new(1.0, 1.0), Complex64::new(-1.0, 0.0)]
.into_iter()
.map(|value| value.norm_sqr())
.sum::<f64>(),
}
}
// Backtracking line search from point (1, 0) along direction (-1, 0) with
// initial step 1.0, contraction 0.5, sufficient decrease 1.0e-4, 16 iterations;
// only checks that the returned step size is a finite positive Float64.
#[test]
fn optimization_udfs_backtracking_line_search_returns_positive_step() {
    let fixture = optimization_fixture();
    let (line_search_field, line_search_output) = invoke_udf(
        &udfs::backtracking_line_search_complex_udf(),
        vec![
            ColumnarValue::Scalar(fixture.objective.clone()),
            ColumnarValue::Array(Arc::new(fixture.points)),
            ColumnarValue::Array(Arc::new(fixture.directions)),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0))),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(0.5))),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-4))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(16))),
        ],
        vec![
            fixture.function_field,
            fixture.point_field,
            fixture.direction_field,
            fixture.point_step_field,
            fixture.contraction_field,
            fixture.sufficient_decrease_field,
            fixture.iterations_field,
        ],
        &[
            Some(fixture.objective),
            None,
            None,
            Some(ScalarValue::Float64(Some(1.0))),
            Some(ScalarValue::Float64(Some(0.5))),
            Some(ScalarValue::Float64(Some(1.0e-4))),
            Some(ScalarValue::Int64(Some(16))),
        ],
        1,
    )
    .expect("backtracking line search");
    assert_eq!(line_search_field.data_type(), &DataType::Float64);
    let alpha = f64_array(&line_search_output).value(0);
    assert!(alpha.is_finite());
    assert!(alpha > 0.0);
}
// Gradient descent on the "norm_squared" objective must land at a point whose
// squared norm is strictly smaller than the starting point's.
#[test]
fn optimization_udfs_gradient_descent_reduces_objective() {
    let fixture = optimization_fixture();
    let (field, output) = invoke_udf(
        &udfs::gradient_descent_complex_udf(),
        vec![
            ColumnarValue::Scalar(fixture.objective.clone()),
            ColumnarValue::Array(Arc::new(fixture.initials)),
            ColumnarValue::Scalar(fixture.learning_rate.clone()),
            ColumnarValue::Scalar(fixture.iterations.clone()),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            fixture.function_field,
            fixture.initial_field,
            fixture.learning_rate_field,
            fixture.iterations_field,
            fixture.tolerance_field,
        ],
        &[
            Some(fixture.objective),
            None,
            Some(fixture.learning_rate),
            Some(fixture.iterations),
            Some(fixture.tolerance),
        ],
        1,
    )
    .expect("gradient_descent_complex");
    assert_eq!(array_data_type(&output), field.data_type());
    let result = ndarrow::complex64_as_array_view2(fixed_size_list_array(&output)).expect("gd");
    let norm = result.row(0).iter().copied().map(|value| value.norm_sqr()).sum::<f64>();
    assert!(norm < fixture.initial_norm);
}
// Adam (all defaults — only objective and initial point supplied) and momentum
// descent must both reduce the objective below the initial squared norm.
#[test]
fn optimization_udfs_adam_and_momentum_reduce_objective() {
    let fixture = optimization_fixture();
    let (adam_field, adam_output) = invoke_udf(
        &udfs::adam_complex_udf(),
        vec![
            ColumnarValue::Scalar(fixture.objective.clone()),
            ColumnarValue::Array(Arc::new(fixture.initials.clone())),
        ],
        vec![Arc::clone(&fixture.function_field), Arc::clone(&fixture.initial_field)],
        &[Some(fixture.objective.clone()), None],
        1,
    )
    .expect("adam_complex");
    assert_eq!(array_data_type(&adam_output), adam_field.data_type());
    let adam =
        ndarrow::complex64_as_array_view2(fixed_size_list_array(&adam_output)).expect("adam");
    let adam_norm = adam.row(0).iter().copied().map(|value| value.norm_sqr()).sum::<f64>();
    assert!(adam_norm < fixture.initial_norm);
    let (momentum_field, momentum_output) = invoke_udf(
        &udfs::momentum_descent_complex_udf(),
        vec![
            ColumnarValue::Scalar(fixture.objective),
            ColumnarValue::Array(Arc::new(fixture.initials)),
            ColumnarValue::Scalar(fixture.learning_rate),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(0.8))),
            ColumnarValue::Scalar(fixture.iterations),
            ColumnarValue::Scalar(fixture.tolerance),
        ],
        vec![
            fixture.function_field,
            fixture.initial_field,
            fixture.learning_rate_field,
            Arc::new(Field::new("momentum", DataType::Float64, false)),
            fixture.iterations_field,
            fixture.tolerance_field,
        ],
        // The scalar literals below mirror the fixture values (objective,
        // learning rate 0.25, iterations 256, tolerance 1.0e-6) because the
        // fixture scalars were moved into the argument vector above.
        &[
            Some(ScalarValue::Utf8(Some("norm_squared".to_string()))),
            None,
            Some(ScalarValue::Float64(Some(0.25))),
            Some(ScalarValue::Float64(Some(0.8))),
            Some(ScalarValue::Int64(Some(256))),
            Some(ScalarValue::Float64(Some(1.0e-6))),
        ],
        1,
    )
    .expect("momentum_descent_complex");
    assert_eq!(array_data_type(&momentum_output), momentum_field.data_type());
    let momentum = ndarrow::complex64_as_array_view2(fixed_size_list_array(&momentum_output))
        .expect("momentum");
    let momentum_norm = momentum.row(0).iter().copied().map(|value| value.norm_sqr()).sum::<f64>();
    assert!(momentum_norm < fixture.initial_norm);
}
// A negative learning rate must be rejected with a configuration error.
#[test]
fn optimization_udfs_reject_invalid_learning_rate() {
    let objective = ScalarValue::Utf8(Some("norm_squared".to_string()));
    let error = invoke_udf(
        &udfs::gradient_descent_complex_udf(),
        vec![
            ColumnarValue::Scalar(objective),
            ColumnarValue::Array(Arc::new(
                complex_vector_batch("bad_initial", [[
                    Complex64::new(1.0, 0.0),
                    Complex64::new(0.0, 0.0),
                ]])
                .1,
            )),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(-1.0))),
        ],
        vec![
            Arc::new(Field::new("function", DataType::Utf8, false)),
            complex_vector_field("bad_initial", 2, false).expect("complex vector field"),
            Arc::new(Field::new("learning_rate", DataType::Float64, false)),
        ],
        &[
            Some(ScalarValue::Utf8(Some("norm_squared".to_string()))),
            None,
            Some(ScalarValue::Float64(Some(-1.0))),
        ],
        1,
    )
    .expect_err("invalid learning-rate should fail");
    assert!(error.to_string().contains("invalid gradient-descent configuration"));
}
// Inputs for the sparse factorization / preconditioner tests: a two-row CSR
// matrix batch plus ragged right-hand-side vectors and matrices.
struct SparseFactorizationFixture {
    sparse_field: FieldRef,
    sparse: StructArray,
    rhs_field: FieldRef,
    rhs: StructArray,
    rhs_matrix_field: FieldRef,
    rhs_matrices: StructArray,
}
// Builds the f64 sparse fixture: two 2x2 CSR matrices ([[4, 1], [1, 3]] and
// diag(2, 5)), ragged RHS vectors, and identity RHS matrices.
fn sparse_factorization_fixture() -> SparseFactorizationFixture {
    let (csr_field, csr_array) = ndarrow::csr_batch_to_extension_array(
        "sparse_factorization",
        vec![[2, 2], [2, 2]],
        vec![vec![0, 2, 4], vec![0, 1, 2]],
        vec![vec![0, 1, 0, 1], vec![0, 1]],
        vec![vec![4.0, 1.0, 1.0, 3.0], vec![2.0, 5.0]],
    )
    .map(|(field, array)| (Arc::new(field), array))
    .expect("sparse factorization batch");
    let (rhs_field, rhs) = ragged_vectors("rhs", vec![vec![1.0, 2.0], vec![4.0, 10.0]]);
    let identity = vec![vec![1.0, 0.0], vec![0.0, 1.0]];
    let (rhs_matrix_field, rhs_matrices) =
        ragged_matrices("rhs_matrices", vec![identity.clone(), identity]);
    SparseFactorizationFixture {
        sparse_field: csr_field,
        sparse: csr_array,
        rhs_field,
        rhs,
        rhs_matrix_field,
        rhs_matrices,
    }
}
// Float32 twin of `sparse_factorization_fixture`: same matrix values, but the
// CSR data and ragged RHS batches are built from f32 values.
fn sparse_factorization_fixture_f32() -> SparseFactorizationFixture {
    let (csr_field, csr_array) = ndarrow::csr_batch_to_extension_array(
        "sparse_factorization_f32",
        vec![[2, 2], [2, 2]],
        vec![vec![0, 2, 4], vec![0, 1, 2]],
        vec![vec![0, 1, 0, 1], vec![0, 1]],
        vec![vec![4.0_f32, 1.0, 1.0, 3.0], vec![2.0_f32, 5.0]],
    )
    .map(|(field, array)| (Arc::new(field), array))
    .expect("sparse factorization batch");
    let (rhs_field, rhs) = ragged_vectors_f32("rhs_f32", vec![vec![1.0, 2.0], vec![4.0, 10.0]]);
    let identity = vec![vec![1.0, 0.0], vec![0.0, 1.0]];
    let (rhs_matrix_field, rhs_matrices) =
        ragged_matrices_f32("rhs_matrices_f32", vec![identity.clone(), identity]);
    SparseFactorizationFixture {
        sparse_field: csr_field,
        sparse: csr_array,
        rhs_field,
        rhs,
        rhs_matrix_field,
        rhs_matrices,
    }
}
// Factors the CSR batch once, then reuses the factorization to solve against
// vector RHS (exact values checked) and matrix RHS (inverse of each matrix,
// since the RHS matrices are identities).
#[test]
fn sparse_lu_factorization_udfs_solve_vectors_and_matrices() {
    let fixture = sparse_factorization_fixture();
    let (factor_field, factor_output) = invoke_udf(
        &udfs::sparse_lu_factor_udf(),
        vec![ColumnarValue::Array(Arc::new(fixture.sparse.clone()))],
        vec![Arc::clone(&fixture.sparse_field)],
        &[None],
        2,
    )
    .expect("sparse_lu_factor");
    let factorization = struct_array(&factor_output).clone();
    assert_eq!(factorization.num_columns(), 3);
    let (solve_field, solve_output) = invoke_udf(
        &udfs::sparse_lu_solve_with_factorization_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse.clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs.clone())),
            ColumnarValue::Array(Arc::new(factorization.clone())),
        ],
        vec![
            Arc::clone(&fixture.sparse_field),
            Arc::clone(&fixture.rhs_field),
            Arc::clone(&factor_field),
        ],
        &[None, None, None],
        2,
    )
    .expect("sparse_lu_solve_with_factorization");
    let mut solve_rows = variable_shape_rows(&solve_field, &solve_output);
    let (_, first) = solve_rows.next().expect("row 0").expect("row 0 vector");
    let (_, second) = solve_rows.next().expect("row 1").expect("row 1 vector");
    let first = first.into_dimensionality::<Ix1>().expect("rank-1 vector");
    let second = second.into_dimensionality::<Ix1>().expect("rank-1 vector");
    // [[4,1],[1,3]] x = [1,2] => x = [1/11, 7/11]; diag(2,5) x = [4,10] => [2,2].
    assert_close(first[[0]], 1.0 / 11.0);
    assert_close(first[[1]], 7.0 / 11.0);
    assert_close(second[[0]], 2.0);
    assert_close(second[[1]], 2.0);
    let (field, output) = invoke_udf(
        &udfs::sparse_lu_solve_multiple_with_factorization_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse)),
            ColumnarValue::Array(Arc::new(fixture.rhs_matrices)),
            ColumnarValue::Array(Arc::new(factorization)),
        ],
        vec![fixture.sparse_field, fixture.rhs_matrix_field, factor_field],
        &[None, None, None],
        2,
    )
    .expect("sparse_lu_solve_multiple_with_factorization");
    let mut rows = variable_shape_rows(&field, &output);
    let (_, first) = rows.next().expect("matrix row 0").expect("matrix row 0");
    let (_, second) = rows.next().expect("matrix row 1").expect("matrix row 1");
    let first = first.into_dimensionality::<Ix2>().expect("rank-2 matrix");
    let second = second.into_dimensionality::<Ix2>().expect("rank-2 matrix");
    // Solving A X = I yields A^{-1}; det([[4,1],[1,3]]) = 11.
    assert_close(first[[0, 0]], 3.0 / 11.0);
    assert_close(first[[0, 1]], -1.0 / 11.0);
    assert_close(first[[1, 0]], -1.0 / 11.0);
    assert_close(first[[1, 1]], 4.0 / 11.0);
    assert_close(second[[0, 0]], 0.5);
    assert_close(second[[1, 1]], 0.2);
}
// Jacobi preconditioning divides each RHS entry by the matrix diagonal:
// [1,2]/[4,3] and [4,10]/[2,5].
#[test]
fn sparse_jacobi_preconditioner_udfs_apply_expected_scaling() {
    let fixture = sparse_factorization_fixture();
    let (jacobi_field, jacobi_output) = invoke_udf(
        &udfs::sparse_jacobi_preconditioner_udf(),
        vec![ColumnarValue::Array(Arc::new(fixture.sparse))],
        vec![fixture.sparse_field],
        &[None],
        2,
    )
    .expect("sparse_jacobi_preconditioner");
    let (field, output) = invoke_udf(
        &udfs::sparse_apply_jacobi_preconditioner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&jacobi_output).clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs)),
        ],
        vec![jacobi_field, fixture.rhs_field],
        &[None, None],
        2,
    )
    .expect("sparse_apply_jacobi_preconditioner");
    let mut rows = variable_shape_rows(&field, &output);
    let (_, first) = rows.next().expect("jacobi row 0").expect("jacobi row 0");
    let (_, second) = rows.next().expect("jacobi row 1").expect("jacobi row 1");
    let first = first.into_dimensionality::<Ix1>().expect("rank-1 vector");
    let second = second.into_dimensionality::<Ix1>().expect("rank-1 vector");
    assert_close(first[[0]], 0.25);
    assert_close(first[[1]], 2.0 / 3.0);
    assert_close(second[[0]], 2.0);
    assert_close(second[[1]], 2.0);
}
// ILUT factor + apply round-trip; only checks row width and finiteness, since
// the incomplete factorization's exact values depend on drop decisions.
#[test]
fn sparse_ilut_preconditioner_udfs_produce_finite_rows() {
    let fixture = sparse_factorization_fixture();
    let (factor_field, factor_output) = invoke_udf(
        &udfs::sparse_ilut_factor_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse)),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-8))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(8))),
        ],
        vec![
            fixture.sparse_field,
            Arc::new(Field::new("drop_tolerance", DataType::Float64, false)),
            Arc::new(Field::new("max_fill", DataType::Int64, false)),
        ],
        &[None, Some(ScalarValue::Float64(Some(1.0e-8))), Some(ScalarValue::Int64(Some(8)))],
        2,
    )
    .expect("sparse_ilut_factor");
    let (field, output) = invoke_udf(
        &udfs::sparse_apply_ilut_preconditioner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&factor_output).clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs)),
        ],
        vec![factor_field, fixture.rhs_field],
        &[None, None],
        2,
    )
    .expect("sparse_apply_ilut_preconditioner");
    let mut rows = variable_shape_rows(&field, &output);
    for index in 0..2 {
        let (_, row) = rows.next().expect("ilut row").expect("ilut row value");
        let row = row.into_dimensionality::<Ix1>().expect("rank-1 vector");
        assert_eq!(row.len(), 2, "unexpected ILUT row width at index {index}");
        assert!(row.iter().all(|value| value.is_finite()));
    }
}
// Asserts each of the first `expected_rows` f32 rows is a finite rank-1 vector
// of width 2; `context` labels the expect/assert messages.
fn assert_finite_variable_rank1_rows_f32(
    field: &FieldRef,
    output: &ColumnarValue,
    expected_rows: usize,
    context: &str,
) {
    let mut row_iter = variable_shape_rows_f32(field, output);
    for index in 0..expected_rows {
        let (_, raw_row) = row_iter.next().expect(context).expect(context);
        let vector = raw_row.into_dimensionality::<Ix1>().expect("rank-1 vector");
        assert_eq!(vector.len(), 2, "unexpected {context} width at index {index}");
        assert!(vector.iter().all(|entry| entry.is_finite()));
    }
}
// Asserts each of the first `expected_rows` f32 rows is a finite rank-2 matrix
// with shape `expected_shape`; `context` labels the expect/assert messages.
fn assert_finite_variable_rank2_rows_f32(
    field: &FieldRef,
    output: &ColumnarValue,
    expected_rows: usize,
    expected_shape: &[usize],
    context: &str,
) {
    let mut row_iter = variable_shape_rows_f32(field, output);
    for index in 0..expected_rows {
        let (_, raw_row) = row_iter.next().expect(context).expect(context);
        let matrix = raw_row.into_dimensionality::<Ix2>().expect("rank-2 matrix");
        assert_eq!(matrix.shape(), expected_shape, "unexpected {context} shape at index {index}");
        assert!(matrix.iter().all(|entry| entry.is_finite()));
    }
}
// Float32 variant of the LU and Jacobi paths; exact values are not pinned —
// only width/shape and finiteness via the f32 row helpers.
#[test]
fn sparse_factorization_float32_lu_and_jacobi_paths() {
    let fixture = sparse_factorization_fixture_f32();
    let (factor_field, factor_output) = invoke_udf(
        &udfs::sparse_lu_factor_udf(),
        vec![ColumnarValue::Array(Arc::new(fixture.sparse.clone()))],
        vec![Arc::clone(&fixture.sparse_field)],
        &[None],
        2,
    )
    .expect("sparse_lu_factor");
    let factorization = struct_array(&factor_output).clone();
    assert_eq!(factorization.num_columns(), 3);
    let (solve_field, solve_output) = invoke_udf(
        &udfs::sparse_lu_solve_with_factorization_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse.clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs.clone())),
            ColumnarValue::Array(Arc::new(factorization.clone())),
        ],
        vec![
            Arc::clone(&fixture.sparse_field),
            Arc::clone(&fixture.rhs_field),
            Arc::clone(&factor_field),
        ],
        &[None, None, None],
        2,
    )
    .expect("sparse_lu_solve_with_factorization");
    assert_finite_variable_rank1_rows_f32(&solve_field, &solve_output, 2, "solve row");
    let (multiple_field, multiple_output) = invoke_udf(
        &udfs::sparse_lu_solve_multiple_with_factorization_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse.clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs_matrices.clone())),
            ColumnarValue::Array(Arc::new(factorization.clone())),
        ],
        vec![
            Arc::clone(&fixture.sparse_field),
            Arc::clone(&fixture.rhs_matrix_field),
            Arc::clone(&factor_field),
        ],
        &[None, None, None],
        2,
    )
    .expect("sparse_lu_solve_multiple_with_factorization");
    assert_finite_variable_rank2_rows_f32(
        &multiple_field,
        &multiple_output,
        2,
        &[2, 2],
        "solve matrix row",
    );
    let (jacobi_field, jacobi_output) = invoke_udf(
        &udfs::sparse_jacobi_preconditioner_udf(),
        vec![ColumnarValue::Array(Arc::new(fixture.sparse.clone()))],
        vec![Arc::clone(&fixture.sparse_field)],
        &[None],
        2,
    )
    .expect("sparse_jacobi_preconditioner");
    let (jacobi_apply_field, jacobi_apply_output) = invoke_udf(
        &udfs::sparse_apply_jacobi_preconditioner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&jacobi_output).clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs.clone())),
        ],
        vec![jacobi_field, Arc::clone(&fixture.rhs_field)],
        &[None, None],
        2,
    )
    .expect("sparse_apply_jacobi_preconditioner");
    assert_finite_variable_rank1_rows_f32(
        &jacobi_apply_field,
        &jacobi_apply_output,
        2,
        "jacobi row",
    );
}
// Float32 variant of the ILUT (threshold) and ILU(k) (level-of-fill)
// preconditioner round-trips; checks width and finiteness only.
#[test]
fn sparse_factorization_float32_ilu_preconditioner_paths() {
    let fixture = sparse_factorization_fixture_f32();
    let (threshold_factor_field, threshold_factor_output) = invoke_udf(
        &udfs::sparse_ilut_factor_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse.clone())),
            ColumnarValue::Scalar(ScalarValue::Float64(Some(1.0e-8))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(8))),
        ],
        vec![
            Arc::clone(&fixture.sparse_field),
            Arc::new(Field::new("drop_tolerance", DataType::Float64, false)),
            Arc::new(Field::new("max_fill", DataType::Int64, false)),
        ],
        &[None, Some(ScalarValue::Float64(Some(1.0e-8))), Some(ScalarValue::Int64(Some(8)))],
        2,
    )
    .expect("sparse_ilut_factor");
    let (threshold_apply_field, threshold_apply_output) = invoke_udf(
        &udfs::sparse_apply_ilut_preconditioner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&threshold_factor_output).clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs.clone())),
        ],
        vec![threshold_factor_field, Arc::clone(&fixture.rhs_field)],
        &[None, None],
        2,
    )
    .expect("sparse_apply_ilut_preconditioner");
    assert_finite_variable_rank1_rows_f32(
        &threshold_apply_field,
        &threshold_apply_output,
        2,
        "ilut row",
    );
    let (level_factor_field, level_factor_output) = invoke_udf(
        &udfs::sparse_iluk_factor_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
        ],
        vec![fixture.sparse_field, Arc::new(Field::new("level_of_fill", DataType::Int64, false))],
        &[None, Some(ScalarValue::Int64(Some(1)))],
        2,
    )
    .expect("sparse_iluk_factor");
    let (level_apply_field, level_apply_output) = invoke_udf(
        &udfs::sparse_apply_iluk_preconditioner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&level_factor_output).clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs)),
        ],
        vec![level_factor_field, fixture.rhs_field],
        &[None, None],
        2,
    )
    .expect("sparse_apply_iluk_preconditioner");
    assert_finite_variable_rank1_rows_f32(&level_apply_field, &level_apply_output, 2, "iluk row");
}
// ILU(k) factor + apply round-trip on the f64 fixture with level of fill 1;
// checks row width and finiteness only.
#[test]
fn sparse_iluk_preconditioner_udfs_produce_finite_rows() {
    let fixture = sparse_factorization_fixture();
    let (factor_field, factor_output) = invoke_udf(
        &udfs::sparse_iluk_factor_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.sparse)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
        ],
        vec![fixture.sparse_field, Arc::new(Field::new("level_of_fill", DataType::Int64, false))],
        &[None, Some(ScalarValue::Int64(Some(1)))],
        2,
    )
    .expect("sparse_iluk_factor");
    let (field, output) = invoke_udf(
        &udfs::sparse_apply_iluk_preconditioner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&factor_output).clone())),
            ColumnarValue::Array(Arc::new(fixture.rhs)),
        ],
        vec![factor_field, fixture.rhs_field],
        &[None, None],
        2,
    )
    .expect("sparse_apply_iluk_preconditioner");
    let mut rows = variable_shape_rows(&field, &output);
    for index in 0..2 {
        let (_, row) = rows.next().expect("iluk row").expect("iluk row value");
        let row = row.into_dimensionality::<Ix1>().expect("rank-1 vector");
        assert_eq!(row.len(), 2, "unexpected ILU(k) row width at index {index}");
        assert!(row.iter().all(|value| value.is_finite()));
    }
}
// Inputs for the tensor decomposition tests: a (1, 2, 2, 2) fixed-shape tensor
// batch, the matching dense original, and decomposition hyper-parameters.
struct TensorDecompositionFixture {
    // Tolerance used when comparing reconstructed tensors against `original`.
    eps: f64,
    tensor_field: FieldRef,
    tensor: FixedSizeListArray,
    original: Array4<f64>,
    rank_field: FieldRef,
    max_iterations_field: FieldRef,
    tolerance_field: FieldRef,
    rank_list_field: FieldRef,
    rank: ScalarValue,
    max_iterations: ScalarValue,
    tolerance: ScalarValue,
    // Per-mode ranks for Tucker-style decompositions (Int32 list scalar).
    ranks: ScalarValue,
}
// Builds the tensor fixture. The second 2x2 slice is exactly twice the first
// (values [1,3,0.5,1.5] vs [2,6,1,3]), so rank-1 decompositions can
// reconstruct the tensor to within `eps`.
fn tensor_decomposition_fixture() -> TensorDecompositionFixture {
    let original =
        Array4::from_shape_vec((1, 2, 2, 2), vec![1.0, 3.0, 0.5, 1.5, 2.0, 6.0, 1.0, 3.0])
            .expect("tensor shape");
    let (tensor_field, tensor) =
        ndarrow::arrayd_to_fixed_shape_tensor("tensor", original.clone().into_dyn())
            .map(|(field, array)| (Arc::new(field), array))
            .expect("tensor batch");
    TensorDecompositionFixture {
        eps: 1.0e-6,
        tensor_field,
        tensor,
        original,
        rank_field: Arc::new(Field::new("rank", DataType::Int64, false)),
        max_iterations_field: Arc::new(Field::new("max_iterations", DataType::Int64, false)),
        tolerance_field: Arc::new(Field::new("tolerance", DataType::Float64, false)),
        rank_list_field: Arc::new(Field::new(
            "ranks",
            DataType::new_list(DataType::Int32, false),
            false,
        )),
        rank: ScalarValue::Int64(Some(1)),
        max_iterations: ScalarValue::Int64(Some(50)),
        tolerance: ScalarValue::Float64(Some(1.0e-10)),
        ranks: scalar_int32_list(vec![1, 1, 1]),
    }
}
// Views a fixed-shape tensor UDF output and materializes it as an owned
// rank-4 f64 array; panics if the output is not rank 4.
fn tensor4_from_output(field: &FieldRef, output: &ColumnarValue) -> Array4<f64> {
    let view = fixed_shape_viewd(field, output);
    let rank4 = view.into_dimensionality::<Ix4>().expect("rank-4 tensor");
    rank4.to_owned()
}
fn assert_tensor4_matches(actual: &Array4<f64>, expected: &Array4<f64>, eps: f64) {
for i in 0..2 {
for j in 0..2 {
for k in 0..2 {
assert_close_eps(actual[[0, i, j, k]], expected[[0, i, j, k]], eps);
}
}
}
}
// Runs the TT-SVD reconstruction UDF on a tensor-train output and returns the
// dense rank-4 f64 tensor.
fn reconstruct_tt_tensor(tt_field: &FieldRef, tt_output: &ColumnarValue) -> Array4<f64> {
    let cores = struct_array(tt_output).clone();
    let (dense_field, dense_output) = invoke_udf(
        &udfs::tensor_tt_svd_reconstruct_udf(),
        vec![ColumnarValue::Array(Arc::new(cores))],
        vec![Arc::clone(tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_svd_reconstruct");
    tensor4_from_output(&dense_field, &dense_output)
}
// Float32 twin of `reconstruct_tt_tensor`: same reconstruction UDF, but the
// dense result is read back through the f32 fixed-shape view.
fn reconstruct_tt_tensor_f32(tt_field: &FieldRef, tt_output: &ColumnarValue) -> Array4<f32> {
    let cores = struct_array(tt_output).clone();
    let (dense_field, dense_output) = invoke_udf(
        &udfs::tensor_tt_svd_reconstruct_udf(),
        vec![ColumnarValue::Array(Arc::new(cores))],
        vec![Arc::clone(tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_svd_reconstruct");
    fixed_shape_view4_f32(&dense_field, &dense_output).to_owned()
}
// CP-ALS (3-way and N-dimensional variants): decompose at rank 1, reconstruct,
// and require the result to match the original tensor within eps.
#[test]
fn tensor_cp_reconstruction_udfs_preserve_tensor() {
    let fixture = tensor_decomposition_fixture();
    let (cp3_field, cp3_output) = invoke_udf(
        &udfs::tensor_cp_als3_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor.clone())),
            ColumnarValue::Scalar(fixture.rank.clone()),
            ColumnarValue::Scalar(fixture.max_iterations.clone()),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            Arc::clone(&fixture.tensor_field),
            Arc::clone(&fixture.rank_field),
            Arc::clone(&fixture.max_iterations_field),
            Arc::clone(&fixture.tolerance_field),
        ],
        &[
            None,
            Some(fixture.rank.clone()),
            Some(fixture.max_iterations.clone()),
            Some(fixture.tolerance.clone()),
        ],
        1,
    )
    .expect("tensor_cp_als3");
    let (field, output) = invoke_udf(
        &udfs::tensor_cp_als3_reconstruct_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&cp3_output).clone()))],
        vec![cp3_field],
        &[None],
        1,
    )
    .expect("tensor_cp_als3_reconstruct");
    assert_tensor4_matches(&tensor4_from_output(&field, &output), &fixture.original, fixture.eps);
    let (cp_nd_field, cp_nd_output) = invoke_udf(
        &udfs::tensor_cp_als_nd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor)),
            ColumnarValue::Scalar(fixture.rank),
            ColumnarValue::Scalar(fixture.max_iterations),
            ColumnarValue::Scalar(fixture.tolerance),
        ],
        vec![
            Arc::clone(&fixture.tensor_field),
            fixture.rank_field,
            fixture.max_iterations_field,
            fixture.tolerance_field,
        ],
        // Literals mirror the fixture scalars (rank 1, 50 iterations, 1.0e-10),
        // which were moved into the argument vector above.
        &[
            None,
            Some(ScalarValue::Int64(Some(1))),
            Some(ScalarValue::Int64(Some(50))),
            Some(ScalarValue::Float64(Some(1.0e-10))),
        ],
        1,
    )
    .expect("tensor_cp_als_nd");
    let (field, output) = invoke_udf(
        &udfs::tensor_cp_als_nd_reconstruct_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&cp_nd_output).clone()))],
        vec![cp_nd_field],
        &[None],
        1,
    )
    .expect("tensor_cp_als_nd_reconstruct");
    assert_tensor4_matches(&tensor4_from_output(&field, &output), &fixture.original, fixture.eps);
}
// HOSVD at ranks [1,1,1], project to the core (shape collapses to 1x1x1x1),
// expand back to the original, then repeat the expand check for HOOI.
#[test]
fn tensor_tucker_udfs_project_and_expand_consistently() {
    let fixture = tensor_decomposition_fixture();
    let (hosvd_field, hosvd_output) = invoke_udf(
        &udfs::tensor_hosvd_nd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor.clone())),
            ColumnarValue::Scalar(fixture.ranks.clone()),
        ],
        vec![Arc::clone(&fixture.tensor_field), Arc::clone(&fixture.rank_list_field)],
        &[None, Some(fixture.ranks.clone())],
        1,
    )
    .expect("tensor_hosvd_nd");
    let (projected_field, projected_output) = invoke_udf(
        &udfs::tensor_tucker_project_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor.clone())),
            ColumnarValue::Array(Arc::new(struct_array(&hosvd_output).clone())),
        ],
        vec![Arc::clone(&fixture.tensor_field), Arc::clone(&hosvd_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tucker_project");
    let projected = tensor4_from_output(&projected_field, &projected_output);
    assert_eq!(projected.shape(), &[1, 1, 1, 1]);
    let (expanded_field, expanded_output) = invoke_udf(
        &udfs::tensor_tucker_expand_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&hosvd_output).clone()))],
        vec![Arc::clone(&hosvd_field)],
        &[None],
        1,
    )
    .expect("tensor_tucker_expand");
    assert_tensor4_matches(
        &tensor4_from_output(&expanded_field, &expanded_output),
        &fixture.original,
        fixture.eps,
    );
    let (hooi_field, hooi_output) = invoke_udf(
        &udfs::tensor_hooi_nd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor)),
            ColumnarValue::Scalar(fixture.ranks),
            ColumnarValue::Scalar(fixture.max_iterations),
            ColumnarValue::Scalar(fixture.tolerance),
        ],
        vec![
            fixture.tensor_field,
            fixture.rank_list_field,
            fixture.max_iterations_field,
            fixture.tolerance_field,
        ],
        // Literals mirror the fixture scalars moved into the argument vector.
        &[
            None,
            Some(scalar_int32_list(vec![1, 1, 1])),
            Some(ScalarValue::Int64(Some(50))),
            Some(ScalarValue::Float64(Some(1.0e-10))),
        ],
        1,
    )
    .expect("tensor_hooi_nd");
    let (field, output) = invoke_udf(
        &udfs::tensor_tucker_expand_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&hooi_output).clone()))],
        vec![hooi_field],
        &[None],
        1,
    )
    .expect("tensor_tucker_expand hooi");
    assert_tensor4_matches(&tensor4_from_output(&field, &output), &fixture.original, fixture.eps);
}
// TT-SVD round trip at max rank 2, plus the TT norm/inner-product identity
// <T, T> == ||T||^2.
#[test]
fn tensor_tt_decomposition_udfs_preserve_tensor_and_inner_products() {
    let fixture = tensor_decomposition_fixture();
    let (tt_field, tt_output) = invoke_udf(
        &udfs::tensor_tt_svd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            Arc::clone(&fixture.tensor_field),
            Arc::clone(&fixture.rank_field),
            Arc::clone(&fixture.tolerance_field),
        ],
        &[None, Some(ScalarValue::Int64(Some(2))), Some(fixture.tolerance.clone())],
        1,
    )
    .expect("tensor_tt_svd");
    let reconstructed = reconstruct_tt_tensor(&tt_field, &tt_output);
    assert_tensor4_matches(&reconstructed, &fixture.original, fixture.eps);
    let (norm_field, norm_output) = invoke_udf(
        &udfs::tensor_tt_norm_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone()))],
        vec![Arc::clone(&tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_norm");
    assert_eq!(norm_field.data_type(), &DataType::Float64);
    let tt_norm = f64_array(&norm_output).value(0);
    let (inner_field, inner_output) = invoke_udf(
        &udfs::tensor_tt_inner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
        ],
        vec![Arc::clone(&tt_field), Arc::clone(&tt_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tt_inner");
    assert_eq!(inner_field.data_type(), &DataType::Float64);
    let tt_inner = f64_array(&inner_output).value(0);
    // Inner product of a TT tensor with itself equals its squared norm.
    assert_close_eps(tt_inner, tt_norm * tt_norm, fixture.eps);
}
// Left/right orthogonalization and rounding of a TT decomposition must all
// leave the reconstructed tensor unchanged within eps.
#[test]
fn tensor_tt_structure_udfs_preserve_tensor() {
    let fixture = tensor_decomposition_fixture();
    let (tt_field, tt_output) = invoke_udf(
        &udfs::tensor_tt_svd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            Arc::clone(&fixture.tensor_field),
            Arc::clone(&fixture.rank_field),
            Arc::clone(&fixture.tolerance_field),
        ],
        &[None, Some(ScalarValue::Int64(Some(2))), Some(fixture.tolerance.clone())],
        1,
    )
    .expect("tensor_tt_svd");
    let (left_field, left_output) = invoke_udf(
        &udfs::tensor_tt_orthogonalize_left_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone()))],
        vec![Arc::clone(&tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_orthogonalize_left");
    assert_tensor4_matches(
        &reconstruct_tt_tensor(&left_field, &left_output),
        &fixture.original,
        fixture.eps,
    );
    let (right_field, right_output) = invoke_udf(
        &udfs::tensor_tt_orthogonalize_right_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone()))],
        vec![Arc::clone(&tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_orthogonalize_right");
    assert_tensor4_matches(
        &reconstruct_tt_tensor(&right_field, &right_output),
        &fixture.original,
        fixture.eps,
    );
    let (round_field, round_output) = invoke_udf(
        &udfs::tensor_tt_round_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(fixture.tolerance),
        ],
        vec![tt_field, fixture.rank_field, fixture.tolerance_field],
        &[None, Some(ScalarValue::Int64(Some(2))), Some(ScalarValue::Float64(Some(1.0e-10)))],
        1,
    )
    .expect("tensor_tt_round");
    assert_tensor4_matches(
        &reconstruct_tt_tensor(&round_field, &round_output),
        &fixture.original,
        fixture.eps,
    );
}
#[test]
fn tensor_tt_add_and_hadamard_udfs_produce_expected_values() {
    // TT-SVD the fixture tensor, then verify element-wise identities against
    // the dense original: tt + tt reconstructs 2 * original, and the Hadamard
    // product tt * tt (with and without rounding) reconstructs original^2.
    let fixture = tensor_decomposition_fixture();
    let (tt_field, tt_output) = invoke_udf(
        &udfs::tensor_tt_svd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor)),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            Arc::clone(&fixture.tensor_field),
            Arc::clone(&fixture.rank_field),
            Arc::clone(&fixture.tolerance_field),
        ],
        &[None, Some(ScalarValue::Int64(Some(2))), Some(fixture.tolerance.clone())],
        1,
    )
    .expect("tensor_tt_svd");
    // tt + tt: both operands are the same TT decomposition.
    let (add_field, add_output) = invoke_udf(
        &udfs::tensor_tt_add_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
        ],
        vec![Arc::clone(&tt_field), Arc::clone(&tt_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tt_add");
    let add_reconstructed = reconstruct_tt_tensor(&add_field, &add_output);
    // tt * tt (Hadamard): element-wise square of the original tensor.
    let (hadamard_field, hadamard_output) = invoke_udf(
        &udfs::tensor_tt_hadamard_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
        ],
        vec![Arc::clone(&tt_field), Arc::clone(&tt_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tt_hadamard");
    let hadamard_reconstructed = reconstruct_tt_tensor(&hadamard_field, &hadamard_output);
    // Fused Hadamard-plus-rounding (max rank 2) must give the same squares.
    let (round_field, round_output) = invoke_udf(
        &udfs::tensor_tt_hadamard_round_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(fixture.tolerance),
        ],
        vec![
            Arc::clone(&tt_field),
            Arc::clone(&tt_field),
            fixture.rank_field,
            fixture.tolerance_field,
        ],
        // NOTE(review): the tolerance hint below is hard-coded to 1.0e-10
        // while the actual scalar argument is `fixture.tolerance` — confirm
        // these are intended to carry the same value.
        &[None, None, Some(ScalarValue::Int64(Some(2))), Some(ScalarValue::Float64(Some(1.0e-10)))],
        1,
    )
    .expect("tensor_tt_hadamard_round");
    let rounded_reconstructed = reconstruct_tt_tensor(&round_field, &round_output);
    // Walk every element of the 2x2x2 trailing dimensions (batch index 0).
    for i in 0..2 {
        for j in 0..2 {
            for k in 0..2 {
                assert_close_eps(
                    add_reconstructed[[0, i, j, k]],
                    fixture.original[[0, i, j, k]] * 2.0,
                    fixture.eps,
                );
                let squared = fixture.original[[0, i, j, k]] * fixture.original[[0, i, j, k]];
                assert_close_eps(hadamard_reconstructed[[0, i, j, k]], squared, fixture.eps);
                assert_close_eps(rounded_reconstructed[[0, i, j, k]], squared, fixture.eps);
            }
        }
    }
}
/// Shared inputs for the `f32` tensor-decomposition tests: the Arrow-encoded
/// tensor column, its ndarray mirror, and the scalar arguments (with their
/// matching `Field` metadata) passed to the decomposition UDFs.
///
/// Populated by `tensor_decomposition_fixture_f32`.
struct TensorDecompositionFixtureF32 {
    // Field metadata for the tensor column.
    tensor_field: FieldRef,
    // Tensor column as a fixed-size list array.
    tensor: FixedSizeListArray,
    // Same tensor as an ndarray; the reference for reconstruction checks.
    original: Array4<f32>,
    // Field metadata for the scalar UDF arguments below.
    rank_field: FieldRef,
    max_iterations_field: FieldRef,
    tolerance_field: FieldRef,
    rank_list_field: FieldRef,
    // Scalar UDF arguments (see the fixture constructor for the values).
    rank: ScalarValue,
    max_iterations: ScalarValue,
    tolerance: ScalarValue,
    ranks: ScalarValue,
}
fn tensor_decomposition_fixture_f32() -> TensorDecompositionFixtureF32 {
let original =
Array4::from_shape_vec((1, 2, 2, 2), vec![1.0_f32, 3.0, 0.5, 1.5, 2.0, 6.0, 1.0, 3.0])
.expect("tensor shape");
let (tensor_field, tensor) =
tensor_batch4_f32("tensor_f32", [[[[1.0, 3.0], [0.5, 1.5]], [[2.0, 6.0], [1.0, 3.0]]]]);
TensorDecompositionFixtureF32 {
tensor_field,
tensor,
original,
rank_field: Arc::new(Field::new("rank", DataType::Int64, false)),
max_iterations_field: Arc::new(Field::new("max_iterations", DataType::Int64, false)),
tolerance_field: Arc::new(Field::new("tolerance", DataType::Float64, false)),
rank_list_field: Arc::new(Field::new(
"ranks",
DataType::new_list(DataType::Int32, false),
false,
)),
rank: ScalarValue::Int64(Some(1)),
max_iterations: ScalarValue::Int64(Some(50)),
tolerance: ScalarValue::Float64(Some(1.0e-8)),
ranks: scalar_int32_list(vec![1, 1, 1]),
}
}
/// Asserts that two `f32` rank-4 tensors match element-wise within `epsilon`
/// (the comparison is performed in `f64`, mirroring `assert_close_eps`).
///
/// Panics with the flat element index and both values on the first mismatch,
/// or immediately if the shapes differ.
fn assert_tensor4_matches_f32(actual: &Array4<f32>, expected: &Array4<f32>, epsilon: f64) {
    // Guard against shape mismatches: `zip` alone stops at the shorter
    // iterator, which would silently skip trailing elements.
    assert_eq!(actual.shape(), expected.shape(), "tensor shape mismatch");
    for (index, (actual, expected)) in actual.iter().zip(expected.iter()).enumerate() {
        let delta = (f64::from(*actual) - f64::from(*expected)).abs();
        // Informative failure message, consistent with assert_close_eps.
        assert!(
            delta < epsilon,
            "element {index}: expected {expected}, got {actual}, delta {delta}, epsilon {epsilon}"
        );
    }
}
#[test]
fn tensor_decomposition_float32_cp_paths() {
    // Exercises the f32 CP-decomposition paths: 3-way ALS (decompose then
    // reconstruct and compare against the original within 1e-3) and the
    // N-dimensional ALS variant (decompose, reconstruct, check the shape).
    let fixture = tensor_decomposition_fixture_f32();
    let (cp3_field, cp3_output) = invoke_udf(
        &udfs::tensor_cp_als3_udf(),
        vec![
            // Cloned because `fixture.tensor` is moved into the ND call below.
            ColumnarValue::Array(Arc::new(fixture.tensor.clone())),
            ColumnarValue::Scalar(fixture.rank.clone()),
            ColumnarValue::Scalar(fixture.max_iterations.clone()),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            Arc::clone(&fixture.tensor_field),
            Arc::clone(&fixture.rank_field),
            Arc::clone(&fixture.max_iterations_field),
            Arc::clone(&fixture.tolerance_field),
        ],
        &[
            None,
            Some(fixture.rank.clone()),
            Some(fixture.max_iterations.clone()),
            Some(fixture.tolerance.clone()),
        ],
        1,
    )
    .expect("tensor_cp_als3");
    let (cp3_reconstruct_field, cp3_reconstruct_output) = invoke_udf(
        &udfs::tensor_cp_als3_reconstruct_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&cp3_output).clone()))],
        vec![cp3_field],
        &[None],
        1,
    )
    .expect("tensor_cp_als3_reconstruct");
    // Looser epsilon (1e-3) than the fixture's tolerance: ALS is iterative
    // and runs in f32.
    assert_tensor4_matches_f32(
        &fixed_shape_view4_f32(&cp3_reconstruct_field, &cp3_reconstruct_output).to_owned(),
        &fixture.original,
        1.0e-3,
    );
    // N-dimensional CP-ALS path; this call consumes the fixture tensor.
    let (cp_nd_field, cp_nd_output) = invoke_udf(
        &udfs::tensor_cp_als_nd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor)),
            ColumnarValue::Scalar(fixture.rank),
            ColumnarValue::Scalar(fixture.max_iterations),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            fixture.tensor_field,
            fixture.rank_field,
            fixture.max_iterations_field,
            Arc::clone(&fixture.tolerance_field),
        ],
        &[
            None,
            Some(ScalarValue::Int64(Some(1))),
            Some(ScalarValue::Int64(Some(50))),
            Some(fixture.tolerance.clone()),
        ],
        1,
    )
    .expect("tensor_cp_als_nd");
    let (cp_nd_reconstruct_field, cp_nd_reconstruct_output) = invoke_udf(
        &udfs::tensor_cp_als_nd_reconstruct_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&cp_nd_output).clone()))],
        vec![cp_nd_field],
        &[None],
        1,
    )
    .expect("tensor_cp_als_nd_reconstruct");
    // For the ND path only the reconstructed shape is asserted.
    assert_eq!(
        fixed_shape_view4_f32(&cp_nd_reconstruct_field, &cp_nd_reconstruct_output).shape(),
        &[1, 2, 2, 2]
    );
}
#[test]
fn tensor_decomposition_float32_tucker_paths() {
    // Exercises the f32 Tucker paths: HOSVD with ranks [1, 1, 1], projection
    // onto the core (shape [1, 1, 1, 1]), expansion back to the full shape,
    // then HOOI followed by another expansion. Only shapes are asserted.
    let fixture = tensor_decomposition_fixture_f32();
    let (hosvd_field, hosvd_output) = invoke_udf(
        &udfs::tensor_hosvd_nd_udf(),
        vec![
            // Cloned because `fixture.tensor` is moved into the HOOI call below.
            ColumnarValue::Array(Arc::new(fixture.tensor.clone())),
            ColumnarValue::Scalar(fixture.ranks.clone()),
        ],
        vec![Arc::clone(&fixture.tensor_field), Arc::clone(&fixture.rank_list_field)],
        &[None, Some(fixture.ranks.clone())],
        1,
    )
    .expect("tensor_hosvd_nd");
    // Projecting the tensor through the HOSVD factors yields the rank-1 core.
    let (projected_field, projected_output) = invoke_udf(
        &udfs::tensor_tucker_project_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor.clone())),
            ColumnarValue::Array(Arc::new(struct_array(&hosvd_output).clone())),
        ],
        vec![Arc::clone(&fixture.tensor_field), Arc::clone(&hosvd_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tucker_project");
    assert_eq!(fixed_shape_view4_f32(&projected_field, &projected_output).shape(), &[1, 1, 1, 1]);
    // Expanding the HOSVD output recovers the full tensor shape.
    let (expanded_field, expanded_output) = invoke_udf(
        &udfs::tensor_tucker_expand_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&hosvd_output).clone()))],
        vec![Arc::clone(&hosvd_field)],
        &[None],
        1,
    )
    .expect("tensor_tucker_expand");
    assert_eq!(fixed_shape_view4_f32(&expanded_field, &expanded_output).shape(), &[1, 2, 2, 2]);
    // HOOI path; this call consumes the fixture tensor and scalars.
    let (hooi_field, hooi_output) = invoke_udf(
        &udfs::tensor_hooi_nd_udf(),
        vec![
            ColumnarValue::Array(Arc::new(fixture.tensor)),
            ColumnarValue::Scalar(fixture.ranks),
            ColumnarValue::Scalar(fixture.max_iterations),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            fixture.tensor_field,
            fixture.rank_list_field,
            fixture.max_iterations_field,
            Arc::clone(&fixture.tolerance_field),
        ],
        &[
            None,
            Some(scalar_int32_list(vec![1, 1, 1])),
            Some(ScalarValue::Int64(Some(50))),
            Some(fixture.tolerance.clone()),
        ],
        1,
    )
    .expect("tensor_hooi_nd");
    // Expanding the HOOI output must also recover the full tensor shape.
    let (hooi_expanded_field, hooi_expanded_output) = invoke_udf(
        &udfs::tensor_tucker_expand_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&hooi_output).clone()))],
        vec![hooi_field],
        &[None],
        1,
    )
    .expect("tensor_tucker_expand");
    assert_eq!(fixed_shape_view4_f32(&hooi_expanded_field, &hooi_expanded_output).shape(), &[
        1, 2, 2, 2
    ]);
}
/// Runs `tensor_tt_svd` (max rank 2) against the f32 fixture tensor and
/// returns the fixture together with the resulting TT field and output so
/// that follow-up tests can feed them into further TT UDFs.
fn tt_decomposition_output_f32() -> (TensorDecompositionFixtureF32, FieldRef, ColumnarValue) {
    let fixture = tensor_decomposition_fixture_f32();
    let max_rank = ScalarValue::Int64(Some(2));
    let (tt_field, tt_output) = invoke_udf(
        &udfs::tensor_tt_svd_udf(),
        vec![
            // Clone so the fixture keeps ownership of its tensor.
            ColumnarValue::Array(Arc::new(fixture.tensor.clone())),
            ColumnarValue::Scalar(max_rank.clone()),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![
            Arc::clone(&fixture.tensor_field),
            Arc::clone(&fixture.rank_field),
            Arc::clone(&fixture.tolerance_field),
        ],
        &[None, Some(max_rank), Some(fixture.tolerance.clone())],
        1,
    )
    .expect("tensor_tt_svd");
    (fixture, tt_field, tt_output)
}
#[test]
fn tensor_decomposition_float32_tt_reconstruction_and_metrics() {
    // f32 TT path: the TT-SVD output must reconstruct the original tensor
    // (loose 1e-3 epsilon for f32), and the norm/inner-product UDFs must
    // return finite Float32 scalars.
    let (fixture, tt_field, tt_output) = tt_decomposition_output_f32();
    assert_tensor4_matches_f32(
        &reconstruct_tt_tensor_f32(&tt_field, &tt_output),
        &fixture.original,
        1.0e-3,
    );
    // Frobenius-style TT norm: only checks the Float32 type and finiteness.
    let (norm_field, norm_output) = invoke_udf(
        &udfs::tensor_tt_norm_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone()))],
        vec![Arc::clone(&tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_norm");
    assert_eq!(norm_field.data_type(), &DataType::Float32);
    assert!(f32_array(&norm_output).value(0).is_finite());
    // Inner product of the TT with itself: same type/finiteness checks.
    let (inner_field, inner_output) = invoke_udf(
        &udfs::tensor_tt_inner_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
        ],
        vec![Arc::clone(&tt_field), Arc::clone(&tt_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tt_inner");
    assert_eq!(inner_field.data_type(), &DataType::Float32);
    assert!(f32_array(&inner_output).value(0).is_finite());
}
#[test]
fn tensor_decomposition_float32_tt_structure_paths() {
    // Smoke-tests every structural TT UDF on the f32 TT-SVD output:
    // orthogonalization and rounding are checked for the reconstructed shape,
    // add/Hadamard/Hadamard-round only for finite reconstructed values.
    let (fixture, tt_field, tt_output) = tt_decomposition_output_f32();
    let (left_field, left_output) = invoke_udf(
        &udfs::tensor_tt_orthogonalize_left_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone()))],
        vec![Arc::clone(&tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_orthogonalize_left");
    assert_eq!(reconstruct_tt_tensor_f32(&left_field, &left_output).shape(), &[1, 2, 2, 2]);
    let (right_field, right_output) = invoke_udf(
        &udfs::tensor_tt_orthogonalize_right_udf(),
        vec![ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone()))],
        vec![Arc::clone(&tt_field)],
        &[None],
        1,
    )
    .expect("tensor_tt_orthogonalize_right");
    assert_eq!(reconstruct_tt_tensor_f32(&right_field, &right_output).shape(), &[1, 2, 2, 2]);
    // Rounding to max rank 2 with the fixture tolerance (1e-8).
    let (round_field, round_output) = invoke_udf(
        &udfs::tensor_tt_round_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(fixture.tolerance.clone()),
        ],
        vec![Arc::clone(&tt_field), fixture.rank_field, Arc::clone(&fixture.tolerance_field)],
        &[None, Some(ScalarValue::Int64(Some(2))), Some(fixture.tolerance.clone())],
        1,
    )
    .expect("tensor_tt_round");
    assert_eq!(reconstruct_tt_tensor_f32(&round_field, &round_output).shape(), &[1, 2, 2, 2]);
    // tt + tt: only finiteness of the reconstruction is asserted.
    let (add_field, add_output) = invoke_udf(
        &udfs::tensor_tt_add_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
        ],
        vec![Arc::clone(&tt_field), Arc::clone(&tt_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tt_add");
    assert!(
        reconstruct_tt_tensor_f32(&add_field, &add_output).iter().all(|value| value.is_finite())
    );
    // tt * tt (Hadamard): only finiteness of the reconstruction is asserted.
    let (hadamard_field, hadamard_output) = invoke_udf(
        &udfs::tensor_tt_hadamard_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
        ],
        vec![Arc::clone(&tt_field), Arc::clone(&tt_field)],
        &[None, None],
        1,
    )
    .expect("tensor_tt_hadamard");
    assert!(
        reconstruct_tt_tensor_f32(&hadamard_field, &hadamard_output)
            .iter()
            .all(|value| value.is_finite())
    );
    // Fused Hadamard + rounding.
    let (hadamard_round_field, hadamard_round_output) = invoke_udf(
        &udfs::tensor_tt_hadamard_round_udf(),
        vec![
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Array(Arc::new(struct_array(&tt_output).clone())),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(2))),
            ColumnarValue::Scalar(fixture.tolerance),
        ],
        vec![
            Arc::clone(&tt_field),
            Arc::clone(&tt_field),
            // NOTE(review): a fresh "max_rank" field is built here instead of
            // reusing `fixture.rank_field` (named "rank") as the other calls
            // do — confirm the differing field name is intentional.
            Arc::new(Field::new("max_rank", DataType::Int64, false)),
            fixture.tolerance_field,
        ],
        &[None, None, Some(ScalarValue::Int64(Some(2))), Some(ScalarValue::Float64(Some(1.0e-8)))],
        1,
    )
    .expect("tensor_tt_hadamard_round");
    assert!(
        reconstruct_tt_tensor_f32(&hadamard_round_field, &hadamard_round_output)
            .iter()
            .all(|value| value.is_finite())
    );
}