use anyhow::{Result, anyhow};
use arrow::array::{
Array, ArrayRef, Float32Array, Float32Builder, Float64Array, Int32Array, Int32Builder,
Int64Array, Int64Builder, ListArray,
};
use datafusion::{logical_expr::ColumnarValue, scalar::ScalarValue};
use ndarray::ArrayD;
use std::sync::Arc;
/// Converts a slice of Arrow arrays (one per feature column) into a 3-D
/// ndarray tensor of shape `(1, sequence_length, num_features)`.
///
/// Each input array is one feature column; the shared column length becomes
/// the sequence dimension and the batch dimension is fixed at 1. Supported
/// column types are Float32, Float64, Int64 and Int32; every value is
/// converted to `f32`.
///
/// A single `ListArray` input is treated as an embedding payload and
/// delegated to [`list_arrayref_to_ndarray`] instead.
///
/// # Errors
/// Returns an error when `args` is empty, when the feature columns have
/// differing lengths, or when a column has an unsupported data type.
pub fn arrayref_slice_to_ndarray(args: &[ArrayRef]) -> Result<ArrayD<f32>> {
    if args.is_empty() {
        return Err(anyhow!("No input array provided for conversion"));
    }
    // A lone ListArray is an embedding matrix, not a feature column.
    if args.len() == 1 {
        if let Some(_list_arr) = args[0].as_any().downcast_ref::<ListArray>() {
            return list_arrayref_to_ndarray(args);
        }
    }
    let num_features = args.len();
    let sequence_length = args[0].len();
    let batch_size = 1;
    let mut tensor = ndarray::Array3::<f32>::zeros((batch_size, sequence_length, num_features));
    for (feature_idx, array) in args.iter().enumerate() {
        // All feature columns must share the same length; without this check
        // a shorter column panics inside `value(i)` instead of erroring.
        if array.len() != sequence_length {
            return Err(anyhow!(
                "Column {} has length {} but expected {}",
                feature_idx,
                array.len(),
                sequence_length
            ));
        }
        // NOTE(review): null slots are not checked; `value(i)` on a null slot
        // yields an arbitrary value — confirm inputs are non-nullable.
        if let Some(f32_arr) = array.as_any().downcast_ref::<Float32Array>() {
            for i in 0..sequence_length {
                tensor[[0, i, feature_idx]] = f32_arr.value(i);
            }
        } else if let Some(f64_arr) = array.as_any().downcast_ref::<Float64Array>() {
            for i in 0..sequence_length {
                // Narrowing f64 -> f32 may lose precision.
                tensor[[0, i, feature_idx]] = f64_arr.value(i) as f32;
            }
        } else if let Some(i64_arr) = array.as_any().downcast_ref::<Int64Array>() {
            for i in 0..sequence_length {
                tensor[[0, i, feature_idx]] = i64_arr.value(i) as f32;
            }
        } else if let Some(i32_arr) = array.as_any().downcast_ref::<Int32Array>() {
            for i in 0..sequence_length {
                tensor[[0, i, feature_idx]] = i32_arr.value(i) as f32;
            }
        } else {
            return Err(anyhow!(
                "Unsupported data type for column {} : {:?}",
                feature_idx,
                array.data_type()
            ));
        }
    }
    Ok(tensor.into_dyn())
}
/// Normalizes DataFusion `ColumnarValue`s into concrete `ArrayRef`s so that
/// array and scalar arguments can be processed uniformly downstream.
///
/// - `Array` values pass through unchanged, except a single-row `ListArray`,
///   which is unwrapped to its inner values array.
/// - `Scalar` values are broadcast to `row_count` rows, where `row_count` is
///   derived from the first array argument (1 when every argument is scalar).
///
/// Scalar type mapping: Float32 -> Float32Array; Float64 -> Float32Array
/// (narrowed to f32, deliberately lossy to match downstream f32 tensors);
/// Int64/UInt64 -> Int64Array; Int32/UInt32 -> Int32Array.
///
/// # Errors
/// Returns an error for empty input, unsupported scalar types, or unsigned
/// scalar values too large for the corresponding signed array type.
pub fn columnar_values_to_arrayrefs(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
    if args.is_empty() {
        return Err(anyhow!("No inputs provided for ColumnarValue conversion"));
    }
    // Broadcast width for scalars: taken from the first array argument.
    // A single-row ListArray contributes its inner length instead.
    let row_count = args
        .iter()
        .find_map(|arg| match arg {
            ColumnarValue::Array(a) => {
                if let Some(list_arr) = a.as_any().downcast_ref::<ListArray>() {
                    if list_arr.len() == 1 {
                        Some(list_arr.value(0).len())
                    } else {
                        Some(list_arr.len())
                    }
                } else {
                    Some(a.len())
                }
            }
            ColumnarValue::Scalar(_) => None,
        })
        .unwrap_or(1);
    let mut arrs: Vec<ArrayRef> = Vec::with_capacity(args.len());
    for cv in args {
        let array_ref = match cv {
            ColumnarValue::Array(a) => {
                if let Some(list_arr) = a.as_any().downcast_ref::<ListArray>() {
                    if list_arr.len() == 1 {
                        // Unwrap a single-row list to its flat values array.
                        list_arr.value(0)
                    } else {
                        a.clone()
                    }
                } else {
                    a.clone()
                }
            }
            ColumnarValue::Scalar(s) => {
                match s {
                    ScalarValue::Float32(Some(v)) => {
                        let mut builder = Float32Builder::new();
                        builder.append_value_n(*v, row_count);
                        Arc::new(builder.finish()) as ArrayRef
                    }
                    ScalarValue::Float64(Some(v)) => {
                        // Intentionally narrowed to f32 (may lose precision).
                        let mut builder = Float32Builder::new();
                        builder.append_value_n(*v as f32, row_count);
                        Arc::new(builder.finish()) as ArrayRef
                    }
                    ScalarValue::Int64(Some(v)) => {
                        let mut builder = Int64Builder::new();
                        builder.append_value_n(*v, row_count);
                        Arc::new(builder.finish()) as ArrayRef
                    }
                    ScalarValue::Int32(Some(v)) => {
                        let mut builder = Int32Builder::new();
                        builder.append_value_n(*v, row_count);
                        Arc::new(builder.finish()) as ArrayRef
                    }
                    ScalarValue::UInt64(Some(v)) => {
                        // `as i64` would silently wrap values > i64::MAX;
                        // fail loudly instead.
                        let v = i64::try_from(*v)
                            .map_err(|_| anyhow!("UInt64 scalar {} overflows Int64", v))?;
                        let mut builder = Int64Builder::new();
                        builder.append_value_n(v, row_count);
                        Arc::new(builder.finish()) as ArrayRef
                    }
                    ScalarValue::UInt32(Some(v)) => {
                        // `as i32` would silently wrap values > i32::MAX;
                        // fail loudly instead.
                        let v = i32::try_from(*v)
                            .map_err(|_| anyhow!("UInt32 scalar {} overflows Int32", v))?;
                        let mut builder = Int32Builder::new();
                        builder.append_value_n(v, row_count);
                        Arc::new(builder.finish()) as ArrayRef
                    }
                    _ => {
                        return Err(anyhow!(
                            "Unsupported scalar type for ColumnarValue: {:?}",
                            s
                        ));
                    }
                }
            }
        };
        arrs.push(array_ref);
    }
    Ok(arrs)
}
/// Converts a single `ListArray` (one embedding vector per row) into a 2-D
/// ndarray tensor of shape `(num_rows, embedding_dim)`, where
/// `embedding_dim` is taken from the first row.
///
/// Inner value arrays may be Float32 or Float64; values are converted to
/// `f32`.
///
/// # Errors
/// Returns an error when `args` is empty or holds more than one array, when
/// the array is not a `ListArray` or has zero rows, when a row's length
/// differs from the first row's, or when the inner type is unsupported.
pub fn list_arrayref_to_ndarray(args: &[ArrayRef]) -> Result<ArrayD<f32>> {
    // Exactly one input array is expected.
    let array = match args {
        [] => return Err(anyhow!("No input array provided for conversion")),
        [only] => only,
        _ => {
            return Err(anyhow!(
                "Expected single ListArray input for embedding conversion, got {} arrays",
                args.len()
            ));
        }
    };
    let Some(list_arr) = array.as_any().downcast_ref::<ListArray>() else {
        return Err(anyhow!(
            "Expected ListArray for embedding conversion, got {:?}",
            array.data_type()
        ));
    };
    let num_rows = list_arr.len();
    if num_rows == 0 {
        return Err(anyhow!("Empty ListArray provided"));
    }
    // The first row fixes the embedding width; every later row must match.
    let embedding_dim = list_arr.value(0).len();
    let mut tensor = ndarray::Array2::<f32>::zeros((num_rows, embedding_dim));
    for row_idx in 0..num_rows {
        let inner = list_arr.value(row_idx);
        if let Some(vals) = inner.as_any().downcast_ref::<Float32Array>() {
            if vals.len() != embedding_dim {
                return Err(anyhow!(
                    "Inconsistent embedding dimension at row {}: expected {}, got {}",
                    row_idx,
                    embedding_dim,
                    vals.len()
                ));
            }
            for (col_idx, slot) in tensor.row_mut(row_idx).iter_mut().enumerate() {
                *slot = vals.value(col_idx);
            }
        } else if let Some(vals) = inner.as_any().downcast_ref::<Float64Array>() {
            if vals.len() != embedding_dim {
                return Err(anyhow!(
                    "Inconsistent embedding dimension at row {}: expected {}, got {}",
                    row_idx,
                    embedding_dim,
                    vals.len()
                ));
            }
            for (col_idx, slot) in tensor.row_mut(row_idx).iter_mut().enumerate() {
                *slot = vals.value(col_idx) as f32;
            }
        } else {
            return Err(anyhow!(
                "Unsupported inner array type in ListArray: {:?}",
                inner.data_type()
            ));
        }
    }
    Ok(tensor.into_dyn())
}