use base64::prelude::*;
use datafusion::arrow::array::{
Array, BinaryArray, BooleanArray, Float64Array, Int64Array, IntervalDayTimeArray,
LargeBinaryArray, LargeStringArray, ListArray, MapArray, StringArray, StringViewArray,
StructArray,
};
use datafusion::arrow::datatypes::{DataType, IntervalUnit, Schema};
use hamelin_executor::executor::ExecutorError;
use hamelin_executor::results::ResultSet;
use hamelin_lib::catalog::Column;
use hamelin_lib::types::decimal_type::Decimal;
use hamelin_lib::types::struct_type::Struct;
use hamelin_lib::types::Type;
use parquet_variant_compute::{unshred_variant, VariantArray};
use serde_json::Value;
pub fn arrow_to_hamelin_type(dt: &DataType) -> Type {
match dt {
DataType::Boolean => Type::Boolean,
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => Type::Int,
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => Type::Int,
DataType::Float16 | DataType::Float32 | DataType::Float64 => Type::Double,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Type::String,
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => Type::Binary,
DataType::Timestamp(_, _) => Type::Timestamp,
DataType::Interval(IntervalUnit::YearMonth) => Type::CalendarInterval,
DataType::Interval(_) => Type::Interval,
DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
Decimal::new(*precision as i32, *scale as i32)
.map(Type::from)
.unwrap_or(Type::Unknown)
}
DataType::List(field) | DataType::LargeList(field) => {
let element_type = arrow_to_hamelin_type(field.data_type());
hamelin_lib::types::array::Array::new(element_type).into()
}
DataType::Struct(_) if crate::udf::is_variant_data_type(dt) => Type::Variant,
DataType::Struct(fields) => {
let mut s = Struct::new([]);
for f in fields.iter() {
s = s.with_str(f.name(), arrow_to_hamelin_type(f.data_type()));
}
s.into()
}
DataType::Map(field, _) => {
if let DataType::Struct(entry_fields) = field.data_type() {
if entry_fields.len() >= 2 {
let key_type = arrow_to_hamelin_type(entry_fields[0].data_type());
let value_type = arrow_to_hamelin_type(entry_fields[1].data_type());
hamelin_lib::types::map::Map::new(key_type, value_type).into()
} else {
Type::Unknown
}
} else {
Type::Unknown
}
}
DataType::Null => Type::Unknown,
_ => Type::Unknown,
}
}
/// Builds one catalog [`Column`] per field of `schema`, in field order.
///
/// Each column keeps the Arrow field name. Its type is produced by [`arrow_to_hamelin_type`].
pub fn arrow_schema_to_columns(schema: &Schema) -> Vec<Column> {
    let fields = schema.fields();
    let mut columns = Vec::with_capacity(fields.len());
    for field in fields.iter() {
        let hamelin_type = arrow_to_hamelin_type(field.data_type());
        columns.push(Column {
            name: field.name().as_str().into(),
            typ: hamelin_type.into(),
        });
    }
    columns
}
/// Converts DataFusion record batches into a JSON-valued [`ResultSet`].
///
/// Each cell is converted with `arrow_value_to_json_typed`, using the Hamelin type of its column
/// in `columns`. Rows from all batches are concatenated in batch order. An empty `batches`
/// slice yields an empty result set with the given columns.
///
/// # Errors
/// Returns [`ExecutorError::UnexpectedResultSet`] in three cases:
/// - a column type cannot be converted to a [`Type`];
/// - a batch's column count differs from `columns`;
/// - any cell cannot be converted.
pub fn arrow_batches_to_result_set(
    batches: &[datafusion::arrow::record_batch::RecordBatch],
    columns: Vec<Column>,
) -> Result<ResultSet, ExecutorError> {
    let types: Vec<Type> = columns
        .iter()
        .map(|col| {
            Type::try_from(col.typ.clone()).map_err(|e| {
                ExecutorError::UnexpectedResultSet(
                    anyhow::anyhow!("Cannot convert column '{}' type: {}", col.name, e).into(),
                )
            })
        })
        .collect::<Result<_, _>>()?;

    let Some(first) = batches.first() else {
        return Ok(ResultSet::new(columns, vec![]));
    };
    let expected_schema = first.schema();

    let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(total_rows);

    for batch in batches {
        // A mismatch here used to be checked only by debug_assert. Release builds then panicked
        // on an out-of-range `batch.column(i)`, so report it as a recoverable error instead.
        if batch.num_columns() != types.len() {
            return Err(ExecutorError::UnexpectedResultSet(
                anyhow::anyhow!(
                    "RecordBatch column count ({}) does not match output schema ({})",
                    batch.num_columns(),
                    types.len()
                )
                .into(),
            ));
        }
        debug_assert!(
            batch.schema().as_ref() == expected_schema.as_ref(),
            "RecordBatch schema does not match first batch schema"
        );

        for row_idx in 0..batch.num_rows() {
            // The column-count check above guarantees this zip covers every column.
            let row = batch
                .columns()
                .iter()
                .zip(&types)
                .map(|(column, typ)| arrow_value_to_json_typed(column.as_ref(), row_idx, typ))
                .collect::<Result<Vec<Value>, _>>()?;
            rows.push(row);
        }
    }

    Ok(ResultSet::new(columns, rows))
}
/// Decodes the variant stored at row `idx` of a variant-encoded `StructArray` into JSON.
///
/// Only the requested row is sliced out (zero-copy) before unshredding. Converting a column row
/// by row therefore costs O(rows) overall. Previously the whole array was unshredded on every
/// call, which made the conversion quadratic.
///
/// Returns `Value::Null` when the unshredded variant at that row is not valid.
///
/// # Errors
/// Returns [`ExecutorError::UnexpectedResultSet`] if the slice is not a valid `VariantArray`, or
/// if unshredding fails.
fn variant_struct_to_json(arr: &StructArray, idx: usize) -> Result<Value, ExecutorError> {
    let row = arr.slice(idx, 1);
    let variant_array = VariantArray::try_new(&row).map_err(|e| {
        ExecutorError::UnexpectedResultSet(
            anyhow::anyhow!("Failed to create VariantArray: {}", e).into(),
        )
    })?;
    let unshredded = unshred_variant(&variant_array).map_err(|e| {
        ExecutorError::UnexpectedResultSet(
            anyhow::anyhow!("Failed to unshred variant: {}", e).into(),
        )
    })?;
    // After slicing, the requested row is always at position 0.
    if !unshredded.is_valid(0) {
        return Ok(Value::Null);
    }
    let variant = unshredded.value(0);
    variant_to_json(&variant)
}
/// Converts an unscaled decimal value and its scale into a JSON number (`integer / 10^scale`).
///
/// The arithmetic is done in `f64`, so very wide decimals can lose precision. A non-finite
/// result is emitted as `Value::Null`. This function always returns `Ok`.
fn decimal_to_json_number(integer: f64, scale: u8) -> Result<Value, ExecutorError> {
    let scaled = integer / 10f64.powi(i32::from(scale));
    let json = match serde_json::Number::from_f64(scaled) {
        Some(number) => Value::Number(number),
        None => Value::Null,
    };
    Ok(json)
}
/// Recursively converts a decoded Parquet variant into a `serde_json::Value`.
///
/// Each kind of variant value maps to JSON as follows:
/// - Integers become JSON numbers.
/// - Floats become numbers, or `null` when they are NaN or infinite.
/// - Decimals become approximate `f64` numbers.
/// - Dates, times, timestamps and UUIDs become strings. Zoned timestamps use RFC 3339 and
///   naive ones use their `Display` form.
/// - Binary becomes a standard base64 string.
/// - Objects and lists are converted element by element.
///
/// # Errors
/// Propagates any error from converting a nested object field or list element.
fn variant_to_json(variant: &parquet_variant::Variant) -> Result<Value, ExecutorError> {
    use parquet_variant::Variant;

    // JSON cannot represent NaN or infinities, so those become `null`.
    fn float_to_json(value: f64) -> Value {
        match serde_json::Number::from_f64(value) {
            Some(number) => Value::Number(number),
            None => Value::Null,
        }
    }

    let json = match variant {
        Variant::Null => Value::Null,
        Variant::BooleanTrue => Value::Bool(true),
        Variant::BooleanFalse => Value::Bool(false),
        Variant::Int8(n) => Value::Number(serde_json::Number::from(*n)),
        Variant::Int16(n) => Value::Number(serde_json::Number::from(*n)),
        Variant::Int32(n) => Value::Number(serde_json::Number::from(*n)),
        Variant::Int64(n) => Value::Number(serde_json::Number::from(*n)),
        Variant::Float(f) => float_to_json(f64::from(*f)),
        Variant::Double(f) => float_to_json(*f),
        Variant::Decimal4(d) => decimal_to_json_number(d.integer() as f64, d.scale())?,
        Variant::Decimal8(d) => decimal_to_json_number(d.integer() as f64, d.scale())?,
        Variant::Decimal16(d) => decimal_to_json_number(d.integer() as f64, d.scale())?,
        Variant::Date(date) => Value::String(date.to_string()),
        Variant::TimestampMicros(ts) => Value::String(ts.to_rfc3339()),
        Variant::TimestampNtzMicros(ts) => Value::String(ts.to_string()),
        Variant::TimestampNanos(ts) => Value::String(ts.to_rfc3339()),
        Variant::TimestampNtzNanos(ts) => Value::String(ts.to_string()),
        Variant::Time(time) => Value::String(time.to_string()),
        Variant::Uuid(uuid) => Value::String(uuid.to_string()),
        Variant::Binary(bytes) => Value::String(BASE64_STANDARD.encode(bytes)),
        Variant::String(text) => Value::String(text.to_string()),
        Variant::ShortString(text) => Value::String(text.to_string()),
        Variant::Object(object) => Value::Object(
            object
                .iter()
                .map(|(key, value)| variant_to_json(&value).map(|json| (key.to_string(), json)))
                .collect::<Result<serde_json::Map<String, Value>, ExecutorError>>()?,
        ),
        Variant::List(list) => Value::Array(
            list.iter()
                .map(|element| variant_to_json(&element))
                .collect::<Result<Vec<Value>, ExecutorError>>()?,
        ),
    };
    Ok(json)
}
fn arrow_value_to_json_typed(
array: &dyn Array,
idx: usize,
typ: &Type,
) -> Result<Value, ExecutorError> {
use datafusion::arrow::array::{
Decimal128Array, Decimal256Array, DurationMicrosecondArray, Float32Array, Int16Array,
Int32Array, Int8Array, IntervalYearMonthArray, TimestampMicrosecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
if array.is_null(idx) {
return Ok(Value::Null);
}
if array.data_type() == &DataType::Null {
return Ok(Value::Null);
}
match typ {
Type::Boolean => {
let arr = array
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!("Expected BooleanArray for Bool type").into(),
)
})?;
Ok(Value::Bool(arr.value(idx)))
}
Type::Int => {
if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<Int8Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<UInt64Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<UInt32Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<UInt16Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<UInt8Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected integer array for Int type, got {:?}",
array.data_type()
)
.into(),
))
}
Type::Double => {
if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
let v = arr.value(idx);
return Ok(serde_json::Number::from_f64(v)
.map(Value::Number)
.unwrap_or(Value::Null));
}
if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
let v = arr.value(idx) as f64;
return Ok(serde_json::Number::from_f64(v)
.map(Value::Number)
.unwrap_or(Value::Null));
}
Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected float array for Double type, got {:?}",
array.data_type()
)
.into(),
))
}
Type::String => {
if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
return Ok(Value::String(arr.value(idx).to_string()));
}
if let Some(arr) = array.as_any().downcast_ref::<LargeStringArray>() {
return Ok(Value::String(arr.value(idx).to_string()));
}
if let Some(arr) = array.as_any().downcast_ref::<StringViewArray>() {
return Ok(Value::String(arr.value(idx).to_string()));
}
Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected string array for String type, got {:?}",
array.data_type()
)
.into(),
))
}
Type::Binary => {
if let Some(arr) = array.as_any().downcast_ref::<BinaryArray>() {
return Ok(Value::String(BASE64_STANDARD.encode(arr.value(idx))));
}
if let Some(arr) = array.as_any().downcast_ref::<LargeBinaryArray>() {
return Ok(Value::String(BASE64_STANDARD.encode(arr.value(idx))));
}
Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected binary array for Binary type, got {:?}",
array.data_type()
)
.into(),
))
}
Type::Timestamp => {
let arr = array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected TimestampMicrosecondArray for Timestamp type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let micros = arr.value(idx);
match chrono::DateTime::from_timestamp_micros(micros) {
Some(dt) => Ok(Value::String(dt.to_rfc3339())),
None => Ok(Value::Null),
}
}
Type::Interval => {
if let Some(arr) = array.as_any().downcast_ref::<IntervalDayTimeArray>() {
let interval = arr.value(idx);
let total_millis = interval.days as i64 * 86_400_000 + interval.milliseconds as i64;
let is_negative = total_millis < 0;
let abs_millis = total_millis.abs();
let days = abs_millis / 86_400_000;
let remaining = abs_millis % 86_400_000;
let hours = remaining / 3_600_000;
let remaining = remaining % 3_600_000;
let minutes = remaining / 60_000;
let remaining = remaining % 60_000;
let seconds = remaining / 1000;
let millis = remaining % 1000;
let sign = if is_negative { "-" } else { "" };
return Ok(Value::String(format!(
"{}{} {:02}:{:02}:{:02}.{:03}",
sign, days, hours, minutes, seconds, millis
)));
}
if let Some(arr) = array.as_any().downcast_ref::<DurationMicrosecondArray>() {
let micros = arr.value(idx);
let is_negative = micros < 0;
let abs_micros = micros.abs();
let total_millis = abs_micros / 1_000;
let days = total_millis / 86_400_000;
let remaining_millis = total_millis % 86_400_000;
let hours = remaining_millis / 3_600_000;
let remaining = remaining_millis % 3_600_000;
let minutes = remaining / 60_000;
let remaining = remaining % 60_000;
let seconds = remaining / 1000;
let millis = remaining % 1000;
let sign = if is_negative { "-" } else { "" };
return Ok(Value::String(format!(
"{}{} {:02}:{:02}:{:02}.{:03}",
sign, days, hours, minutes, seconds, millis
)));
}
Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected IntervalDayTimeArray or DurationMicrosecondArray for Interval type, got {:?}",
array.data_type()
)
.into(),
))
}
Type::CalendarInterval => {
let arr = array
.as_any()
.downcast_ref::<IntervalYearMonthArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected IntervalYearMonthArray for CalendarInterval type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let total_months = arr.value(idx);
let is_negative = total_months < 0;
let abs_months = total_months.abs();
let years = abs_months / 12;
let months = abs_months % 12;
let sign = if is_negative { "-" } else { "" };
Ok(Value::String(format!("{}{}-{}", sign, years, months)))
}
Type::Decimal(_) => {
if let Some(arr) = array.as_any().downcast_ref::<Decimal128Array>() {
return Ok(Value::String(arr.value_as_string(idx)));
}
if let Some(arr) = array.as_any().downcast_ref::<Decimal256Array>() {
return Ok(Value::String(arr.value_as_string(idx)));
}
Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected Decimal128Array or Decimal256Array for Decimal type, got {:?}",
array.data_type()
)
.into(),
))
}
Type::Tuple(tuple) => {
let arr = array
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected StructArray for Tuple type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let mut result = Vec::with_capacity(tuple.elements.len());
for (i, elem_type) in tuple.elements.iter().enumerate() {
let col = arr.column(i);
result.push(arrow_value_to_json_typed(col.as_ref(), idx, elem_type)?);
}
Ok(Value::Array(result))
}
Type::Variant => {
let arr = array
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected StructArray for Variant type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let json_value = variant_struct_to_json(arr, idx)?;
Ok(Value::String(json_value.to_string()))
}
Type::Array(array_type) => {
let arr = array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected ListArray for Array type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let values = arr.value(idx);
let mut result = Vec::with_capacity(values.len());
for i in 0..values.len() {
result.push(arrow_value_to_json_typed(
values.as_ref(),
i,
&array_type.element_type,
)?);
}
Ok(Value::Array(result))
}
Type::Map(map_type) => {
let arr = array.as_any().downcast_ref::<MapArray>().ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected MapArray for Map type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let entries = arr.value(idx);
let struct_arr = entries
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!("Expected StructArray for map entries").into(),
)
})?;
let keys = struct_arr.column(0);
let values = struct_arr.column(1);
let mut map = serde_json::Map::new();
for i in 0..entries.len() {
let key = arrow_value_to_json_typed(keys.as_ref(), i, &map_type.key_type)?;
let key_str = match key {
Value::String(s) => s,
other => other.to_string(),
};
let value = arrow_value_to_json_typed(values.as_ref(), i, &map_type.value_type)?;
map.insert(key_str, value);
}
Ok(Value::Object(map))
}
Type::Struct(struct_type) => {
let arr = array
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected StructArray for Struct type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let mut result = Vec::with_capacity(arr.num_columns());
for (i, (_, field_type)) in struct_type.iter().enumerate() {
let col = arr.column(i);
result.push(arrow_value_to_json_typed(col.as_ref(), idx, field_type)?);
}
Ok(Value::Array(result))
}
Type::Range(range_type) => {
let arr = array
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected StructArray for Range type, got {:?}",
array.data_type()
)
.into(),
)
})?;
let begin = arrow_value_to_json_typed(arr.column(0).as_ref(), idx, &range_type.of)?;
let end = arrow_value_to_json_typed(arr.column(1).as_ref(), idx, &range_type.of)?;
Ok(Value::Array(vec![begin, end]))
}
Type::Rows => {
if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
return Ok(Value::Number(arr.value(idx).into()));
}
Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!(
"Expected integer array for Rows type, got {:?}",
array.data_type()
)
.into(),
))
}
Type::Unknown => Ok(Value::Null),
Type::Function(_) => Err(ExecutorError::UnexpectedResultSet(
anyhow::anyhow!("Function types cannot be converted to JSON").into(),
)),
}
}