use std::any::Any;
use std::sync::Arc;
use datafusion::arrow::array::{
Array, ArrayBuilder, ArrayRef, AsArray, BooleanBuilder, Float32Builder, Float64Builder,
GenericListArray, Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, MapArray,
MapBuilder, OffsetSizeTrait, StringBuilder, StructArray, StructBuilder,
};
use datafusion::arrow::buffer::OffsetBuffer;
use datafusion::arrow::compute::CastOptions;
use datafusion::arrow::datatypes::{DataType, Field, FieldRef, Fields};
use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::{
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
};
use datafusion_common::DataFusionError;
use parquet_variant::{Variant, VariantBuilderExt, VariantPath};
use parquet_variant_compute::{
shred_variant, variant_get, GetOptions, VariantArray, VariantArrayBuilder,
};
use super::normalize_variant_struct;
use crate::struct_expansion::map_field_names;
/// Type-erased Arrow builder; nested (list/struct/map) targets pick their
/// child builders at runtime via `make_builder_for_type`.
type BoxedArrayBuilder = Box<dyn ArrayBuilder>;
/// Scalar UDF `hamelin_from_variant`: converts a variant-encoded argument into
/// the fixed Arrow type `target_type`.
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct FromVariantUdf {
    // Accepts exactly one argument of any type; marked immutable.
    signature: Signature,
    // Arrow type every output value is converted to (also the declared return type).
    target_type: DataType,
}
impl FromVariantUdf {
    /// Creates a UDF that converts its single variant argument into `target_type`.
    ///
    /// The signature accepts one argument of any type and is `Immutable`.
    pub fn new(target_type: DataType) -> Self {
        let signature = Signature::new(TypeSignature::Any(1), Volatility::Immutable);
        Self {
            target_type,
            signature,
        }
    }

