use datafusion::arrow::array::{
Array, AsArray, DictionaryArray, GenericListArray, Int32Array, ListBuilder, StringBuilder,
StructArray, StructBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Fields, Int32Type};
use datafusion::common::{Result, internal_err};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use micromegas_datafusion_extensions::properties::properties_udf::extract_properties_as_vec;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
/// Scalar UDF `properties_to_dict`: takes a list of `{key, value}` string structs and
/// returns the same rows dictionary-encoded with `Int32` keys.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct PropertiesToDict {
/// Function signature reported to the planner (constructed in the `Default` impl).
signature: Signature,
}
impl PropertiesToDict {
    /// Creates the UDF; identical to calling `PropertiesToDict::default()`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl Default for PropertiesToDict {
    /// Builds the UDF with an exact, immutable signature that accepts a single
    /// `List<Struct<key: Utf8, value: Utf8>>` argument.
    fn default() -> Self {
        // Every list element is a non-nullable struct made of two non-nullable strings.
        let entry_fields = Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ]);
        let element = Field::new("Property", DataType::Struct(entry_fields), false);
        let input_type = DataType::List(Arc::new(element));

        let signature = Signature::exact(vec![input_type], Volatility::Immutable);
        Self { signature }
    }
}
impl ScalarUDFImpl for PropertiesToDict {
    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of the function.
    fn name(&self) -> &str {
        "properties_to_dict"
    }

    /// Signature stored on the struct.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always `Dictionary<Int32, List<Struct<key: Utf8, value: Utf8>>>`, regardless of
    /// the argument types passed in.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        let entry_fields = Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ]);
        let element = Arc::new(Field::new(
            "Property",
            DataType::Struct(entry_fields),
            false,
        ));

        let key_type = Box::new(DataType::Int32);
        let value_type = Box::new(DataType::List(element));
        Ok(DataType::Dictionary(key_type, value_type))
    }

    /// Dictionary-encodes the single list-array argument.
    ///
    /// # Errors
    /// Returns an internal error when the argument count is not exactly one, when the
    /// argument is a scalar, or when the array is not a `GenericListArray<i32>`; errors
    /// from `build_dictionary_from_properties` are propagated.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let [input] = args.args.as_slice() else {
            return internal_err!("properties_to_dict expects exactly one argument");
        };

        let array = match input {
            ColumnarValue::Array(array) => array,
            ColumnarValue::Scalar(_) => {
                return internal_err!("properties_to_dict does not support scalar inputs");
            }
        };

        let Some(list_array) = array.as_any().downcast_ref::<GenericListArray<i32>>() else {
            return Err(DataFusionError::Internal(
                "properties_to_dict requires a list array as input".to_string(),
            ));
        };

        build_dictionary_from_properties(list_array)
            .map(|dict_array| ColumnarValue::Array(Arc::new(dict_array)))
    }
}
/// Incrementally builds a dictionary-encoded array of property lists, storing each
/// distinct property list only once in the dictionary values.
struct PropertiesDictionaryBuilder {
/// Maps each distinct property list (its key/value pairs, in order) to its index in
/// the dictionary values.
map: HashMap<Vec<(String, String)>, usize>,
/// Builder for the dictionary values: one list of `{key, value}` structs per distinct
/// property list.
values_builder: ListBuilder<StructBuilder>,
/// One entry per appended row: the dictionary index, or `None` for a null row.
keys: Vec<Option<i32>>,
}
impl PropertiesDictionaryBuilder {
fn new(capacity: usize) -> Self {
let prop_struct_fields = vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, false),
];
let prop_field = Arc::new(Field::new(
"Property",
DataType::Struct(Fields::from(prop_struct_fields.clone())),
false,
));
let values_builder =
ListBuilder::new(StructBuilder::from_fields(prop_struct_fields, capacity))
.with_field(prop_field);
Self {
map: HashMap::new(),
values_builder,
keys: Vec::with_capacity(capacity),
}
}
fn append_property_list(&mut self, struct_array: &StructArray) -> Result<()> {
let prop_vec = extract_properties_as_vec(struct_array)?;
match self.map.get(&prop_vec) {
Some(&index) => {
self.keys.push(Some(index as i32));
}
None => {
let new_index = self.map.len();
self.add_to_values(&prop_vec)?;
self.map.insert(prop_vec, new_index);
self.keys.push(Some(new_index as i32));
}
}
Ok(())
}
fn append_null(&mut self) {
self.keys.push(None);
}
fn add_to_values(&mut self, properties: &[(String, String)]) -> Result<()> {
let struct_builder = self.values_builder.values();
for (key, value) in properties {
struct_builder
.field_builder::<StringBuilder>(0)
.ok_or_else(|| DataFusionError::Internal("Failed to get key builder".to_string()))?
.append_value(key);
struct_builder
.field_builder::<StringBuilder>(1)
.ok_or_else(|| {
DataFusionError::Internal("Failed to get value builder".to_string())
})?
.append_value(value);
struct_builder.append(true);
}
self.values_builder.append(true);
Ok(())
}
fn finish(mut self) -> Result<DictionaryArray<Int32Type>> {
let keys = Int32Array::from(self.keys);
let values = Arc::new(self.values_builder.finish());
DictionaryArray::try_new(keys, values)
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
}
}
pub fn build_dictionary_from_properties(
list_array: &GenericListArray<i32>,
) -> Result<DictionaryArray<Int32Type>> {
let mut builder = PropertiesDictionaryBuilder::new(list_array.len());
for i in 0..list_array.len() {
if list_array.is_null(i) {
builder.append_null();
} else {
let start = list_array.value_offsets()[i] as usize;
let end = list_array.value_offsets()[i + 1] as usize;
let sliced_values = list_array.values().slice(start, end - start);
let struct_array = sliced_values.as_struct();
builder.append_property_list(struct_array)?;
}
}
builder.finish()
}