#![cfg(feature = "rhai-runtime")]
use arrow_array::{
Array, ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray,
builder::{BooleanBuilder, Float64Builder, Int64Builder, StringBuilder},
};
use arrow_schema::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::scalar::ScalarValue;
use rhai::Dynamic;
use std::sync::Arc;
use crate::error::RhaiError;
/// Converts one logical row of a DataFusion [`ColumnarValue`] into a Rhai [`Dynamic`].
///
/// * An array argument is delegated to [`array_row_to_dynamic`], which reads
///   the slot at `row` and interprets it as `expected`.
/// * A scalar argument is delegated to [`scalar_to_dynamic`]; `row` and
///   `expected` are not consulted on that path.
///
/// # Errors
///
/// Propagates whatever [`RhaiError`] the delegated conversion returns.
pub fn column_row_to_dynamic(
    arg: &ColumnarValue,
    row: usize,
    expected: &DataType,
) -> Result<Dynamic, RhaiError> {
    match arg {
        ColumnarValue::Array(values) => array_row_to_dynamic(values, row, expected),
        ColumnarValue::Scalar(constant) => scalar_to_dynamic(constant),
    }
}
pub fn array_row_to_dynamic(
arr: &ArrayRef,
row: usize,
expected: &DataType,
) -> Result<Dynamic, RhaiError> {
if arr.is_null(row) {
return Ok(Dynamic::UNIT);
}
match expected {
DataType::Float64 => {
let a = arr.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
RhaiError::Conversion(format!("expected Float64 array, got {:?}", arr.data_type()))
})?;
Ok(Dynamic::from(a.value(row)))
}
DataType::Int64 => {
let a = arr.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
RhaiError::Conversion(format!("expected Int64 array, got {:?}", arr.data_type()))
})?;
Ok(Dynamic::from(a.value(row)))
}
DataType::Utf8 => {
let a = arr.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
RhaiError::Conversion(format!("expected Utf8 array, got {:?}", arr.data_type()))
})?;
Ok(Dynamic::from(a.value(row).to_owned()))
}
DataType::Boolean => {
let a = arr.as_any().downcast_ref::<BooleanArray>().ok_or_else(|| {
RhaiError::Conversion(format!("expected Boolean array, got {:?}", arr.data_type()))
})?;
Ok(Dynamic::from(a.value(row)))
}
DataType::Null => Ok(Dynamic::UNIT),
other => Err(RhaiError::Conversion(format!(
"unsupported Arrow type for Rhai bridge: {other:?}"
))),
}
}
/// Converts a DataFusion [`ScalarValue`] into a Rhai [`Dynamic`].
///
/// Booleans, `Float64`/`Float32`, `Int64`/`Int32` and `Utf8`/`LargeUtf8`
/// values are carried over; the 32-bit numeric variants are widened to
/// `f64` / `i64`. `ScalarValue::Null` and any other scalar for which
/// `is_null()` is true become the unit value.
///
/// # Errors
///
/// Returns [`RhaiError::Conversion`] for a non-null scalar of any other variant.
pub fn scalar_to_dynamic(s: &ScalarValue) -> Result<Dynamic, RhaiError> {
    let converted = match s {
        ScalarValue::Null => Dynamic::UNIT,
        ScalarValue::Boolean(Some(flag)) => Dynamic::from(*flag),
        ScalarValue::Float64(Some(x)) => Dynamic::from(*x),
        ScalarValue::Float32(Some(x)) => Dynamic::from(f64::from(*x)),
        ScalarValue::Int64(Some(n)) => Dynamic::from(*n),
        ScalarValue::Int32(Some(n)) => Dynamic::from(i64::from(*n)),
        ScalarValue::Utf8(Some(text)) | ScalarValue::LargeUtf8(Some(text)) => {
            Dynamic::from(text.clone())
        }
        // Typed nulls (e.g. `Int64(None)`) of any variant map to unit.
        typed_null if typed_null.is_null() => Dynamic::UNIT,
        other => {
            return Err(RhaiError::Conversion(format!(
                "unsupported ScalarValue for Rhai bridge: {other:?}"
            )));
        }
    };
    Ok(converted)
}
/// Arrow array builder for one of the output column types this bridge can
/// produce; each variant wraps the matching Arrow builder.
#[derive(Debug)]
pub enum OutBuilder {
    /// Builder used for `DataType::Boolean` output.
    Bool(BooleanBuilder),
    /// Builder used for `DataType::Int64` output.
    Int(Int64Builder),
    /// Builder used for `DataType::Float64` output.
    Float(Float64Builder),
    /// Builder used for `DataType::Utf8` output.
    Str(StringBuilder),
}
impl OutBuilder {
pub fn new(ty: &DataType, rows: usize) -> Result<Self, RhaiError> {
Ok(match ty {
DataType::Boolean => Self::Bool(BooleanBuilder::with_capacity(rows)),
DataType::Int64 => Self::Int(Int64Builder::with_capacity(rows)),
DataType::Float64 => Self::Float(Float64Builder::with_capacity(rows)),
DataType::Utf8 => Self::Str(StringBuilder::with_capacity(rows, 16 * rows)),
other => {
return Err(RhaiError::Conversion(format!(
"unsupported output Arrow type: {other:?}"
)));
}
})
}
pub fn push(&mut self, d: Dynamic) -> Result<(), RhaiError> {
if d.is_unit() {
match self {
Self::Bool(b) => b.append_null(),
Self::Int(b) => b.append_null(),
Self::Float(b) => b.append_null(),
Self::Str(b) => b.append_null(),
}
return Ok(());
}
match self {
Self::Bool(b) => {
let v = d
.as_bool()
.map_err(|t| RhaiError::Conversion(format!("expected bool, got {t}")))?;
b.append_value(v);
}
Self::Int(b) => {
let v = d
.as_int()
.map_err(|t| RhaiError::Conversion(format!("expected int, got {t}")))?;
b.append_value(v);
}
Self::Float(b) => {
let v = d
.as_float()
.map_err(|t| RhaiError::Conversion(format!("expected float, got {t}")))?;
b.append_value(v);
}
Self::Str(b) => {
let v = d
.into_string()
.map_err(|t| RhaiError::Conversion(format!("expected string, got {t}")))?;
b.append_value(v);
}
}
Ok(())
}
pub fn finish(self) -> ArrayRef {
match self {
Self::Bool(mut b) => Arc::new(b.finish()) as ArrayRef,
Self::Int(mut b) => Arc::new(b.finish()) as ArrayRef,
Self::Float(mut b) => Arc::new(b.finish()) as ArrayRef,
Self::Str(mut b) => Arc::new(b.finish()) as ArrayRef,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;

    /// The extreme `i64` values must come back exactly as integers.
    #[test]
    fn int64_array_roundtrips_without_float_coercion() {
        let column: ArrayRef = Arc::new(Int64Array::from(vec![i64::MAX, 0, i64::MIN]));

        let first = array_row_to_dynamic(&column, 0, &DataType::Int64).unwrap();
        assert_eq!(first.as_int().unwrap(), i64::MAX);

        let last = array_row_to_dynamic(&column, 2, &DataType::Int64).unwrap();
        assert_eq!(last.as_int().unwrap(), i64::MIN);
    }

    /// A null slot is surfaced to Rhai as the unit value.
    #[test]
    fn null_row_yields_unit() {
        let column: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5), None, Some(3.0)]));

        let value = array_row_to_dynamic(&column, 1, &DataType::Float64).unwrap();

        assert!(value.is_unit());
    }

    /// Pushing unit into a builder records a null in the finished array.
    #[test]
    fn out_builder_handles_nulls() {
        let mut builder = OutBuilder::new(&DataType::Float64, 3).unwrap();
        for value in [Dynamic::from(1.5), Dynamic::UNIT, Dynamic::from(3.0)] {
            builder.push(value).unwrap();
        }

        let finished = builder.finish();
        let floats = finished.as_any().downcast_ref::<Float64Array>().unwrap();

        assert_eq!(floats.value(0), 1.5);
        assert!(floats.is_null(1));
        assert_eq!(floats.value(2), 3.0);
    }

    /// String slots convert to Rhai strings; null string slots convert to unit.
    #[test]
    fn string_roundtrip() {
        let column: ArrayRef = Arc::new(StringArray::from(vec![Some("hi"), None, Some("there")]));

        let present = array_row_to_dynamic(&column, 0, &DataType::Utf8).unwrap();
        assert_eq!(present.into_string().unwrap(), "hi");

        let missing = array_row_to_dynamic(&column, 1, &DataType::Utf8).unwrap();
        assert!(missing.is_unit());
    }
}