    /// Arrow type produced by this UDF.
    pub fn target_type(&self) -> &DataType {
        &self.target_type
    }
}
impl ScalarUDFImpl for FromVariantUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "hamelin_from_variant"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The output type is fixed at construction; argument types are ignored.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(self.target_type.clone())
    }

    /// The output field is named after the function and is always nullable,
    /// because conversion can produce null values.
    fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<Arc<Field>> {
        let field = Field::new(self.name(), self.target_type.clone(), true);
        Ok(Arc::new(field))
    }

    /// Converts the single argument.
    ///
    /// A scalar argument is expanded into a one-row array, so scalars and
    /// arrays share one conversion path; the result is then turned back into
    /// a scalar.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let [input] = args.args.as_slice() else {
            return Err(DataFusionError::Execution(format!(
                "hamelin_from_variant expects 1 argument, got {}",
                args.args.len()
            )));
        };

        match input {
            ColumnarValue::Array(array) => {
                self.extract_from_variant(array).map(ColumnarValue::Array)
            }
            ColumnarValue::Scalar(scalar) => {
                let single_row = scalar.to_array_of_size(1)?;
                let converted = self.extract_from_variant(&single_row)?;
                ScalarValue::try_from_array(&converted, 0).map(ColumnarValue::Scalar)
            }
        }
    }
}
impl FromVariantUdf {
fn extract_with_variant_get(&self, array: &ArrayRef) -> Result<ArrayRef> {
let target_field = Arc::new(Field::new("result", self.target_type.clone(), true));
let options = GetOptions {
path: VariantPath::default(),
as_type: Some(target_field),
cast_options: CastOptions {
safe: true,
..Default::default()
},
};
variant_get(array, options).map_err(|e| DataFusionError::Execution(e.to_string()))
}
fn extract_from_variant(&self, array: &ArrayRef) -> Result<ArrayRef> {
let needs_variant_array = matches!(
&self.target_type,
DataType::Utf8
| DataType::Map(_, _)
| DataType::List(_)
| DataType::LargeList(_)
| DataType::Struct(_)
);
if !needs_variant_array {
return self.extract_with_variant_get(array);
}
let variant_array =
VariantArray::try_new(array).map_err(|e| DataFusionError::Execution(e.to_string()))?;
if matches!(&self.target_type, DataType::Utf8) {
return self.variant_to_string(&variant_array);
}
if let DataType::Map(entry_field, _sorted) = &self.target_type {
return variant_to_map(&variant_array, entry_field);
}
if let DataType::List(element_field) | DataType::LargeList(element_field) =
&self.target_type
{
return variant_to_list(&variant_array, element_field);
}
if let DataType::Struct(fields) = &self.target_type {
if variant_array.typed_value_field().is_none() {
if contains_map_type(&self.target_type) {
return variant_to_struct(&variant_array, fields);
}
let shredded = shred_variant(&variant_array, &self.target_type)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
if let Some(typed_value) = shredded.typed_value_field() {
return extract_typed_value(typed_value, &self.target_type);
}
}
}
self.extract_with_variant_get(array)
}
fn variant_to_string(&self, variant_array: &VariantArray) -> Result<ArrayRef> {
let mut builder =
StringBuilder::with_capacity(variant_array.len(), variant_array.len() * 16);
for i in 0..variant_array.len() {
if !variant_array.is_valid(i) {
builder.append_null();
continue;
}
let variant = variant_array.value(i);
match variant {
Variant::Null => builder.append_null(),
Variant::BooleanTrue => builder.append_value("true"),
Variant::BooleanFalse => builder.append_value("false"),
Variant::Int8(n) => builder.append_value(n.to_string()),
Variant::Int16(n) => builder.append_value(n.to_string()),
Variant::Int32(n) => builder.append_value(n.to_string()),
Variant::Int64(n) => builder.append_value(n.to_string()),
Variant::Float(f) => builder.append_value(f.to_string()),
Variant::Double(f) => builder.append_value(f.to_string()),
Variant::Decimal4(d) => builder.append_value(d.to_string()),
Variant::Decimal8(d) => builder.append_value(d.to_string()),
Variant::Decimal16(d) => builder.append_value(d.to_string()),
Variant::Date(d) => builder.append_value(d.to_string()),
Variant::TimestampMicros(ts) => builder.append_value(ts.to_rfc3339()),
Variant::TimestampNtzMicros(ts) => builder.append_value(ts.to_string()),
Variant::TimestampNanos(ts) => builder.append_value(ts.to_rfc3339()),
Variant::TimestampNtzNanos(ts) => builder.append_value(ts.to_string()),
Variant::Time(t) => builder.append_value(t.to_string()),
Variant::Uuid(u) => builder.append_value(u.to_string()),
Variant::Binary(_) => builder.append_null(), Variant::String(s) => builder.append_value(s),
Variant::ShortString(s) => builder.append_value(s),
Variant::Object(_) | Variant::List(_) => builder.append_null(),
}
}
Ok(Arc::new(builder.finish()))
}
}
fn extract_typed_value(typed_value: &ArrayRef, target_type: &DataType) -> Result<ArrayRef> {
match target_type {
DataType::List(element_field) => {
extract_list_typed_value::<i32>(typed_value, element_field)
}
DataType::LargeList(element_field) => {
extract_list_typed_value::<i64>(typed_value, element_field)
}
DataType::Struct(fields) => extract_struct_typed_value(typed_value, fields),
DataType::Map(entry_field, _sorted) => extract_map_typed_value(typed_value, entry_field),
_ => Ok(typed_value.clone()),
}
}
/// Unwraps a shredded list column into a plain list of its elements' typed values.
///
/// `typed_value` must be a `GenericListArray<O>` whose elements are structs
/// carrying a `typed_value` column. The resulting list keeps the original
/// offsets and list-level validity; the element field keeps `element_field`'s
/// name and nullability, and takes the extracted values' data type.
///
/// # Errors
/// Returns an execution error, instead of panicking, if `typed_value` is not a
/// list with offset type `O`, if its elements are not structs, or if they lack
/// a `typed_value` column.
fn extract_list_typed_value<O: OffsetSizeTrait>(
    typed_value: &ArrayRef,
    element_field: &FieldRef,
) -> Result<ArrayRef> {
    let list_array = typed_value
        .as_any()
        .downcast_ref::<GenericListArray<O>>()
        .ok_or_else(|| {
            DataFusionError::Execution("Expected list array in shredded variant".to_string())
        })?;
    // `as_struct()` would panic on an unexpected layout; report it as an error instead.
    let element_struct = list_array.values().as_struct_opt().ok_or_else(|| {
        DataFusionError::Execution("Expected struct elements in shredded list".to_string())
    })?;
    let inner_typed_value = element_struct
        .column_by_name("typed_value")
        .ok_or_else(|| {
            DataFusionError::Execution("Missing typed_value in shredded list element".to_string())
        })?;
    let extracted_values = extract_typed_value(inner_typed_value, element_field.data_type())?;
    let new_field = Arc::new(Field::new(
        element_field.name(),
        extracted_values.data_type().clone(),
        element_field.is_nullable(),
    ));
    let new_list = GenericListArray::<O>::try_new(
        new_field,
        list_array.offsets().clone(),
        extracted_values,
        list_array.nulls().cloned(),
    )
    .map_err(|e| DataFusionError::Execution(e.to_string()))?;
    Ok(Arc::new(new_list))
}
/// Unwraps a shredded struct column into a plain `StructArray` with `target_fields`.
///
/// Each target field is looked up by name in `typed_value`. That column must be
/// a struct with its own `typed_value` child, which is unwrapped recursively.
/// The struct-level validity of `typed_value` is preserved.
///
/// # Errors
/// Returns an execution error, instead of panicking, if `typed_value` or a field
/// column is not a struct, if a target field or its `typed_value` is missing,
/// or if the rebuilt struct is inconsistent with `target_fields`.
fn extract_struct_typed_value(typed_value: &ArrayRef, target_fields: &Fields) -> Result<ArrayRef> {
    let struct_array = typed_value.as_struct_opt().ok_or_else(|| {
        DataFusionError::Execution("Expected struct array in shredded variant".to_string())
    })?;
    let extracted_columns = target_fields
        .iter()
        .map(|target_field| {
            let field_variant = struct_array
                .column_by_name(target_field.name())
                .ok_or_else(|| {
                    DataFusionError::Execution(format!(
                        "Missing field '{}' in shredded struct",
                        target_field.name()
                    ))
                })?;
            // `as_struct()` would panic on an unexpected layout; report it as an error.
            let field_struct = field_variant.as_struct_opt().ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Expected struct for field '{}' in shredded struct",
                    target_field.name()
                ))
            })?;
            let inner_typed_value = field_struct.column_by_name("typed_value").ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "Missing typed_value for field '{}' in shredded struct",
                    target_field.name()
                ))
            })?;
            extract_typed_value(inner_typed_value, target_field.data_type())
        })
        .collect::<Result<Vec<ArrayRef>>>()?;
    let new_struct = StructArray::try_new(
        target_fields.clone(),
        extracted_columns,
        struct_array.nulls().cloned(),
    )
    .map_err(|e| DataFusionError::Execution(e.to_string()))?;
    Ok(Arc::new(new_struct))
}
/// Unwraps a shredded map column into a plain `MapArray`.
///
/// `typed_value` must be a `MapArray` whose key and value columns are structs
/// carrying a `typed_value` child. Those children are unwrapped recursively
/// against the key and value types of `entry_field`. Offsets, entry validity and
/// map validity are preserved; the result is marked unsorted.
///
/// # Errors
/// Returns an execution error, instead of panicking, if the input is not a map,
/// if `entry_field` is not a two-field struct, if the key/value columns are not
/// structs or lack `typed_value`, or if Arrow rejects the rebuilt arrays.
fn extract_map_typed_value(typed_value: &ArrayRef, entry_field: &FieldRef) -> Result<ArrayRef> {
    let map_array = typed_value
        .as_any()
        .downcast_ref::<MapArray>()
        .ok_or_else(|| {
            DataFusionError::Execution("Expected map array in shredded variant".to_string())
        })?;
    let DataType::Struct(entry_fields) = entry_field.data_type() else {
        return Err(DataFusionError::Execution(
            "Map entry field must be a struct".to_string(),
        ));
    };
    let key_field = entry_fields
        .iter()
        .next()
        .ok_or_else(|| DataFusionError::Execution("Map entry must have key field".to_string()))?;
    let value_field = entry_fields
        .iter()
        .nth(1)
        .ok_or_else(|| DataFusionError::Execution("Map entry must have value field".to_string()))?;
    let entries = map_array.entries();
    // `as_struct()` would panic on an unexpected layout; report it as an error instead.
    let keys_struct = entries.column(0).as_struct_opt().ok_or_else(|| {
        DataFusionError::Execution("Expected struct for shredded map keys".to_string())
    })?;
    let values_struct = entries.column(1).as_struct_opt().ok_or_else(|| {
        DataFusionError::Execution("Expected struct for shredded map values".to_string())
    })?;
    let keys_typed = keys_struct.column_by_name("typed_value").ok_or_else(|| {
        DataFusionError::Execution("Missing typed_value for map keys".to_string())
    })?;
    let values_typed = values_struct.column_by_name("typed_value").ok_or_else(|| {
        DataFusionError::Execution("Missing typed_value for map values".to_string())
    })?;
    let extracted_keys = extract_typed_value(keys_typed, key_field.data_type())?;
    let extracted_values = extract_typed_value(values_typed, value_field.data_type())?;
    // Field types follow the extracted columns; names and nullability follow the target.
    let new_entry_fields = Fields::from(vec![
        Arc::new(Field::new(
            key_field.name(),
            extracted_keys.data_type().clone(),
            key_field.is_nullable(),
        )),
        Arc::new(Field::new(
            value_field.name(),
            extracted_values.data_type().clone(),
            value_field.is_nullable(),
        )),
    ]);
    let new_entries = StructArray::try_new(
        new_entry_fields.clone(),
        vec![extracted_keys, extracted_values],
        entries.nulls().cloned(),
    )
    .map_err(|e| DataFusionError::Execution(e.to_string()))?;
    let new_entry_field = Arc::new(Field::new(
        entry_field.name(),
        DataType::Struct(new_entry_fields),
        entry_field.is_nullable(),
    ));
    let offsets: OffsetBuffer<i32> = map_array.offsets().clone();
    let new_map = MapArray::try_new(
        new_entry_field,
        offsets,
        new_entries,
        map_array.nulls().cloned(),
        false,
    )
    .map_err(|e| DataFusionError::Execution(e.to_string()))?;
    Ok(Arc::new(new_map))
}
/// Wraps a [`FromVariantUdf`] targeting `target_type` in a DataFusion [`ScalarUDF`].
pub fn from_variant_udf(target_type: DataType) -> ScalarUDF {
    let udf_impl = FromVariantUdf::new(target_type);
    ScalarUDF::new_from_impl(udf_impl)
}
/// Reports whether `data_type` is a map or nests one inside struct fields or
/// list/large-list elements, at any depth.
///
/// A map answers `true` immediately, so its children are not inspected.
fn contains_map_type(data_type: &DataType) -> bool {
    match data_type {
        DataType::Map(..) => true,
        DataType::List(element) | DataType::LargeList(element) => {
            contains_map_type(element.data_type())
        }
        DataType::Struct(children) => children
            .iter()
            .map(|child| child.data_type())
            .any(contains_map_type),
        _ => false,
    }
}
/// Builds a `StructArray` with `fields`, row by row, from `variant_array`.
///
/// SQL-null rows and variant `null` values both become a null struct row; a
/// null is also appended to every child builder so the child lengths stay
/// aligned. Object rows fill each field by name, and an absent key is treated
/// as variant `null`.
///
/// # Errors
/// Fails if a row is neither an object nor null, if a field type has no
/// builder, or if a field value cannot be converted to its declared type.
fn variant_to_struct(variant_array: &VariantArray, fields: &Fields) -> Result<ArrayRef> {
    let child_builders = fields
        .iter()
        .map(|field| make_builder_for_type(field.data_type()))
        .collect::<Result<Vec<BoxedArrayBuilder>>>()?;
    let mut struct_builder = StructBuilder::new(fields.clone(), child_builders);

    for row in 0..variant_array.len() {
        // A SQL-level null is handled exactly like a variant null.
        let value = if variant_array.is_valid(row) {
            variant_array.value(row)
        } else {
            Variant::Null
        };

        match value {
            Variant::Null => {
                let children = struct_builder.field_builders_mut();
                for (idx, field) in fields.iter().enumerate() {
                    append_null_to_builder(children[idx].as_mut(), field.data_type())?;
                }
                struct_builder.append_null();
            }
            Variant::Object(obj) => {
                let children = struct_builder.field_builders_mut();
                for (idx, field) in fields.iter().enumerate() {
                    let field_value = obj.get(field.name()).unwrap_or(Variant::Null);
                    append_value_to_builder(
                        children[idx].as_mut(),
                        field.data_type(),
                        &field_value,
                    )?;
                }
                struct_builder.append(true);
            }
            other => {
                return Err(DataFusionError::Execution(format!(
                    "Expected object variant for struct conversion, got {:?}",
                    other
                )));
            }
        }
    }

    Ok(Arc::new(struct_builder.finish()))
}
/// Converts each variant object row into a map row of the shape described by
/// `entry_field`.
///
/// When the value type is a variant type, the work is delegated to
/// `variant_to_map_with_variant_values`. Otherwise keys are appended as strings
/// and values are converted to the value field's type. SQL-null rows and
/// variant nulls become null maps.
///
/// # Errors
/// Fails if `entry_field` is not a struct with key and value fields, if a row
/// is neither an object nor null, or if a key/value cannot be converted.
fn variant_to_map(variant_array: &VariantArray, entry_field: &FieldRef) -> Result<ArrayRef> {
    let DataType::Struct(entry_fields) = entry_field.data_type() else {
        return Err(DataFusionError::Execution(
            "Map entry field must be a struct".to_string(),
        ));
    };
    // First field is the key, second is the value.
    let mut entry_parts = entry_fields.iter();
    let key_field = entry_parts
        .next()
        .ok_or_else(|| DataFusionError::Execution("Map entry must have key field".to_string()))?;
    let value_field = entry_parts
        .next()
        .ok_or_else(|| DataFusionError::Execution("Map entry must have value field".to_string()))?;

    if super::is_variant_data_type(value_field.data_type()) {
        return variant_to_map_with_variant_values(
            variant_array,
            entry_field,
            key_field,
            value_field,
        );
    }

    let key_builder = make_builder_for_type(key_field.data_type())?;
    let value_builder = make_builder_for_type(value_field.data_type())?;
    let mut map_builder = MapBuilder::new(Some(map_field_names()), key_builder, value_builder);

    for row in 0..variant_array.len() {
        let value = if variant_array.is_valid(row) {
            variant_array.value(row)
        } else {
            Variant::Null
        };

        match value {
            Variant::Null => map_builder.append(false)?,
            Variant::Object(obj) => {
                for (key, val) in obj.iter() {
                    append_value_to_builder(
                        map_builder.keys(),
                        key_field.data_type(),
                        &Variant::String(key),
                    )?;
                    append_value_to_builder(map_builder.values(), value_field.data_type(), &val)?;
                }
                map_builder.append(true)?;
            }
            other => {
                return Err(DataFusionError::Execution(format!(
                    "Expected object variant for map conversion, got {:?}",
                    other
                )));
            }
        }
    }

    Ok(Arc::new(map_builder.finish()))
}
/// Builds a map of string keys to variant values from variant object rows.
///
/// Keys are collected into a `StringBuilder`, and values are re-encoded through
/// a `VariantArrayBuilder` and then `normalize_variant_struct`. SQL-null rows
/// and variant nulls become null maps. The key field is declared non-nullable
/// and the value field nullable.
///
/// # Errors
/// Fails if a row is neither an object nor null, or if Arrow rejects the entries
/// struct or the final map.
fn variant_to_map_with_variant_values(
    variant_array: &VariantArray,
    entry_field: &FieldRef,
    key_field: &Arc<Field>,
    value_field: &Arc<Field>,
) -> Result<ArrayRef> {
    let row_count = variant_array.len();

    // First pass: size the builders from the total number of object entries.
    let entry_count: usize = (0..row_count)
        .filter(|&row| variant_array.is_valid(row))
        .map(|row| match variant_array.value(row) {
            Variant::Object(obj) => obj.iter().count(),
            _ => 0,
        })
        .sum();

    let mut keys = StringBuilder::with_capacity(entry_count, entry_count * 16);
    let mut values = VariantArrayBuilder::new(entry_count);
    let mut offsets: Vec<i32> = Vec::with_capacity(row_count + 1);
    offsets.push(0);
    let mut validity: Vec<bool> = Vec::with_capacity(row_count);

    // Second pass: emit entries and record one offset/validity slot per row.
    for row in 0..row_count {
        let present = if variant_array.is_valid(row) {
            match variant_array.value(row) {
                Variant::Object(obj) => {
                    for (key, val) in obj.iter() {
                        keys.append_value(key);
                        values.append_value(val);
                    }
                    true
                }
                Variant::Null => false,
                other => {
                    return Err(DataFusionError::Execution(format!(
                        "Expected object variant for map conversion, got {:?}",
                        other
                    )));
                }
            }
        } else {
            false
        };
        validity.push(present);
        offsets.push(keys.len() as i32);
    }

    let keys_array: ArrayRef = Arc::new(keys.finish());
    let values_array: ArrayRef = Arc::new(normalize_variant_struct(values.build().into()));

    let entries_fields = Fields::from(vec![
        Arc::new(Field::new(
            key_field.name(),
            key_field.data_type().clone(),
            false,
        )),
        Arc::new(Field::new(
            value_field.name(),
            value_field.data_type().clone(),
            true,
        )),
    ]);
    let entries =
        StructArray::try_new(entries_fields.clone(), vec![keys_array, values_array], None)
            .map_err(|e| DataFusionError::Execution(format!("Failed to create entries: {e}")))?;
    let new_entry_field = Arc::new(Field::new(
        entry_field.name(),
        DataType::Struct(entries_fields),
        entry_field.is_nullable(),
    ));

    MapArray::try_new(
        new_entry_field,
        OffsetBuffer::new(offsets.into()),
        entries,
        Some(datafusion::arrow::buffer::NullBuffer::from(validity)),
        false,
    )
    .map(|map| Arc::new(map) as ArrayRef)
    .map_err(|e| DataFusionError::Execution(format!("Failed to create map array: {e}")))
}
/// Converts each variant list row into an i32-offset Arrow list of `element_field`'s type.
///
/// When the element type is a variant type, the work is delegated to
/// `variant_to_list_of_variants`. SQL-null rows and variant nulls become null
/// lists.
///
/// # Errors
/// Fails if a row is neither a list nor null, if the element type has no
/// builder, or if an element cannot be converted.
fn variant_to_list(variant_array: &VariantArray, element_field: &FieldRef) -> Result<ArrayRef> {
    if super::is_variant_data_type(element_field.data_type()) {
        return variant_to_list_of_variants(variant_array, element_field);
    }

    let element_type = element_field.data_type();
    let mut list_builder = ListBuilder::new(make_builder_for_type(element_type)?);

    for row in 0..variant_array.len() {
        let value = if variant_array.is_valid(row) {
            variant_array.value(row)
        } else {
            Variant::Null
        };

        match value {
            Variant::Null => list_builder.append(false),
            Variant::List(items) => {
                for item in items.iter() {
                    append_value_to_builder(list_builder.values(), element_type, &item)?;
                }
                list_builder.append(true);
            }
            other => {
                return Err(DataFusionError::Execution(format!(
                    "Expected list variant for list conversion, got {:?}",
                    other
                )));
            }
        }
    }

    Ok(Arc::new(list_builder.finish()))
}
/// Converts each variant list row into a list whose elements stay variants.
///
/// Elements are re-encoded through a `VariantArrayBuilder` and then
/// `normalize_variant_struct`; `element_field` is used unchanged as the list's
/// item field. SQL-null rows and variant nulls become null lists.
///
/// # Errors
/// Fails if a row is neither a list nor null, or if Arrow rejects the list.
fn variant_to_list_of_variants(
    variant_array: &VariantArray,
    element_field: &FieldRef,
) -> Result<ArrayRef> {
    let row_count = variant_array.len();

    // First pass: size the element builder from the total number of elements.
    let element_count: usize = (0..row_count)
        .filter(|&row| variant_array.is_valid(row))
        .map(|row| match variant_array.value(row) {
            Variant::List(items) => items.iter().count(),
            _ => 0,
        })
        .sum();

    let mut elements = VariantArrayBuilder::new(element_count);
    let mut offsets: Vec<i32> = Vec::with_capacity(row_count + 1);
    offsets.push(0);
    let mut validity: Vec<bool> = Vec::with_capacity(row_count);
    let mut end = 0i32;

    // Second pass: emit elements and record one offset/validity slot per row.
    for row in 0..row_count {
        let value = if variant_array.is_valid(row) {
            variant_array.value(row)
        } else {
            Variant::Null
        };

        let present = match value {
            Variant::Null => false,
            Variant::List(items) => {
                for item in items.iter() {
                    elements.append_value(item);
                    end += 1;
                }
                true
            }
            other => {
                return Err(DataFusionError::Execution(format!(
                    "Expected list variant for list conversion, got {:?}",
                    other
                )));
            }
        };
        validity.push(present);
        offsets.push(end);
    }

    let values_array: ArrayRef = Arc::new(normalize_variant_struct(elements.build().into()));

    GenericListArray::<i32>::try_new(
        element_field.clone(),
        OffsetBuffer::new(offsets.into()),
        values_array,
        Some(datafusion::arrow::buffer::NullBuffer::from(validity)),
    )
    .map(|list| Arc::new(list) as ArrayRef)
    .map_err(|e| DataFusionError::Execution(format!("Failed to create list array: {e}")))
}
fn make_builder_for_type(data_type: &DataType) -> Result<BoxedArrayBuilder> {
match data_type {
DataType::Boolean => Ok(Box::new(BooleanBuilder::new())),
DataType::Int8 => Ok(Box::new(Int8Builder::new())),
DataType::Int16 => Ok(Box::new(Int16Builder::new())),
DataType::Int32 => Ok(Box::new(Int32Builder::new())),
DataType::Int64 => Ok(Box::new(Int64Builder::new())),
DataType::Float32 => Ok(Box::new(Float32Builder::new())),
DataType::Float64 => Ok(Box::new(Float64Builder::new())),
DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
DataType::List(element_field) => {
let inner_builder = make_builder_for_type(element_field.data_type())?;
Ok(Box::new(ListBuilder::new(inner_builder)))
}
DataType::Struct(fields) => {
let field_builders: Vec<BoxedArrayBuilder> = fields
.iter()
.map(|f| make_builder_for_type(f.data_type()))
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(StructBuilder::new(fields.clone(), field_builders)))
}
DataType::Map(entry_field, _) => {
let DataType::Struct(entry_fields) = entry_field.data_type() else {
return Err(DataFusionError::Execution(
"Map entry must be a struct".to_string(),
));
};
let key_field = entry_fields
.iter()
.next()
.ok_or_else(|| DataFusionError::Execution("Map must have key field".to_string()))?;
let value_field = entry_fields.iter().nth(1).ok_or_else(|| {
DataFusionError::Execution("Map must have value field".to_string())
})?;
let key_builder = make_builder_for_type(key_field.data_type())?;
let value_builder = make_builder_for_type(value_field.data_type())?;
Ok(Box::new(MapBuilder::new(
Some(map_field_names()),
key_builder,
value_builder,
)))
}
other => Err(DataFusionError::Execution(format!(
"Unsupported element type for variant list/map conversion: {other}"
))),
}
}
/// Downcasts the `Any` view of a type-erased builder (from
/// `ArrayBuilder::as_any_mut`) to the concrete builder `B`.
///
/// Returns `mismatch` as an execution error when the builder is a different type.
fn downcast_builder_mut<'a, B: Any>(any: &'a mut dyn Any, mismatch: &str) -> Result<&'a mut B> {
    any.downcast_mut::<B>()
        .ok_or_else(|| DataFusionError::Execution(mismatch.to_string()))
}

/// Appends `variant`, converted to `data_type`, to `builder`.
///
/// `builder` must have been created for `data_type`, for example by
/// `make_builder_for_type`. Supported conversions:
/// - exact matches for Boolean, Int8/16/32/64, Float32/64 (`Double`) and Utf8
///   (`String`/`ShortString`);
/// - lossless widening: Int8→Int16/Int32/Int64, Int16→Int32/Int64,
///   Int32→Int64, and Float→Float64;
/// - recursion into `List` (from variant lists), and into `Struct` and `Map`
///   (from variant objects; missing struct keys become null);
/// - a variant `null` for any type that `append_null_to_builder` supports.
///
/// # Errors
/// Returns an execution error if the builder's concrete type does not match
/// `data_type`, or if the variant/type combination is not supported.
fn append_value_to_builder(
    builder: &mut dyn ArrayBuilder,
    data_type: &DataType,
    variant: &Variant,
) -> Result<()> {
    const MISMATCH: &str = "Type mismatch";
    match (data_type, variant) {
        (DataType::Boolean, Variant::BooleanTrue) => {
            downcast_builder_mut::<BooleanBuilder>(builder.as_any_mut(), MISMATCH)?
                .append_value(true)
        }
        (DataType::Boolean, Variant::BooleanFalse) => {
            downcast_builder_mut::<BooleanBuilder>(builder.as_any_mut(), MISMATCH)?
                .append_value(false)
        }
        (DataType::Int8, Variant::Int8(n)) => {
            downcast_builder_mut::<Int8Builder>(builder.as_any_mut(), MISMATCH)?.append_value(*n)
        }
        (DataType::Int16, Variant::Int16(n)) => {
            downcast_builder_mut::<Int16Builder>(builder.as_any_mut(), MISMATCH)?.append_value(*n)
        }
        // Previously missing: Int8 widened to Int32/Int64 but not to Int16.
        (DataType::Int16, Variant::Int8(n)) => {
            downcast_builder_mut::<Int16Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(i16::from(*n))
        }
        (DataType::Int32, Variant::Int32(n)) => {
            downcast_builder_mut::<Int32Builder>(builder.as_any_mut(), MISMATCH)?.append_value(*n)
        }
        (DataType::Int32, Variant::Int8(n)) => {
            downcast_builder_mut::<Int32Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(i32::from(*n))
        }
        (DataType::Int32, Variant::Int16(n)) => {
            downcast_builder_mut::<Int32Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(i32::from(*n))
        }
        (DataType::Int64, Variant::Int64(n)) => {
            downcast_builder_mut::<Int64Builder>(builder.as_any_mut(), MISMATCH)?.append_value(*n)
        }
        (DataType::Int64, Variant::Int8(n)) => {
            downcast_builder_mut::<Int64Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(i64::from(*n))
        }
        (DataType::Int64, Variant::Int16(n)) => {
            downcast_builder_mut::<Int64Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(i64::from(*n))
        }
        (DataType::Int64, Variant::Int32(n)) => {
            downcast_builder_mut::<Int64Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(i64::from(*n))
        }
        (DataType::Float32, Variant::Float(f)) => {
            downcast_builder_mut::<Float32Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(*f)
        }
        (DataType::Float64, Variant::Double(f)) => {
            downcast_builder_mut::<Float64Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(*f)
        }
        (DataType::Float64, Variant::Float(f)) => {
            downcast_builder_mut::<Float64Builder>(builder.as_any_mut(), MISMATCH)?
                .append_value(f64::from(*f))
        }
        (DataType::Utf8, Variant::String(s)) => {
            downcast_builder_mut::<StringBuilder>(builder.as_any_mut(), MISMATCH)?.append_value(s)
        }
        (DataType::Utf8, Variant::ShortString(s)) => {
            downcast_builder_mut::<StringBuilder>(builder.as_any_mut(), MISMATCH)?.append_value(s)
        }
        (DataType::List(element_field), Variant::List(list)) => {
            let list_builder = downcast_builder_mut::<ListBuilder<BoxedArrayBuilder>>(
                builder.as_any_mut(),
                "Type mismatch for list",
            )?;
            for elem in list.iter() {
                append_value_to_builder(list_builder.values(), element_field.data_type(), &elem)?;
            }
            list_builder.append(true);
        }
        (DataType::Struct(fields), Variant::Object(obj)) => {
            let struct_builder = downcast_builder_mut::<StructBuilder>(
                builder.as_any_mut(),
                "Type mismatch for struct",
            )?;
            let field_builders = struct_builder.field_builders_mut();
            for (i, field) in fields.iter().enumerate() {
                let field_builder = field_builders.get_mut(i).ok_or_else(|| {
                    DataFusionError::Execution(format!(
                        "Missing field builder for {}",
                        field.name()
                    ))
                })?;
                // Absent keys are treated as variant nulls.
                let field_value = obj.get(field.name()).unwrap_or(Variant::Null);
                append_value_to_builder(field_builder.as_mut(), field.data_type(), &field_value)?;
            }
            struct_builder.append(true);
        }
        (DataType::Map(entry_field, _), Variant::Object(obj)) => {
            let DataType::Struct(entry_fields) = entry_field.data_type() else {
                return Err(DataFusionError::Execution(
                    "Map entry must be a struct".to_string(),
                ));
            };
            let key_field = entry_fields
                .iter()
                .next()
                .ok_or_else(|| DataFusionError::Execution("Map must have key field".to_string()))?;
            let value_field = entry_fields.iter().nth(1).ok_or_else(|| {
                DataFusionError::Execution("Map must have value field".to_string())
            })?;
            let map_builder = downcast_builder_mut::<
                MapBuilder<BoxedArrayBuilder, BoxedArrayBuilder>,
            >(builder.as_any_mut(), "Type mismatch for map")?;
            for (key, val) in obj.iter() {
                append_value_to_builder(
                    map_builder.keys(),
                    key_field.data_type(),
                    &Variant::String(key),
                )?;
                append_value_to_builder(map_builder.values(), value_field.data_type(), &val)?;
            }
            map_builder.append(true)?;
        }
        (_, Variant::Null) => {
            append_null_to_builder(builder, data_type)?;
        }
        _ => {
            return Err(DataFusionError::Execution(format!(
                "Cannot convert variant {:?} to type {data_type}",
                variant
            )));
        }
    }
    Ok(())
}
fn append_null_to_builder(builder: &mut dyn ArrayBuilder, data_type: &DataType) -> Result<()> {
match data_type {
DataType::Boolean => builder
.as_any_mut()
.downcast_mut::<BooleanBuilder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::Int8 => builder
.as_any_mut()
.downcast_mut::<Int8Builder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::Int16 => builder
.as_any_mut()
.downcast_mut::<Int16Builder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::Int32 => builder
.as_any_mut()
.downcast_mut::<Int32Builder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::Int64 => builder
.as_any_mut()
.downcast_mut::<Int64Builder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::Float32 => builder
.as_any_mut()
.downcast_mut::<Float32Builder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::Float64 => builder
.as_any_mut()
.downcast_mut::<Float64Builder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::Utf8 => builder
.as_any_mut()
.downcast_mut::<StringBuilder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append_null(),
DataType::List(_) => builder
.as_any_mut()
.downcast_mut::<ListBuilder<BoxedArrayBuilder>>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append(false),
DataType::Struct(fields) => {
let struct_builder = builder
.as_any_mut()
.downcast_mut::<StructBuilder>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?;
let field_builders = struct_builder.field_builders_mut();
for (i, field) in fields.iter().enumerate() {
append_null_to_builder(field_builders[i].as_mut(), field.data_type())?;
}
struct_builder.append_null();
}
DataType::Map(_, _) => builder
.as_any_mut()
.downcast_mut::<MapBuilder<BoxedArrayBuilder, BoxedArrayBuilder>>()
.ok_or_else(|| DataFusionError::Execution("Type mismatch".to_string()))?
.append(false)?,
_ => {
return Err(DataFusionError::Execution(format!(
"Cannot append null to unsupported type: {data_type}"
)))
}
}
Ok(())
}